# -*- coding: utf-8 -*-
import itertools
import operator
import re
import attr
import distlib.markers
import packaging.version
import six
from packaging.markers import InvalidMarker, Marker
from packaging.specifiers import Specifier, SpecifierSet
from vistir.compat import Mapping, Set, lru_cache
from vistir.misc import dedup
from ..environment import MYPY_RUNNING
from ..exceptions import RequirementError
from .utils import filter_none, validate_markers
from six.moves import reduce # isort:skip
if MYPY_RUNNING:
from typing import Optional, List, Type, Any, Tuple, Union, AnyStr, Text, Iterator
STRING_TYPE = Union[str, bytes, Text]
MAX_VERSIONS = {1: 7, 2: 7, 3: 11, 4: 0}
DEPRECATED_VERSIONS = ["3.0", "3.1", "3.2", "3.3"]
[docs]def is_instance(item, cls):
# type: (Any, Type) -> bool
if isinstance(item, cls) or item.__class__.__name__ == cls.__name__:
return True
return False
[docs]@attr.s
class PipenvMarkers(object):
"""System-level requirements - see PEP508 for more detail"""
os_name = attr.ib(default=None, validator=attr.validators.optional(validate_markers))
sys_platform = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
platform_machine = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
platform_python_implementation = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
platform_release = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
platform_system = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
platform_version = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
python_version = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
python_full_version = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
implementation_name = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
implementation_version = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
@property
def line_part(self):
return " and ".join(
[
"{0} {1}".format(k, v)
for k, v in attr.asdict(self, filter=filter_none).items()
]
)
@property
def pipfile_part(self):
return {"markers": self.as_line}
[docs] @classmethod
def make_marker(cls, marker_string):
try:
marker = Marker(marker_string)
except InvalidMarker:
raise RequirementError(
"Invalid requirement: Invalid marker %r" % marker_string
)
return marker
[docs] @classmethod
def from_line(cls, line):
if ";" in line:
line = line.rsplit(";", 1)[1].strip()
marker = cls.make_marker(line)
return marker
[docs] @classmethod
def from_pipfile(cls, name, pipfile):
attr_fields = [field.name for field in attr.fields(cls)]
found_keys = [k for k in pipfile.keys() if k in attr_fields]
marker_strings = ["{0} {1}".format(k, pipfile[k]) for k in found_keys]
if pipfile.get("markers"):
marker_strings.append(pipfile.get("markers"))
markers = set()
for marker in marker_strings:
markers.add(marker)
combined_marker = None
try:
combined_marker = cls.make_marker(" and ".join(sorted(markers)))
except RequirementError:
pass
else:
return combined_marker
@lru_cache(maxsize=1024)
def _tuplize_version(version):
# type: (STRING_TYPE) -> Tuple[int, ...]
return tuple(int(x) for x in filter(lambda i: i != "*", version.split(".")))
@lru_cache(maxsize=1024)
def _format_version(version):
# type: (Tuple[int, ...]) -> STRING_TYPE
if not isinstance(version, six.string_types):
return ".".join(str(i) for i in version)
return version
# Prefer [x,y) ranges.
REPLACE_RANGES = {">": ">=", "<=": "<"}
@lru_cache(maxsize=1024)
def _format_pyspec(specifier):
# type: (Union[STRING_TYPE, Specifier]) -> Specifier
if isinstance(specifier, str):
if not any(op in specifier for op in Specifier._operators.keys()):
specifier = "=={0}".format(specifier)
specifier = Specifier(specifier)
version = getattr(specifier, "version", specifier).rstrip()
if version and version.endswith("*"):
if version.endswith(".*"):
version = version[:-2]
version = version.rstrip("*")
specifier = Specifier("{0}{1}".format(specifier.operator, version))
try:
op = REPLACE_RANGES[specifier.operator]
except KeyError:
return specifier
curr_tuple = _tuplize_version(version)
try:
next_tuple = (curr_tuple[0], curr_tuple[1] + 1)
except IndexError:
next_tuple = (curr_tuple[0], 1)
if not next_tuple[1] <= MAX_VERSIONS[next_tuple[0]]:
if specifier.operator == "<" and curr_tuple[1] <= MAX_VERSIONS[next_tuple[0]]:
op = "<="
next_tuple = (next_tuple[0], curr_tuple[1])
else:
return specifier
specifier = Specifier("{0}{1}".format(op, _format_version(next_tuple)))
return specifier
@lru_cache(maxsize=1024)
def _get_specs(specset):
if specset is None:
return
if is_instance(specset, Specifier):
new_specset = SpecifierSet()
specs = set()
specs.add(specset)
new_specset._specs = frozenset(specs)
specset = new_specset
if isinstance(specset, str):
specset = SpecifierSet(specset)
result = []
for spec in set(specset):
version = spec.version
op = spec.operator
if op in ("in", "not in"):
versions = version.split(",")
op = "==" if op == "in" else "!="
for ver in versions:
result.append((op, _tuplize_version(ver.strip())))
else:
result.append((spec.operator, _tuplize_version(spec.version)))
return sorted(result, key=operator.itemgetter(1))
# TODO: Rename this to something meaningful
def _group_by_op(specs):
# type: (Union[Set[Specifier], SpecifierSet]) -> Iterator
specs = [_get_specs(x) for x in list(specs)]
flattened = [
((op, len(version) > 2), version) for spec in specs for op, version in spec
]
specs = sorted(flattened)
grouping = itertools.groupby(specs, key=operator.itemgetter(0))
return grouping
# TODO: rename this to something meaningful
[docs]def normalize_specifier_set(specs):
# type: (Union[str, SpecifierSet]) -> Optional[Set[Specifier]]
"""Given a specifier set, a string, or an iterable, normalize the specifiers
.. note:: This function exists largely to deal with ``pyzmq`` which handles
the ``requires_python`` specifier incorrectly, using ``3.7*`` rather than
the correct form of ``3.7.*``. This workaround can likely go away if
we ever introduce enforcement for metadata standards on PyPI.
:param Union[str, SpecifierSet] specs: Supplied specifiers to normalize
:return: A new set of specifiers or specifierset
:rtype: Union[Set[Specifier], :class:`~packaging.specifiers.SpecifierSet`]
"""
if not specs:
return None
if isinstance(specs, set):
return specs
# when we aren't dealing with a string at all, we can normalize this as usual
elif not isinstance(specs, six.string_types):
return {_format_pyspec(spec) for spec in specs}
spec_list = []
for spec in specs.split(","):
spec = spec.strip()
if spec.endswith(".*"):
spec = spec[:-2]
spec = spec.rstrip("*")
spec_list.append(spec)
return normalize_specifier_set(SpecifierSet(",".join(spec_list)))
# TODO: Check if this is used by anything public otherwise make it private
# And rename it to something meaningful
[docs]def get_sorted_version_string(version_set):
# type: (Set[AnyStr]) -> AnyStr
version_list = sorted(
"{0}".format(_format_version(version)) for version in version_set
)
version = ", ".join(version_list)
return version
# TODO: Rename this to something meaningful
# TODO: Add a deprecation decorator and deprecate this -- i'm sure it's used
# in other libraries
[docs]@lru_cache(maxsize=1024)
def cleanup_pyspecs(specs, joiner="or"):
specs = normalize_specifier_set(specs)
# for != operator we want to group by version
# if all are consecutive, join as a list
results = {}
translation_map = {
# if we are doing an or operation, we need to use the min for >=
# this way OR(>=2.6, >=2.7, >=3.6) picks >=2.6
# if we do an AND operation we need to use MAX to be more selective
(">", ">="): {
"or": lambda x: _format_version(min(x)),
"and": lambda x: _format_version(max(x)),
},
# we use inverse logic here so we will take the max value if we are
# using OR but the min value if we are using AND
("<", "<="): {
"or": lambda x: _format_version(max(x)),
"and": lambda x: _format_version(min(x)),
},
# leave these the same no matter what operator we use
("!=", "==", "~=", "==="): {
"or": get_sorted_version_string,
"and": get_sorted_version_string,
},
}
op_translations = {
"!=": lambda x: "not in" if len(x) > 1 else "!=",
"==": lambda x: "in" if len(x) > 1 else "==",
}
translation_keys = list(translation_map.keys())
for op_and_version_type, versions in _group_by_op(tuple(specs)):
op = op_and_version_type[0]
versions = [version[1] for version in versions]
versions = sorted(dedup(versions))
op_key = next(iter(k for k in translation_keys if op in k), None)
version_value = versions
if op_key is not None:
version_value = translation_map[op_key][joiner](versions)
if op in op_translations:
op = op_translations[op](versions)
results[(op, op_and_version_type[1])] = version_value
return sorted([(k[0], v) for k, v in results.items()], key=operator.itemgetter(1))
# TODO: Rename this to something meaningful
[docs]@lru_cache(maxsize=1024)
def fix_version_tuple(version_tuple):
# type: (Tuple[AnyStr, AnyStr]) -> Tuple[AnyStr, AnyStr]
op, version = version_tuple
max_major = max(MAX_VERSIONS.keys())
if version[0] > max_major:
return (op, (max_major, MAX_VERSIONS[max_major]))
max_allowed = MAX_VERSIONS[version[0]]
if op == "<" and version[1] > max_allowed and version[1] - 1 <= max_allowed:
op = "<="
version = (version[0], version[1] - 1)
return (op, version)
# TODO: Rename this to something meaningful, deprecate it (See prior function)
[docs]@lru_cache(maxsize=128)
def get_versions(specset, group_by_operator=True):
# type: (Union[Set[Specifier], SpecifierSet], bool) -> List[Tuple[STRING_TYPE, STRING_TYPE]]
specs = [_get_specs(x) for x in list(tuple(specset))]
initial_sort_key = lambda k: (k[0], k[1])
initial_grouping_key = operator.itemgetter(0)
if not group_by_operator:
initial_grouping_key = operator.itemgetter(1)
initial_sort_key = operator.itemgetter(1)
version_tuples = sorted(
set((op, version) for spec in specs for op, version in spec), key=initial_sort_key
)
version_tuples = [fix_version_tuple(t) for t in version_tuples]
op_groups = [
(grp, list(map(operator.itemgetter(1), keys)))
for grp, keys in itertools.groupby(version_tuples, key=initial_grouping_key)
]
versions = [
(op, packaging.version.parse(".".join(str(v) for v in val)))
for op, vals in op_groups
for val in vals
]
return sorted(versions, key=operator.itemgetter(1))
def _ensure_marker(marker):
# type: (Union[STRING_TYPE, Marker]) -> Marker
if not is_instance(marker, Marker):
return Marker(str(marker))
return marker
[docs]def gen_marker(mkr):
# type: (List[STRING_TYPE]) -> Marker
m = Marker("python_version == '1'")
m._markers.pop()
m._markers.append(mkr)
return m
def _strip_extra(elements):
"""Remove the "extra == ..." operands from the list."""
return _strip_marker_elem("extra", elements)
def _strip_pyversion(elements):
return _strip_marker_elem("python_version", elements)
def _strip_marker_elem(elem_name, elements):
"""Remove the supplied element from the marker.
This is not a comprehensive implementation, but relies on an important
characteristic of metadata generation: The element's operand is always
associated with an "and" operator. This means that we can simply remove the
operand and the "and" operator associated with it.
"""
extra_indexes = []
preceding_operators = ["and"] if elem_name == "extra" else ["and", "or"]
for i, element in enumerate(elements):
if isinstance(element, list):
cancelled = _strip_marker_elem(elem_name, element)
if cancelled:
extra_indexes.append(i)
elif isinstance(element, tuple) and element[0].value == elem_name:
extra_indexes.append(i)
for i in reversed(extra_indexes):
del elements[i]
if i > 0 and elements[i - 1] in preceding_operators:
# Remove the "and" before it.
del elements[i - 1]
elif elements:
# This shouldn't ever happen, but is included for completeness.
# If there is not an "and" before this element, try to remove the
# operator after it.
del elements[0]
return not elements
def _get_stripped_marker(marker, strip_func):
"""Build a new marker which is cleaned according to `strip_func`"""
if not marker:
return None
marker = _ensure_marker(marker)
elements = marker._markers
strip_func(elements)
if elements:
return marker
return None
[docs]def get_without_pyversion(marker):
"""Built a new marker without the `python_version` part.
This could return `None` if the `python_version` section is the only section in the
marker.
"""
return _get_stripped_marker(marker, _strip_pyversion)
def _markers_collect_extras(markers, collection):
# Optimization: the marker element is usually appended at the end.
for el in reversed(markers):
if isinstance(el, tuple) and el[0].value == "extra" and el[1].value == "==":
collection.add(el[2].value)
elif isinstance(el, list):
_markers_collect_extras(el, collection)
def _markers_collect_pyversions(markers, collection):
local_collection = []
marker_format_str = "{0}"
for i, el in enumerate(reversed(markers)):
if isinstance(el, tuple) and el[0].value == "python_version":
new_marker = str(gen_marker(el))
local_collection.append(marker_format_str.format(new_marker))
elif isinstance(el, list):
_markers_collect_pyversions(el, local_collection)
if local_collection:
# local_collection = "{0}".format(" ".join(local_collection))
collection.extend(local_collection)
def _markers_contains_extra(markers):
# Optimization: the marker element is usually appended at the end.
return _markers_contains_key(markers, "extra")
def _markers_contains_pyversion(markers):
return _markers_contains_key(markers, "python_version")
def _markers_contains_key(markers, key):
for element in reversed(markers):
if isinstance(element, tuple) and element[0].value == key:
return True
elif isinstance(element, list):
if _markers_contains_key(element, key):
return True
return False
[docs]@lru_cache(maxsize=1024)
def get_contained_pyversions(marker):
"""Collect all `python_version` operands from a marker.
"""
collection = []
if not marker:
return set()
marker = _ensure_marker(marker)
# Collect the (Variable, Op, Value) tuples and string joiners from the marker
_markers_collect_pyversions(marker._markers, collection)
marker_str = " and ".join(sorted(collection))
if not marker_str:
return set()
# Use the distlib dictionary parser to create a dictionary 'trie' which is a bit
# easier to reason about
marker_dict = distlib.markers.parse_marker(marker_str)[0]
version_set = set()
pyversions, _ = parse_marker_dict(marker_dict)
if isinstance(pyversions, set):
version_set.update(pyversions)
elif pyversions is not None:
version_set.add(pyversions)
# Each distinct element in the set was separated by an "and" operator in the marker
# So we will need to reduce them with an intersection here rather than a union
# in order to find the boundaries
versions = set()
if version_set:
versions = reduce(lambda x, y: x & y, version_set)
return versions
[docs]@lru_cache(maxsize=128)
def contains_pyversion(marker):
"""Check whether a marker contains a python_version operand.
"""
if not marker:
return False
marker = _ensure_marker(marker)
return _markers_contains_pyversion(marker._markers)
def _split_specifierset_str(specset_str, prefix="=="):
# type: (str, str) -> Set[Specifier]
"""
Take a specifierset string and split it into a list to join for specifier sets
:param str specset_str: A string containing python versions, often comma separated
:param str prefix: A prefix to use when generating the specifier set
:return: A list of :class:`Specifier` instances generated with the provided prefix
:rtype: Set[Specifier]
"""
specifiers = set()
if "," not in specset_str and " " in specset_str:
values = [v.strip() for v in specset_str.split()]
else:
values = [v.strip() for v in specset_str.split(",")]
if prefix == "!=" and any(v in values for v in DEPRECATED_VERSIONS):
values += DEPRECATED_VERSIONS[:]
for value in sorted(values):
specifiers.add(Specifier("{0}{1}".format(prefix, value)))
return specifiers
def _get_specifiers_from_markers(marker_item):
"""
Given a marker item, get specifiers from the version marker
:param :class:`~packaging.markers.Marker` marker_sequence: A marker describing a version constraint
:return: A set of specifiers corresponding to the marker constraint
:rtype: Set[Specifier]
"""
specifiers = set()
if isinstance(marker_item, tuple):
variable, op, value = marker_item
if variable.value != "python_version":
return specifiers
if op.value == "in":
specifiers.update(_split_specifierset_str(value.value, prefix="=="))
elif op.value == "not in":
specifiers.update(_split_specifierset_str(value.value, prefix="!="))
else:
specifiers.add(Specifier("{0}{1}".format(op.value, value.value)))
elif isinstance(marker_item, list):
parts = get_specset(marker_item)
if parts:
specifiers.update(parts)
return specifiers
[docs]def get_specset(marker_list):
# type: (List) -> Optional[SpecifierSet]
specset = set()
_last_str = "and"
for marker_parts in marker_list:
if isinstance(marker_parts, str):
_last_str = marker_parts # noqa
else:
specset.update(_get_specifiers_from_markers(marker_parts))
specifiers = SpecifierSet()
specifiers._specs = frozenset(specset)
return specifiers
# TODO: Refactor this (reduce complexity)
[docs]def parse_marker_dict(marker_dict):
op = marker_dict["op"]
lhs = marker_dict["lhs"]
rhs = marker_dict["rhs"]
# This is where the spec sets for each side land if we have an "or" operator
side_spec_list = []
side_markers_list = []
finalized_marker = ""
# And if we hit the end of the parse tree we use this format string to make a marker
format_string = "{lhs} {op} {rhs}"
specset = SpecifierSet()
specs = set()
# Essentially we will iterate over each side of the parsed marker if either one is
# A mapping instance (i.e. a dictionary) and recursively parse and reduce the specset
# Union the "and" specs, intersect the "or"s to find the most appropriate range
if any(issubclass(type(side), Mapping) for side in (lhs, rhs)):
for side in (lhs, rhs):
side_specs = set()
side_markers = set()
if issubclass(type(side), Mapping):
merged_side_specs, merged_side_markers = parse_marker_dict(side)
side_specs.update(merged_side_specs)
side_markers.update(merged_side_markers)
else:
marker = _ensure_marker(side)
marker_parts = getattr(marker, "_markers", [])
if marker_parts[0][0].value == "python_version":
side_specs |= set(get_specset(marker_parts))
else:
side_markers.add(str(marker))
side_spec_list.append(side_specs)
side_markers_list.append(side_markers)
if op == "and":
# When we are "and"-ing things together, it probably makes the most sense
# to reduce them here into a single PySpec instance
specs = reduce(lambda x, y: set(x) | set(y), side_spec_list)
markers = reduce(lambda x, y: set(x) | set(y), side_markers_list)
if not specs and not markers:
return specset, finalized_marker
if markers and isinstance(markers, (tuple, list, Set)):
finalized_marker = Marker(" and ".join([m for m in markers if m]))
elif markers:
finalized_marker = str(markers)
specset._specs = frozenset(specs)
return specset, finalized_marker
# Actually when we "or" things as well we can also just turn them into a reduced
# set using this logic now
sides = reduce(lambda x, y: set(x) & set(y), side_spec_list)
finalized_marker = " or ".join(
[normalize_marker_str(m) for m in side_markers_list]
)
specset._specs = frozenset(sorted(sides))
return specset, finalized_marker
else:
# At the tip of the tree we are dealing with strings all around and they just need
# to be smashed together
specs = set()
if lhs == "python_version":
format_string = "{lhs}{op}{rhs}"
marker = Marker(format_string.format(**marker_dict))
marker_parts = getattr(marker, "_markers", [])
_set = get_specset(marker_parts)
if _set:
specs |= set(_set)
specset._specs = frozenset(specs)
return specset, finalized_marker
def _contains_micro_version(version_string):
return re.search(r"\d+\.\d+\.\d+", version_string) is not None
[docs]def normalize_marker_str(marker):
# type: (Union[Marker, STRING_TYPE]) -> str
marker_str = ""
if not marker:
return None
if not is_instance(marker, Marker):
marker = _ensure_marker(marker)
pyversion = get_contained_pyversions(marker)
marker = get_without_pyversion(marker)
if pyversion:
parts = cleanup_pyspecs(pyversion)
marker_str = " and ".join([format_pyversion(pv) for pv in parts])
if marker:
if marker_str:
marker_str = "{0!s} and {1!s}".format(marker_str, marker)
else:
marker_str = "{0!s}".format(marker)
return marker_str.replace('"', "'")
[docs]@lru_cache(maxsize=1024)
def marker_from_specifier(spec):
# type: (STRING_TYPE) -> Marker
if not any(spec.startswith(k) for k in Specifier._operators.keys()):
if spec.strip().lower() in ["any", "<any>", "*"]:
return None
spec = "=={0}".format(spec)
elif spec.startswith("==") and spec.count("=") > 3:
spec = "=={0}".format(spec.lstrip("="))
if not spec:
return None
marker_segments = []
for marker_segment in cleanup_pyspecs(spec):
marker_segments.append(format_pyversion(marker_segment))
marker_str = " and ".join(marker_segments).replace('"', "'")
return Marker(marker_str)
[docs]def merge_markers(m1, m2):
# type: (Marker, Marker) -> Optional[Marker]
if not all((m1, m2)):
return next(iter(v for v in (m1, m2) if v), None)
m1 = _ensure_marker(m1)
m2 = _ensure_marker(m2)
_markers = [] # type: List[Marker]
for marker in (m1, m2):
_markers.append(str(marker))
marker_str = " and ".join([normalize_marker_str(m) for m in _markers if m])
return _ensure_marker(normalize_marker_str(marker_str))