Source code for requirementslib.models.metadata

# -*- coding=utf-8 -*-
import datetime
import functools
import io
import json
import logging
import operator
import os
import zipfile
from collections import defaultdict

import attr
import dateutil.parser
import distlib.metadata
import distlib.wheel
import packaging.version
import requests
import six
import vistir
from packaging.markers import Marker
from packaging.requirements import Requirement as PackagingRequirement
from packaging.specifiers import Specifier, SpecifierSet
from packaging.tags import Tag

from ..environment import MYPY_RUNNING
from .markers import (
    get_contained_extras,
    get_contained_pyversions,
    get_without_extra,
    get_without_pyversion,
    marker_from_specifier,
    merge_markers,
    normalize_specifier_set,
)
from .requirements import Requirement
from .utils import filter_dict, get_pinned_version, is_pinned_requirement

# fmt: off
from six.moves import Sequence  # type: ignore  # isort:skip
from six.moves import reduce  # type: ignore # isort:skip
# fmt: on # isort:skip


ch = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


if MYPY_RUNNING:
    from typing import (
        Any,
        Callable,
        Dict,
        Generator,
        Generic,
        Iterator,
        List,
        Optional,
        Set,
        Tuple,
        Type,
        TypeVar,
        Union,
    )
    from attr import Attribute  # noqa
    from .setup_info import SetupInfo

    TAttrsClass = TypeVar("TAttrsClass")
    AttrsClass = Generic[TAttrsClass]
    TDigestDict = Dict[str, str]
    TProjectUrls = Dict[str, str]
    TReleaseUrlDict = Dict[str, Union[bool, int, str, TDigestDict]]
    TReleasesList = List[TReleaseUrlDict]
    TReleasesDict = Dict[str, TReleasesList]
    TDownloads = Dict[str, int]
    TPackageInfo = Dict[str, Optional[Union[str, List[str], TDownloads, TProjectUrls]]]
    TPackage = Dict[str, Union[TPackageInfo, int, TReleasesDict, TReleasesList]]


VALID_ALGORITHMS = {
    "sha1": 40,
    "sha3_224": 56,
    "sha512": 128,
    "blake2b": 128,
    "sha256": 64,
    "sha384": 96,
    "blake2s": 64,
    "sha3_256": 64,
    "sha3_512": 128,
    "md5": 32,
    "sha3_384": 96,
    "sha224": 56,
}  # type: Dict[str, int]

PACKAGE_TYPES = {
    "sdist",
    "bdist_wheel",
    "bdist_egg",
    "bdist_dumb",
    "bdist_wininst",
    "bdist_rpm",
    "bdist_msi",
    "bdist_dmg",
}


[docs]class PackageEncoder(json.JSONEncoder):
[docs] def default(self, obj): # noqa:E0202 # noqa:W0221 if isinstance(obj, datetime.datetime): return obj.isoformat() elif isinstance(obj, PackagingRequirement): return obj.__dict__ elif isinstance(obj, set): return tuple(obj) elif isinstance(obj, (Specifier, SpecifierSet, Marker)): return str(obj) else: return json.JSONEncoder.default(self, obj)
[docs]def validate_extras(inst, attrib, value): # type: ("Dependency", Attribute, Tuple[str, ...]) -> None duplicates = [k for k in value if value.count(k) > 1] if duplicates: raise ValueError("Found duplicate keys: {0}".format(", ".join(duplicates))) return None
[docs]def validate_digest(inst, attrib, value): # type: ("Digest", Attribute, str) -> None expected_length = VALID_ALGORITHMS[inst.algorithm.lower()] if len(value) != expected_length: raise ValueError( "Expected a digest of length {0!s}, got one of length {1!s}".format( expected_length, len(value) ) ) return None
[docs]def get_local_wheel_metadata(wheel_file): # type: (str) -> Optional[distlib.metadata.Metadata] parsed_metadata = None with io.open(wheel_file, "rb") as fh: with zipfile.ZipFile(fh, mode="r", compression=zipfile.ZIP_DEFLATED) as zf: metadata = None for fn in zf.namelist(): if os.path.basename(fn) == "METADATA": metadata = fn break if metadata is None: raise RuntimeError("No metadata found in wheel: {0}".format(wheel_file)) with zf.open(metadata, "r") as metadata_fh: parsed_metadata = distlib.metadata.Metadata(fileobj=metadata_fh) return parsed_metadata
[docs]def get_remote_sdist_metadata(line): # type: (str) -> SetupInfo req = Requirement.from_line(line) try: _ = req.run_requires() except SystemExit: raise RuntimeError("Failed to compute metadata for dependency {0}".format(line)) else: return req.line_instance.setup_info
[docs]def get_remote_wheel_metadata(whl_file): # type: (str) -> Optional[distlib.metadata.Metadata] parsed_metadata = None data = io.BytesIO() with vistir.contextmanagers.open_file(whl_file) as fp: for chunk in iter(lambda: fp.read(8096), b""): data.write(chunk) with zipfile.ZipFile(data, mode="r", compression=zipfile.ZIP_DEFLATED) as zf: metadata = None for fn in zf.namelist(): if os.path.basename(fn) == "METADATA": metadata = fn break if metadata is None: raise RuntimeError("No metadata found in wheel: {0}".format(whl_file)) with zf.open(metadata, "r") as metadata_fh: parsed_metadata = distlib.metadata.Metadata(fileobj=metadata_fh) return parsed_metadata
[docs]def create_specifierset(spec=None): # type: (Optional[str]) -> SpecifierSet if isinstance(spec, SpecifierSet): return spec elif isinstance(spec, (set, list, tuple)): spec = " and ".join(spec) if spec is None: spec = "" return SpecifierSet(spec)
[docs]@attr.s(frozen=True, eq=True) class ExtrasCollection(object): #: The name of the extras collection (e.g. 'security') name = attr.ib(type=str) #: The dependency the collection belongs to parent = attr.ib(type="Dependency") #: The members of the collection dependencies = attr.ib(factory=set) # type: Set["Dependency"]
[docs] def add_dependency(self, dependency): # type: ("Dependency") -> "ExtrasCollection" if not isinstance(dependency, Dependency): raise TypeError( "Expected a Dependency instance, received {0!r}".format(dependency) ) dependencies = self.dependencies.copy() dependencies.add(dependency) return attr.evolve(self, dependencies=dependencies)
[docs]@attr.s(frozen=True, eq=True) class Dependency(object): #: The name of the dependency name = attr.ib(type=str) #: A requirement instance requirement = attr.ib(type=PackagingRequirement, eq=False) #: The specifier defined in the dependency definition specifier = attr.ib(type=SpecifierSet, converter=create_specifierset, eq=False) #: Any extras this dependency declares extras = attr.ib(factory=tuple, validator=validate_extras) # type: Tuple[str, ...] #: The name of the extra meta-dependency this one came from (e.g. 'security') from_extras = attr.ib(default=None, eq=False) # type: Optional[str] #: The declared specifier set of allowable python versions for this dependency python_version = attr.ib( default="", type=SpecifierSet, converter=create_specifierset, eq=False ) #: The parent of this dependency (i.e. where it came from) parent = attr.ib(default=None) # type: Optional[Dependency] #: The markers for this dependency markers = attr.ib(default=None, eq=False) # type: Optional[Marker] _specset_str = attr.ib(default="", type=str) _python_version_str = attr.ib(default="", type=str) _marker_str = attr.ib(default="", type=str) def __str__(self): # type: () -> str return str(self.requirement)
[docs] def as_line(self): # type: () -> str line_str = "{0}".format(self.name) if self.extras: line_str = "{0}[{1}]".format(line_str, ",".join(self.extras)) if self.specifier: line_str = "{0}{1!s}".format(line_str, self.specifier) py_version_part = "" if self.python_version: specifiers = normalize_specifier_set(self.python_version) markers = [] if specifiers is not None: markers = [marker_from_specifier(str(s)) for s in specifiers] py_version_part = reduce(merge_markers, markers) if self.markers: line_str = "{0}; {1}".format(line_str, str(self.markers)) if py_version_part: line_str = "{0} and {1}".format(line_str, py_version_part) elif py_version_part and not self.markers: line_str = "{0}; {1}".format(line_str, py_version_part) return line_str
[docs] def pin(self): # type: () -> "Package" base_package = get_package(self.name) sorted_releases = sorted( base_package.releases.non_yanked_releases, key=operator.attrgetter("parsed_version"), reverse=True, ) version = next( iter(self.specifier.filter((r.version for r in sorted_releases))), None ) if not version: version = next( iter( self.specifier.filter( (r.version for r in sorted_releases), prereleases=True ) ), None, ) if not version: raise RuntimeError( "Failed to resolve {0} ({1!s})".format(self.name, self.specifier) ) match = get_package_version(self.name, str(version)) return match
[docs] @classmethod def from_requirement(cls, req, parent=None): # type: (PackagingRequirement, Optional["Dependency"]) -> "Dependency" from_extras, marker, python_version = None, None, None specset_str, py_version_str, marker_str = "", "", "" if req.marker: marker = Marker(str(req.marker)) from_extras = next(iter(list(get_contained_extras(marker))), None) python_version = get_contained_pyversions(marker) marker = get_without_extra(get_without_pyversion(marker)) if not str(marker) or not marker or not marker._markers: marker = None req.marker = marker if marker is not None: marker_str = str(marker) if req.specifier: specset_str = str(req.specifier) if python_version: py_version_str = str(python_version) return cls( name=req.name, specifier=req.specifier, extras=tuple(sorted(set(req.extras))) if req.extras is not None else req.extras, requirement=req, from_extras=from_extras, python_version=python_version, markers=marker, parent=parent, specset_str=specset_str, python_version_str=py_version_str, marker_str=marker_str, )
[docs] @classmethod def from_info(cls, info): # type: ("PackageInfo") -> "Dependency" marker_str = "" specset_str, py_version_str = "", "" if info.requires_python: # XXX: Some markers are improperly formatted -- we already handle most cases # XXX: but learned about new broken formats, such as # XXX: python_version in "2.6 2.7 3.2 3.3" (note the lack of commas) # XXX: as a marker on a dependency of a library called 'pickleshare' # XXX: Some packages also have invalid markers with stray characters, # XXX: such as 'algoliasearch' try: marker = marker_from_specifier(info.requires_python) except Exception: marker_str = "" else: if not marker or not marker._markers: marker_str = "" else: marker_str = "{0!s}".format(marker) req_str = "{0}=={1}".format(info.name, info.version) if marker_str: req_str = "{0}; {1}".format(req_str, marker_str) req = PackagingRequirement(req_str) requires_python_str = ( info.requires_python if info.requires_python is not None else "" ) if req.specifier: specset_str = str(req.specifier) if requires_python_str: py_version_str = requires_python_str return cls( name=info.name, specifier=req.specifier, extras=tuple(sorted(set(req.extras))) if req.extras is not None else req.extras, requirement=req, from_extras=None, python_version=SpecifierSet(requires_python_str), markers=None, parent=None, specset_str=specset_str, python_version_str=py_version_str, marker_str=marker_str, )
[docs] @classmethod def from_str(cls, depstr, parent=None): # type: (str, Optional["Dependency"]) -> "Dependency" try: req = PackagingRequirement(depstr) except Exception: raise return cls.from_requirement(req, parent=parent)
[docs] def add_parent(self, parent): # type: ("Dependency") -> "Dependency" return attr.evolve(self, parent=parent)
[docs]@attr.s(frozen=True, eq=True) class Digest(object): #: The algorithm declared for the digest, e.g. 'sha256' algorithm = attr.ib( type=str, validator=attr.validators.in_(VALID_ALGORITHMS.keys()), eq=True ) #: The digest value value = attr.ib(type=str, validator=validate_digest, eq=True) def __str__(self): # type: () -> str return "{0}:{1}".format(self.algorithm, self.value)
[docs] @classmethod def create(cls, algorithm, value): # type: (str, str) -> "Digest" return cls(algorithm=algorithm, value=value)
[docs] @classmethod def collection_from_dict(cls, digest_dict): # type: (TDigestDict) -> List["Digest"] return [cls.create(k, v) for k, v in digest_dict.items()]
# XXX: This is necessary because attrs converters can only be functions, not classmethods
[docs]def create_digest_collection(digest_dict): # type: (TDigestDict) -> List["Digest"] return Digest.collection_from_dict(digest_dict)
[docs]def instance_check_converter(expected_type=None, converter=None): # type: (Optional[Type], Optional[Callable]) -> Callable def _converter(val): if expected_type is not None and isinstance(val, expected_type): return val return converter(val) return _converter
[docs]@attr.s(frozen=True, eq=True) class ParsedTag(object): #: The marker string corresponding to the tag marker_string = attr.ib(default=None) # type: Optional[str] #: The python version represented by the tag python_version = attr.ib(default=None) # type: Optional[str] #: The platform represented by the tag platform_system = attr.ib(default=None) # type: Optional[str] #: the ABI represented by the tag abi = attr.ib(default=None) # type: Optional[str]
[docs]def parse_tag(tag): # type: (Tag) -> ParsedTag """Parse a :class:`~packaging.tags.Tag` instance. :param :class:`~packaging.tags.Tag` tag: A tag to parse :return: A parsed tag with combined markers, supported platform and python version :rtype: :class:`~ParsedTag` """ platform_system = None python_version = None version = None marker_str = "" if tag.platform.startswith("macos"): platform_system = "Darwin" elif tag.platform.startswith("manylinux") or tag.platform.startswith("linux"): platform_system = "Linux" elif tag.platform.startswith("win32"): platform_system = "Windows" if platform_system: marker_str = 'platform_system == "{}"'.format(platform_system) if tag.interpreter: version = tag.interpreter[2:] py_version_str = "" if len(version) == 1: py_version_str = ">={}.0,<{}".format(version, str(int(version) + 1)) elif len(version) > 1 and len(version) <= 3: # reverse the existing version so we can add 1 to the first element # and re-reverse, generating the new version, e.g. [3, 2, 8] => # [8, 2, 3] => [9, 2, 3] => [3, 2, 9] next_version_list = list(reversed(version[:])) next_version_list[0] = str(int(next_version_list[0]) + 1) next_version = ".".join(list(reversed(next_version_list))) version = ".".join(version) py_version_str = ">={},<{}".format(version, next_version) else: py_version_str = "{0}".format(version) python_version = marker_from_specifier(py_version_str) if python_version: if marker_str: marker_str = "{0} and {1!s}".format(marker_str, python_version) else: marker_str = str(python_version) return ParsedTag( marker_string=marker_str, python_version=version, platform_system=platform_system, abi=tag.abi, )
[docs]@attr.s(frozen=True, eq=True) class ReleaseUrl(object): #: The MD5 digest of the given release md5_digest = attr.ib(type=Digest) #: The package type of the url packagetype = attr.ib(type=str, validator=attr.validators.in_(PACKAGE_TYPES)) #: The upload timestamp from the package upload_time = attr.ib( type=datetime.datetime, converter=instance_check_converter(datetime.datetime, dateutil.parser.parse), # type: ignore ) #: The ISO8601 formatted upload timestamp of the package upload_time_iso_8601 = attr.ib( type=datetime.datetime, converter=instance_check_converter(datetime.datetime, dateutil.parser.parse), # type: ignore ) #: The size in bytes of the package size = attr.ib(type=int) #: The URL of the package url = attr.ib(type=str) #: The digests of the package digests = attr.ib( converter=instance_check_converter(list, create_digest_collection) # type: ignore ) # type: List[Digest] #: The name of the package name = attr.ib(type=str, default=None) #: The available comments of the given upload comment_text = attr.ib(type=str, default="") #: Whether the url has been yanked from the server yanked = attr.ib(type=bool, default=False) #: The number of downloads (deprecated) downloads = attr.ib(type=int, default=-1) #: The filename of the current upload filename = attr.ib(type=str, default="") #: Whether the upload has a signature has_sig = attr.ib(type=bool, default=False) #: The python_version attribute of the upload (e.g. 'source', 'py27', etc) python_version = attr.ib(type=str, default="source") #: The 'requires_python' restriction on the package requires_python = attr.ib(type=str, default=None) #: A list of valid aprsed tags from the upload tags = attr.ib(factory=list) # type: List[ParsedTag] @property def is_wheel(self): # type: () -> bool return os.path.splitext(self.filename)[-1].lower() == ".whl" @property def is_sdist(self): # type: () -> bool return self.python_version == "source" @property def markers(self): # type: () -> Optional[str] # TODO: Compare dependencies in parent and add markers for python version # TODO: Compare dependencies in parent and add markers for platform # XXX: We can't use wheel-based markers until we do it via derived markers by # XXX: comparing in the parent (i.e. 'Release' instance or so) and merging # XXX: down to the common / minimal set of markers otherwise we wind up # XXX: with an unmanageable set and combinatorial explosion # if self.is_wheel: # return self.get_markers_from_wheel() if self.requires_python: return marker_from_specifier(self.requires_python) return None @property def pep508_url(self): # type: () -> str markers = self.markers req_str = "{0} @ {1}#egg={0}".format(self.name, self.url) if markers: req_str = "{0}; {1}".format(req_str, markers) return req_str
[docs] def get_markers_from_wheel(self): # type: () -> str supported_platforms = [] # type: List[str] supported_pyversions = [] supported_abis = [] markers = [] for parsed_tag in self.tags: if parsed_tag.marker_string: markers.append(Marker(parsed_tag.marker_string)) if parsed_tag.python_version: supported_pyversions.append(parsed_tag.python_version) if parsed_tag.abi: supported_abis.append(parsed_tag.abi) if not (markers or supported_platforms): return "" if ( all(pyversion in supported_pyversions for pyversion in ["2", "3"]) and not supported_platforms ): marker_line = "" else: marker_line = " or ".join(["{}".format(str(marker)) for marker in markers]) return marker_line
[docs] def get_dependencies(self): # type: () -> Tuple["ReleaseUrl", Dict[str, Union[List[str], str]]] results = {"requires_python": None} requires_dist = [] # type: List[str] if self.is_wheel: metadata = get_remote_wheel_metadata(self.url) if metadata is not None: requires_dist = metadata.run_requires if not self.requires_python: results["requires_python"] = metadata._legacy.get("Requires-Python") else: try: metadata = get_remote_sdist_metadata(self.pep508_url) except Exception: requires_dist = [] else: requires_dist = [str(v) for v in metadata.requires.values()] results["requires_dist"] = requires_dist requires_python = getattr(self, "requires_python", results["requires_python"]) return attr.evolve(self, requires_python=requires_python), results
@property def sha256(self): # type: () -> str return next( iter(digest for digest in self.digests if digest.algorithm == "sha256") ).value
[docs] @classmethod def create(cls, release_dict, name=None): # type: (TReleaseUrlDict, Optional[str]) -> "ReleaseUrl" valid_digest_keys = set("{0}_digest".format(k) for k in VALID_ALGORITHMS.keys()) digest_keys = set(release_dict.keys()) & valid_digest_keys creation_kwargs = {} # type: Dict[str, Union[bool, int, str, Digest, TDigestDict]] creation_kwargs = { k: v for k, v in release_dict.items() if k not in digest_keys } if name is not None: creation_kwargs["name"] = name for k in digest_keys: digest = release_dict[k] if not isinstance(digest, six.string_types): raise TypeError("Digests must be strings, got {!r}".format(digest)) creation_kwargs[k] = Digest.create(k.replace("_digest", ""), digest) release_url = cls(**filter_dict(creation_kwargs)) # type: ignore if release_url.is_wheel: supported_tags = [ parse_tag(Tag(*tag)) for tag in distlib.wheel.Wheel(release_url.url).tags ] release_url = attr.evolve(release_url, tags=supported_tags) return release_url
[docs]def create_release_urls_from_list(urls, name=None): # type: (Union[TReleasesList, List[ReleaseUrl]], Optional[str]) -> List[ReleaseUrl] url_list = [] for release_dict in urls: if isinstance(release_dict, ReleaseUrl): if name and not release_dict.name: release_dict = attr.evolve(release_dict, name=name) url_list.append(release_dict) continue url_list.append(ReleaseUrl.create(release_dict, name=name)) return url_list
[docs]@attr.s(frozen=True, eq=True) class ReleaseUrlCollection(Sequence): #: A list of release URLs urls = attr.ib(converter=create_release_urls_from_list) #: the name of the package name = attr.ib(default=None) # type: Optional[str]
[docs] @classmethod def create(cls, urls, name=None): # type: (TReleasesList, Optional[str]) -> "ReleaseUrlCollection" return cls(urls=urls, name=name)
@property def wheels(self): # type: () -> Iterator[ReleaseUrl] for url in self.urls: if not url.is_wheel: continue yield url @property def sdists(self): # type: () -> Iterator[ReleaseUrl] for url in self.urls: if not url.is_sdist: continue yield url def __iter__(self): # type: () -> Iterator[ReleaseUrl] return iter(self.urls) def __getitem__(self, key): # type: (int) -> ReleaseUrl return self.urls.__getitem__(key) def __len__(self): # type: () -> int return len(self.urls) @property def latest(self): # type: () -> Optional[ReleaseUrl] if not self.urls: return None return next( iter(sorted(self.urls, key=operator.attrgetter("upload_time"), reverse=True)) ) @property def latest_timestamp(self): # type: () -> Optional[datetime.datetime] latest = self.latest if latest is not None: return latest.upload_time return None
[docs] def find_package_type(self, type_): # type: (str) -> Optional[ReleaseUrl] """Given a package type (e.g. sdist, bdist_wheel), find the matching release. :param str type_: A package type from :const:`~PACKAGE_TYPES` :return: The package from this collection matching that type, if available :rtype: Optional[ReleaseUrl] """ if type_ not in PACKAGE_TYPES: raise ValueError( "Invalid package type: {0}. Expected one of {1}".format( type_, " ".join(PACKAGE_TYPES) ) ) return next(iter(url for url in self.urls if url.packagetype == type_), None)
[docs]def convert_release_urls_to_collection(urls=None, name=None): # type: (Optional[TReleasesList], Optional[str]) -> ReleaseUrlCollection if urls is None: urls = [] urls = create_release_urls_from_list(urls, name=name) return ReleaseUrlCollection.create(urls, name=name)
[docs]@attr.s(frozen=True) class Release(Sequence): #: The version of the release version = attr.ib(type=str) #: The URL collection for the release urls = attr.ib( converter=instance_check_converter( # type: ignore ReleaseUrlCollection, convert_release_urls_to_collection ), type=ReleaseUrlCollection, ) #: the name of the package name = attr.ib(default=None) # type: Optional[str] def __iter__(self): # type: () -> Iterator[ReleaseUrlCollection] return iter(self.urls) def __getitem__(self, key): return self.urls[key] def __len__(self): # type: () -> int return len(self.urls) @property def yanked(self): # type: () -> bool if not self.urls: return True return False @property def parsed_version(self): # type: () -> packaging.version._BaseVersion return packaging.version.parse(self.version) @property def wheels(self): # type: () -> Iterator[ReleaseUrl] return self.urls.wheels @property def sdists(self): # type: () -> Iterator[ReleaseUrl] return self.urls.sdists @property def latest(self): # type: () -> ReleaseUrl return self.urls.latest @property def latest_timestamp(self): # type: () -> datetime.datetime return self.urls.latest_timestamp
[docs] def to_lockfile(self): # type: () -> Dict[str, Union[List[str], str]] return { "hashes": [str(url.sha256) for url in self.urls if url.sha256 is not None], "version": "=={0}".format(self.version), }
[docs]def get_release(version, urls, name=None): # type: (str, TReleasesList, Optional[str]) -> Release release_kwargs = {"version": version, "name": name} if not isinstance(urls, ReleaseUrlCollection): release_kwargs["urls"] = convert_release_urls_to_collection(urls, name=name) else: release_kwargs["urls"] = urls return Release(**release_kwargs) # type: ignore
[docs]def get_releases_from_package(releases, name=None): # type: (TReleasesDict, Optional[str]) -> List[Release] release_list = [] for version, urls in releases.items(): release_list.append(get_release(version, urls, name=name)) return release_list
[docs]@attr.s(frozen=True) class ReleaseCollection(object): releases = attr.ib( factory=list, converter=instance_check_converter(list, get_releases_from_package), # type: ignore ) # type: List[Release] def __iter__(self): # type: () -> Iterator[Release] return iter(self.releases) def __getitem__(self, key): # type: (str) -> Release result = next(iter(r for r in self.releases if r.version == key), None) if result is None: raise KeyError(key) return result def __len__(self): # type: () -> int return len(self.releases)
[docs] def get_latest_lockfile(self): # type: () -> Dict[str, Union[str, List[str]]] return self.latest.to_lockfile()
[docs] def wheels(self): # type: () -> Iterator[ReleaseUrl] for release in self.sort_releases(): for wheel in release.wheels: yield wheel
[docs] def sdists(self): # type: () -> Iterator[ReleaseUrl] for release in self.sort_releases(): for sdist in release.sdists: yield sdist
@property def non_yanked_releases(self): # type: () -> List[Release] return list(r for r in self.releases if not r.yanked)
[docs] def sort_releases(self): # type: () -> List[Release] return sorted( self.non_yanked_releases, key=operator.attrgetter("latest_timestamp"), reverse=True, )
@property def latest(self): # type: () -> Optional[Release] return next(iter(r for r in self.sort_releases() if not r.yanked))
[docs] @classmethod def load(cls, releases, name=None): # type: (Union[TReleasesDict, List[Release]], Optional[str]) -> "ReleaseCollection" if not isinstance(releases, list): releases = get_releases_from_package(releases, name=name) return cls(releases)
[docs]def convert_releases_to_collection(releases, name=None): # type: (TReleasesDict, Optional[str]) -> ReleaseCollection return ReleaseCollection.load(releases, name=name)
[docs]def split_keywords(value): # type: (Union[str, List]) -> List[str] if value and isinstance(value, six.string_types): return value.split(",") elif isinstance(value, list): return value return []
[docs]def create_dependencies( requires_dist, # type: Optional[List[Dependency]] parent=None, # type: Optional[Dependency] ): # type: (...) -> Optional[Set[Dependency]] if requires_dist is None: return None dependencies = set() for req in requires_dist: if not isinstance(req, Dependency): dependencies.add(Dependency.from_str(req, parent=parent)) else: dependencies.add(req) return dependencies
[docs]@attr.s(frozen=True) class PackageInfo(object): name = attr.ib(type=str) version = attr.ib(type=str) package_url = attr.ib(type=str) summary = attr.ib(type=str, default=None) # type: Optional[str] author = attr.ib(type=str, default=None) # type: Optional[str] keywords = attr.ib(factory=list, converter=split_keywords) # type: List[str] description = attr.ib(type=str, default="") download_url = attr.ib(type=str, default="") home_page = attr.ib(type=str, default="") license = attr.ib(type=str, default="") maintainer = attr.ib(type=str, default="") maintainer_email = attr.ib(type=str, default="") downloads = attr.ib(factory=dict) # type: Dict[str, int] docs_url = attr.ib(default=None) # type: Optional[str] platform = attr.ib(type=str, default="") project_url = attr.ib(type=str, default="") project_urls = attr.ib(factory=dict) # type: Dict[str, str] requires_python = attr.ib(default=None) # type: Optional[str] requires_dist = attr.ib(factory=list) # type: List[Dependency] release_url = attr.ib(default=None) # type: Optional[str] description_content_type = attr.ib(type=str, default="text/md") bugtrack_url = attr.ib(default=None) # type: str classifiers = attr.ib(factory=list) # type: List[str] author_email = attr.ib(default=None) # type: Optional[str] markers = attr.ib(default=None) # type: Optional[str] dependencies = attr.ib(default=None) # type: Tuple[Dependency]
[docs] @classmethod def from_json(cls, info_json): # type: (TPackageInfo) -> "PackageInfo" return cls(**filter_dict(info_json)) # type: ignore
[docs] def to_dependency(self): # type: () -> Dependency return Dependency.from_info(self)
[docs] def create_dependencies(self, force=False): # type: (bool) -> "PackageInfo" """Create values for **self.dependencies**. :param bool force: Sets **self.dependencies** to an empty tuple if it would be None, defaults to False. :return: An updated instance of the current object with **self.dependencies** updated accordingly. :rtype: :class:`PackageInfo` """ if not self.dependencies and not self.requires_dist: if force: return attr.evolve(self, dependencies=tuple()) return self self_dependency = self.to_dependency() deps = set() self_dependencies = tuple() if not self.dependencies else self.dependencies for dep in self_dependencies: if dep is None: continue new_dep = dep.add_parent(self_dependency) deps.add(new_dep) created_deps = create_dependencies(self.requires_dist, parent=self_dependency) if created_deps is not None: for dep in created_deps: if dep is None: continue deps.add(dep) return attr.evolve(self, dependencies=tuple(sorted(deps)))
[docs]def convert_package_info(info_json): # type: (Union[TPackageInfo, PackageInfo]) -> PackageInfo if isinstance(info_json, PackageInfo): return info_json return PackageInfo.from_json(info_json)
[docs]def add_markers_to_dep(d, marker_str): # type: (str, Union[str, Marker]) -> str req = PackagingRequirement(d) existing_marker = getattr(req, "marker", None) if isinstance(marker_str, Marker): marker_str = str(marker_str) if existing_marker is not None: marker_str = str(merge_markers(existing_marker, marker_str)) if marker_str: marker_str = marker_str.replace("'", '"') req.marker = Marker(marker_str) return str(req)
[docs]@attr.s class Package(object): info = attr.ib(type=PackageInfo, converter=convert_package_info) last_serial = attr.ib(type=int) releases = attr.ib( type=ReleaseCollection, converter=instance_check_converter( # type: ignore ReleaseCollection, convert_releases_to_collection ), ) # XXX: Note: sometimes releases have no urls at the top level (e.g. pyrouge) urls = attr.ib( type=ReleaseUrlCollection, converter=instance_check_converter( # type: ignore ReleaseUrlCollection, convert_release_urls_to_collection ), ) @urls.default def _get_urls_collection(self): return functools.partial(convert_release_urls_to_collection, urls=[], name=self.name) @property def name(self): # type: () -> str return self.info.name @property def version(self): # type: () -> str return self.info.version @property def requirement(self): # type: () -> PackagingRequirement return self.info.to_dependency().requirement @property def latest_sdist(self): # type: () -> ReleaseUrl return next(iter(self.urls.sdists)) @property def latest_wheels(self): # type: () -> Iterator[ReleaseUrl] for wheel in self.urls.wheels: yield wheel @property def dependencies(self): # type: () -> List[Dependency] if self.info.dependencies is None and list(self.urls): rval = self.get_dependencies() return rval.dependencies return list(self.info.dependencies)
[docs] def get_dependencies(self): # type: () -> "Package" urls = [] # type: List[ReleaseUrl] deps = set() # type: Set[str] info = self.info if info.dependencies is None: for url in self.urls: try: url, dep_dict = url.get_dependencies() except (RuntimeError, TypeError): # This happens if we are parsing `setup.py` and we fail if url.is_sdist: continue else: raise markers = url.markers dep_list = dep_dict.get("requires_dist", []) for dep in dep_list: # XXX: We need to parse these as requirements and "and" the markers # XXX: together because they may contain "extra" markers which we # XXX: will need to parse and remove deps.add(add_markers_to_dep(dep, markers)) urls.append(url) if None in deps: deps.remove(None) info = attr.evolve( self.info, requires_dist=tuple(sorted(deps)) ).create_dependencies(force=True) return attr.evolve(self, info=info, urls=urls)
[docs] @classmethod def from_json(cls, package_json): # type: (Dict[str, Any]) -> "Package" info = convert_package_info(package_json["info"]).create_dependencies() releases = convert_releases_to_collection( package_json["releases"], name=info.name ) urls = convert_release_urls_to_collection(package_json["urls"], name=info.name) return cls( info=info, releases=releases, urls=urls, last_serial=package_json["last_serial"], )
[docs] def pin_dependencies(self, include_extras=None): # type: (Optional[List[str]]) -> Tuple[List["Package"], Dict[str, List[SpecifierSet]]] deps = [] if include_extras: include_extras = list(sorted(set(include_extras))) else: include_extras = [] constraints = defaultdict(list) for dep in self.dependencies: if dep.from_extras and dep.from_extras not in include_extras: continue if dep.specifier: constraints[dep.name].append(dep.specifier) try: pinned = dep.pin() except requests.exceptions.HTTPError: continue deps.append(pinned) return deps, constraints
[docs] def get_latest_lockfile(self): # type: () -> Dict[str, Dict[str, Union[List[str], str]]] lockfile = {} constraints = {dep.name: dep.specifier for dep in self.dependencies} deps, _ = self.pin_dependencies() for dep in deps: dep = dep.get_dependencies() for sub_dep in dep.dependencies: if sub_dep.name not in constraints: logger.info( "Adding {0} (from {1}) {2!s}".format( sub_dep.name, dep.name, sub_dep.specifier ) ) constraints[sub_dep.name] = sub_dep.specifier else: existing = "{0} (from {1}): {2!s} + ".format( sub_dep.name, dep.name, constraints[sub_dep.name] ) new_specifier = sub_dep.specifier merged = constraints[sub_dep.name] & new_specifier logger.info( "Updating: {0}{1!s} = {2!s}".format( existing, new_specifier, merged ) ) constraints[sub_dep.name] = merged lockfile.update({dep.info.name: dep.releases.get_latest_lockfile()}) for sub_dep_name, specset in constraints.items(): try: sub_dep_pkg = get_package(sub_dep_name) except requests.exceptions.HTTPError: continue logger.info("Getting package: {0} ({1!s})".format(sub_dep, specset)) sorted_releases = list( sorted( sub_dep_pkg.releases, key=operator.attrgetter("parsed_version"), reverse=True, ) ) try: version = next(iter(specset.filter((r.version for r in sorted_releases)))) except StopIteration: logger.info("No version of {0} matches specifier: {1}".format(sub_dep, specset)) logger.info( "Available versions: {0}".format( " ".join([r.version for r in sorted_releases]) ) ) raise sub_dep_instance = get_package_version(sub_dep_name, version=str(version)) if sub_dep_instance is None: continue lockfile.update( { sub_dep_instance.info.name: sub_dep_instance.releases.get_latest_lockfile() } ) # lockfile.update(dep.get_latest_lockfile()) lockfile.update({self.info.name: self.releases.get_latest_lockfile()}) return lockfile
[docs] def as_dict(self): # type: () -> Dict[str, Any] return json.loads(self.serialize())
[docs] def serialize(self): # type: () -> str return json.dumps(attr.asdict(self), cls=PackageEncoder, indent=4)
[docs]def get_package(name): # type: (str) -> Package url = "https://pypi.org/pypi/{}/json".format(name) with requests.get(url) as r: r.raise_for_status() result = r.json() package = Package.from_json(result) return package
[docs]def get_package_version(name, version): # type: (str, str) -> Package url = "https://pypi.org/pypi/{0}/{1}/json".format(name, version) with requests.get(url) as r: r.raise_for_status() result = r.json() package = Package.from_json(result) return package
[docs]def get_package_from_requirement(req): # type: (PackagingRequirement) -> Tuple[Package, Set[str]] versions = set() if is_pinned_requirement(req): version = get_pinned_version(req) versions.add(version) pkg = get_package_version(req.name, version) else: pkg = get_package(req.name) sorted_releases = list( sorted(pkg.releases, key=operator.attrgetter("parsed_version"), reverse=True) ) versions = set(req.specifier.filter((r.version for r in sorted_releases))) version = next(iter(req.specifier.filter((r.version for r in sorted_releases)))) if pkg.version not in versions: pkg = get_package_version(pkg.name, version) return pkg, versions