# -*- coding=utf-8 -*-
import datetime
import functools
import io
import json
import logging
import operator
import os
import zipfile
from collections import defaultdict
import attr
import dateutil.parser
import distlib.metadata
import distlib.wheel
import packaging.version
import requests
import six
import vistir
from packaging.markers import Marker
from packaging.requirements import Requirement as PackagingRequirement
from packaging.specifiers import Specifier, SpecifierSet
from packaging.tags import Tag
from ..environment import MYPY_RUNNING
from .markers import (
get_contained_extras,
get_contained_pyversions,
get_without_extra,
get_without_pyversion,
marker_from_specifier,
merge_markers,
normalize_specifier_set,
)
from .requirements import Requirement
from .utils import filter_dict, get_pinned_version, is_pinned_requirement
# fmt: off
from six.moves import Sequence # type: ignore # isort:skip
from six.moves import reduce # type: ignore # isort:skip
# fmt: on # isort:skip
ch = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if MYPY_RUNNING:
from typing import (
Any,
Callable,
Dict,
Generator,
Generic,
Iterator,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)
from attr import Attribute # noqa
from .setup_info import SetupInfo
TAttrsClass = TypeVar("TAttrsClass")
AttrsClass = Generic[TAttrsClass]
TDigestDict = Dict[str, str]
TProjectUrls = Dict[str, str]
TReleaseUrlDict = Dict[str, Union[bool, int, str, TDigestDict]]
TReleasesList = List[TReleaseUrlDict]
TReleasesDict = Dict[str, TReleasesList]
TDownloads = Dict[str, int]
TPackageInfo = Dict[str, Optional[Union[str, List[str], TDownloads, TProjectUrls]]]
TPackage = Dict[str, Union[TPackageInfo, int, TReleasesDict, TReleasesList]]
VALID_ALGORITHMS = {
"sha1": 40,
"sha3_224": 56,
"sha512": 128,
"blake2b": 128,
"sha256": 64,
"sha384": 96,
"blake2s": 64,
"sha3_256": 64,
"sha3_512": 128,
"md5": 32,
"sha3_384": 96,
"sha224": 56,
} # type: Dict[str, int]
PACKAGE_TYPES = {
"sdist",
"bdist_wheel",
"bdist_egg",
"bdist_dumb",
"bdist_wininst",
"bdist_rpm",
"bdist_msi",
"bdist_dmg",
}
[docs]class PackageEncoder(json.JSONEncoder):
[docs] def default(self, obj): # noqa:E0202 # noqa:W0221
if isinstance(obj, datetime.datetime):
return obj.isoformat()
elif isinstance(obj, PackagingRequirement):
return obj.__dict__
elif isinstance(obj, set):
return tuple(obj)
elif isinstance(obj, (Specifier, SpecifierSet, Marker)):
return str(obj)
else:
return json.JSONEncoder.default(self, obj)
[docs]def validate_digest(inst, attrib, value):
# type: ("Digest", Attribute, str) -> None
expected_length = VALID_ALGORITHMS[inst.algorithm.lower()]
if len(value) != expected_length:
raise ValueError(
"Expected a digest of length {0!s}, got one of length {1!s}".format(
expected_length, len(value)
)
)
return None
[docs]def create_specifierset(spec=None):
# type: (Optional[str]) -> SpecifierSet
if isinstance(spec, SpecifierSet):
return spec
elif isinstance(spec, (set, list, tuple)):
spec = " and ".join(spec)
if spec is None:
spec = ""
return SpecifierSet(spec)
[docs]@attr.s(frozen=True, eq=True)
class Dependency(object):
#: The name of the dependency
name = attr.ib(type=str)
#: A requirement instance
requirement = attr.ib(type=PackagingRequirement, eq=False)
#: The specifier defined in the dependency definition
specifier = attr.ib(type=SpecifierSet, converter=create_specifierset, eq=False)
#: Any extras this dependency declares
extras = attr.ib(factory=tuple, validator=validate_extras) # type: Tuple[str, ...]
#: The name of the extra meta-dependency this one came from (e.g. 'security')
from_extras = attr.ib(default=None, eq=False) # type: Optional[str]
#: The declared specifier set of allowable python versions for this dependency
python_version = attr.ib(
default="", type=SpecifierSet, converter=create_specifierset, eq=False
)
#: The parent of this dependency (i.e. where it came from)
parent = attr.ib(default=None) # type: Optional[Dependency]
#: The markers for this dependency
markers = attr.ib(default=None, eq=False) # type: Optional[Marker]
_specset_str = attr.ib(default="", type=str)
_python_version_str = attr.ib(default="", type=str)
_marker_str = attr.ib(default="", type=str)
def __str__(self):
# type: () -> str
return str(self.requirement)
[docs] def as_line(self):
# type: () -> str
line_str = "{0}".format(self.name)
if self.extras:
line_str = "{0}[{1}]".format(line_str, ",".join(self.extras))
if self.specifier:
line_str = "{0}{1!s}".format(line_str, self.specifier)
py_version_part = ""
if self.python_version:
specifiers = normalize_specifier_set(self.python_version)
markers = []
if specifiers is not None:
markers = [marker_from_specifier(str(s)) for s in specifiers]
py_version_part = reduce(merge_markers, markers)
if self.markers:
line_str = "{0}; {1}".format(line_str, str(self.markers))
if py_version_part:
line_str = "{0} and {1}".format(line_str, py_version_part)
elif py_version_part and not self.markers:
line_str = "{0}; {1}".format(line_str, py_version_part)
return line_str
[docs] def pin(self):
# type: () -> "Package"
base_package = get_package(self.name)
sorted_releases = sorted(
base_package.releases.non_yanked_releases,
key=operator.attrgetter("parsed_version"),
reverse=True,
)
version = next(
iter(self.specifier.filter((r.version for r in sorted_releases))), None
)
if not version:
version = next(
iter(
self.specifier.filter(
(r.version for r in sorted_releases), prereleases=True
)
),
None,
)
if not version:
raise RuntimeError(
"Failed to resolve {0} ({1!s})".format(self.name, self.specifier)
)
match = get_package_version(self.name, str(version))
return match
[docs] @classmethod
def from_requirement(cls, req, parent=None):
# type: (PackagingRequirement, Optional["Dependency"]) -> "Dependency"
from_extras, marker, python_version = None, None, None
specset_str, py_version_str, marker_str = "", "", ""
if req.marker:
marker = Marker(str(req.marker))
from_extras = next(iter(list(get_contained_extras(marker))), None)
python_version = get_contained_pyversions(marker)
marker = get_without_extra(get_without_pyversion(marker))
if not str(marker) or not marker or not marker._markers:
marker = None
req.marker = marker
if marker is not None:
marker_str = str(marker)
if req.specifier:
specset_str = str(req.specifier)
if python_version:
py_version_str = str(python_version)
return cls(
name=req.name,
specifier=req.specifier,
extras=tuple(sorted(set(req.extras))) if req.extras is not None else req.extras,
requirement=req,
from_extras=from_extras,
python_version=python_version,
markers=marker,
parent=parent,
specset_str=specset_str,
python_version_str=py_version_str,
marker_str=marker_str,
)
[docs] @classmethod
def from_info(cls, info):
# type: ("PackageInfo") -> "Dependency"
marker_str = ""
specset_str, py_version_str = "", ""
if info.requires_python:
# XXX: Some markers are improperly formatted -- we already handle most cases
# XXX: but learned about new broken formats, such as
# XXX: python_version in "2.6 2.7 3.2 3.3" (note the lack of commas)
# XXX: as a marker on a dependency of a library called 'pickleshare'
# XXX: Some packages also have invalid markers with stray characters,
# XXX: such as 'algoliasearch'
try:
marker = marker_from_specifier(info.requires_python)
except Exception:
marker_str = ""
else:
if not marker or not marker._markers:
marker_str = ""
else:
marker_str = "{0!s}".format(marker)
req_str = "{0}=={1}".format(info.name, info.version)
if marker_str:
req_str = "{0}; {1}".format(req_str, marker_str)
req = PackagingRequirement(req_str)
requires_python_str = (
info.requires_python if info.requires_python is not None else ""
)
if req.specifier:
specset_str = str(req.specifier)
if requires_python_str:
py_version_str = requires_python_str
return cls(
name=info.name,
specifier=req.specifier,
extras=tuple(sorted(set(req.extras))) if req.extras is not None else req.extras,
requirement=req,
from_extras=None,
python_version=SpecifierSet(requires_python_str),
markers=None,
parent=None,
specset_str=specset_str,
python_version_str=py_version_str,
marker_str=marker_str,
)
[docs] @classmethod
def from_str(cls, depstr, parent=None):
# type: (str, Optional["Dependency"]) -> "Dependency"
try:
req = PackagingRequirement(depstr)
except Exception:
raise
return cls.from_requirement(req, parent=parent)
[docs] def add_parent(self, parent):
# type: ("Dependency") -> "Dependency"
return attr.evolve(self, parent=parent)
[docs]@attr.s(frozen=True, eq=True)
class Digest(object):
#: The algorithm declared for the digest, e.g. 'sha256'
algorithm = attr.ib(
type=str, validator=attr.validators.in_(VALID_ALGORITHMS.keys()), eq=True
)
#: The digest value
value = attr.ib(type=str, validator=validate_digest, eq=True)
def __str__(self):
# type: () -> str
return "{0}:{1}".format(self.algorithm, self.value)
[docs] @classmethod
def create(cls, algorithm, value):
# type: (str, str) -> "Digest"
return cls(algorithm=algorithm, value=value)
[docs] @classmethod
def collection_from_dict(cls, digest_dict):
# type: (TDigestDict) -> List["Digest"]
return [cls.create(k, v) for k, v in digest_dict.items()]
# XXX: This is necessary because attrs converters can only be functions, not classmethods
[docs]def create_digest_collection(digest_dict):
# type: (TDigestDict) -> List["Digest"]
return Digest.collection_from_dict(digest_dict)
[docs]def instance_check_converter(expected_type=None, converter=None):
# type: (Optional[Type], Optional[Callable]) -> Callable
def _converter(val):
if expected_type is not None and isinstance(val, expected_type):
return val
return converter(val)
return _converter
[docs]@attr.s(frozen=True, eq=True)
class ParsedTag(object):
#: The marker string corresponding to the tag
marker_string = attr.ib(default=None) # type: Optional[str]
#: The python version represented by the tag
python_version = attr.ib(default=None) # type: Optional[str]
#: The platform represented by the tag
platform_system = attr.ib(default=None) # type: Optional[str]
#: the ABI represented by the tag
abi = attr.ib(default=None) # type: Optional[str]
[docs]def parse_tag(tag):
# type: (Tag) -> ParsedTag
"""Parse a :class:`~packaging.tags.Tag` instance.
:param :class:`~packaging.tags.Tag` tag: A tag to parse
:return: A parsed tag with combined markers, supported platform and python version
:rtype: :class:`~ParsedTag`
"""
platform_system = None
python_version = None
version = None
marker_str = ""
if tag.platform.startswith("macos"):
platform_system = "Darwin"
elif tag.platform.startswith("manylinux") or tag.platform.startswith("linux"):
platform_system = "Linux"
elif tag.platform.startswith("win32"):
platform_system = "Windows"
if platform_system:
marker_str = 'platform_system == "{}"'.format(platform_system)
if tag.interpreter:
version = tag.interpreter[2:]
py_version_str = ""
if len(version) == 1:
py_version_str = ">={}.0,<{}".format(version, str(int(version) + 1))
elif len(version) > 1 and len(version) <= 3:
# reverse the existing version so we can add 1 to the first element
# and re-reverse, generating the new version, e.g. [3, 2, 8] =>
# [8, 2, 3] => [9, 2, 3] => [3, 2, 9]
next_version_list = list(reversed(version[:]))
next_version_list[0] = str(int(next_version_list[0]) + 1)
next_version = ".".join(list(reversed(next_version_list)))
version = ".".join(version)
py_version_str = ">={},<{}".format(version, next_version)
else:
py_version_str = "{0}".format(version)
python_version = marker_from_specifier(py_version_str)
if python_version:
if marker_str:
marker_str = "{0} and {1!s}".format(marker_str, python_version)
else:
marker_str = str(python_version)
return ParsedTag(
marker_string=marker_str,
python_version=version,
platform_system=platform_system,
abi=tag.abi,
)
[docs]@attr.s(frozen=True, eq=True)
class ReleaseUrl(object):
#: The MD5 digest of the given release
md5_digest = attr.ib(type=Digest)
#: The package type of the url
packagetype = attr.ib(type=str, validator=attr.validators.in_(PACKAGE_TYPES))
#: The upload timestamp from the package
upload_time = attr.ib(
type=datetime.datetime,
converter=instance_check_converter(datetime.datetime, dateutil.parser.parse), # type: ignore
)
#: The ISO8601 formatted upload timestamp of the package
upload_time_iso_8601 = attr.ib(
type=datetime.datetime,
converter=instance_check_converter(datetime.datetime, dateutil.parser.parse), # type: ignore
)
#: The size in bytes of the package
size = attr.ib(type=int)
#: The URL of the package
url = attr.ib(type=str)
#: The digests of the package
digests = attr.ib(
converter=instance_check_converter(list, create_digest_collection) # type: ignore
) # type: List[Digest]
#: The name of the package
name = attr.ib(type=str, default=None)
#: The available comments of the given upload
comment_text = attr.ib(type=str, default="")
#: Whether the url has been yanked from the server
yanked = attr.ib(type=bool, default=False)
#: The number of downloads (deprecated)
downloads = attr.ib(type=int, default=-1)
#: The filename of the current upload
filename = attr.ib(type=str, default="")
#: Whether the upload has a signature
has_sig = attr.ib(type=bool, default=False)
#: The python_version attribute of the upload (e.g. 'source', 'py27', etc)
python_version = attr.ib(type=str, default="source")
#: The 'requires_python' restriction on the package
requires_python = attr.ib(type=str, default=None)
#: A list of valid aprsed tags from the upload
tags = attr.ib(factory=list) # type: List[ParsedTag]
@property
def is_wheel(self):
# type: () -> bool
return os.path.splitext(self.filename)[-1].lower() == ".whl"
@property
def is_sdist(self):
# type: () -> bool
return self.python_version == "source"
@property
def markers(self):
# type: () -> Optional[str]
# TODO: Compare dependencies in parent and add markers for python version
# TODO: Compare dependencies in parent and add markers for platform
# XXX: We can't use wheel-based markers until we do it via derived markers by
# XXX: comparing in the parent (i.e. 'Release' instance or so) and merging
# XXX: down to the common / minimal set of markers otherwise we wind up
# XXX: with an unmanageable set and combinatorial explosion
# if self.is_wheel:
# return self.get_markers_from_wheel()
if self.requires_python:
return marker_from_specifier(self.requires_python)
return None
@property
def pep508_url(self):
# type: () -> str
markers = self.markers
req_str = "{0} @ {1}#egg={0}".format(self.name, self.url)
if markers:
req_str = "{0}; {1}".format(req_str, markers)
return req_str
[docs] def get_markers_from_wheel(self):
# type: () -> str
supported_platforms = [] # type: List[str]
supported_pyversions = []
supported_abis = []
markers = []
for parsed_tag in self.tags:
if parsed_tag.marker_string:
markers.append(Marker(parsed_tag.marker_string))
if parsed_tag.python_version:
supported_pyversions.append(parsed_tag.python_version)
if parsed_tag.abi:
supported_abis.append(parsed_tag.abi)
if not (markers or supported_platforms):
return ""
if (
all(pyversion in supported_pyversions for pyversion in ["2", "3"])
and not supported_platforms
):
marker_line = ""
else:
marker_line = " or ".join(["{}".format(str(marker)) for marker in markers])
return marker_line
[docs] def get_dependencies(self):
# type: () -> Tuple["ReleaseUrl", Dict[str, Union[List[str], str]]]
results = {"requires_python": None}
requires_dist = [] # type: List[str]
if self.is_wheel:
metadata = get_remote_wheel_metadata(self.url)
if metadata is not None:
requires_dist = metadata.run_requires
if not self.requires_python:
results["requires_python"] = metadata._legacy.get("Requires-Python")
else:
try:
metadata = get_remote_sdist_metadata(self.pep508_url)
except Exception:
requires_dist = []
else:
requires_dist = [str(v) for v in metadata.requires.values()]
results["requires_dist"] = requires_dist
requires_python = getattr(self, "requires_python", results["requires_python"])
return attr.evolve(self, requires_python=requires_python), results
@property
def sha256(self):
# type: () -> str
return next(
iter(digest for digest in self.digests if digest.algorithm == "sha256")
).value
[docs] @classmethod
def create(cls, release_dict, name=None):
# type: (TReleaseUrlDict, Optional[str]) -> "ReleaseUrl"
valid_digest_keys = set("{0}_digest".format(k) for k in VALID_ALGORITHMS.keys())
digest_keys = set(release_dict.keys()) & valid_digest_keys
creation_kwargs = {} # type: Dict[str, Union[bool, int, str, Digest, TDigestDict]]
creation_kwargs = {
k: v for k, v in release_dict.items() if k not in digest_keys
}
if name is not None:
creation_kwargs["name"] = name
for k in digest_keys:
digest = release_dict[k]
if not isinstance(digest, six.string_types):
raise TypeError("Digests must be strings, got {!r}".format(digest))
creation_kwargs[k] = Digest.create(k.replace("_digest", ""), digest)
release_url = cls(**filter_dict(creation_kwargs)) # type: ignore
if release_url.is_wheel:
supported_tags = [
parse_tag(Tag(*tag)) for tag in distlib.wheel.Wheel(release_url.url).tags
]
release_url = attr.evolve(release_url, tags=supported_tags)
return release_url
[docs]def create_release_urls_from_list(urls, name=None):
# type: (Union[TReleasesList, List[ReleaseUrl]], Optional[str]) -> List[ReleaseUrl]
url_list = []
for release_dict in urls:
if isinstance(release_dict, ReleaseUrl):
if name and not release_dict.name:
release_dict = attr.evolve(release_dict, name=name)
url_list.append(release_dict)
continue
url_list.append(ReleaseUrl.create(release_dict, name=name))
return url_list
[docs]@attr.s(frozen=True, eq=True)
class ReleaseUrlCollection(Sequence):
#: A list of release URLs
urls = attr.ib(converter=create_release_urls_from_list)
#: the name of the package
name = attr.ib(default=None) # type: Optional[str]
[docs] @classmethod
def create(cls, urls, name=None):
# type: (TReleasesList, Optional[str]) -> "ReleaseUrlCollection"
return cls(urls=urls, name=name)
@property
def wheels(self):
# type: () -> Iterator[ReleaseUrl]
for url in self.urls:
if not url.is_wheel:
continue
yield url
@property
def sdists(self):
# type: () -> Iterator[ReleaseUrl]
for url in self.urls:
if not url.is_sdist:
continue
yield url
def __iter__(self):
# type: () -> Iterator[ReleaseUrl]
return iter(self.urls)
def __getitem__(self, key):
# type: (int) -> ReleaseUrl
return self.urls.__getitem__(key)
def __len__(self):
# type: () -> int
return len(self.urls)
@property
def latest(self):
# type: () -> Optional[ReleaseUrl]
if not self.urls:
return None
return next(
iter(sorted(self.urls, key=operator.attrgetter("upload_time"), reverse=True))
)
@property
def latest_timestamp(self):
# type: () -> Optional[datetime.datetime]
latest = self.latest
if latest is not None:
return latest.upload_time
return None
[docs] def find_package_type(self, type_):
# type: (str) -> Optional[ReleaseUrl]
"""Given a package type (e.g. sdist, bdist_wheel), find the matching
release.
:param str type_: A package type from :const:`~PACKAGE_TYPES`
:return: The package from this collection matching that type, if available
:rtype: Optional[ReleaseUrl]
"""
if type_ not in PACKAGE_TYPES:
raise ValueError(
"Invalid package type: {0}. Expected one of {1}".format(
type_, " ".join(PACKAGE_TYPES)
)
)
return next(iter(url for url in self.urls if url.packagetype == type_), None)
[docs]def convert_release_urls_to_collection(urls=None, name=None):
# type: (Optional[TReleasesList], Optional[str]) -> ReleaseUrlCollection
if urls is None:
urls = []
urls = create_release_urls_from_list(urls, name=name)
return ReleaseUrlCollection.create(urls, name=name)
[docs]@attr.s(frozen=True)
class Release(Sequence):
#: The version of the release
version = attr.ib(type=str)
#: The URL collection for the release
urls = attr.ib(
converter=instance_check_converter( # type: ignore
ReleaseUrlCollection, convert_release_urls_to_collection
),
type=ReleaseUrlCollection,
)
#: the name of the package
name = attr.ib(default=None) # type: Optional[str]
def __iter__(self):
# type: () -> Iterator[ReleaseUrlCollection]
return iter(self.urls)
def __getitem__(self, key):
return self.urls[key]
def __len__(self):
# type: () -> int
return len(self.urls)
@property
def yanked(self):
# type: () -> bool
if not self.urls:
return True
return False
@property
def parsed_version(self):
# type: () -> packaging.version._BaseVersion
return packaging.version.parse(self.version)
@property
def wheels(self):
# type: () -> Iterator[ReleaseUrl]
return self.urls.wheels
@property
def sdists(self):
# type: () -> Iterator[ReleaseUrl]
return self.urls.sdists
@property
def latest(self):
# type: () -> ReleaseUrl
return self.urls.latest
@property
def latest_timestamp(self):
# type: () -> datetime.datetime
return self.urls.latest_timestamp
[docs] def to_lockfile(self):
# type: () -> Dict[str, Union[List[str], str]]
return {
"hashes": [str(url.sha256) for url in self.urls if url.sha256 is not None],
"version": "=={0}".format(self.version),
}
[docs]def get_release(version, urls, name=None):
# type: (str, TReleasesList, Optional[str]) -> Release
release_kwargs = {"version": version, "name": name}
if not isinstance(urls, ReleaseUrlCollection):
release_kwargs["urls"] = convert_release_urls_to_collection(urls, name=name)
else:
release_kwargs["urls"] = urls
return Release(**release_kwargs) # type: ignore
[docs]def get_releases_from_package(releases, name=None):
# type: (TReleasesDict, Optional[str]) -> List[Release]
release_list = []
for version, urls in releases.items():
release_list.append(get_release(version, urls, name=name))
return release_list
[docs]@attr.s(frozen=True)
class ReleaseCollection(object):
releases = attr.ib(
factory=list, converter=instance_check_converter(list, get_releases_from_package), # type: ignore
) # type: List[Release]
def __iter__(self):
# type: () -> Iterator[Release]
return iter(self.releases)
def __getitem__(self, key):
# type: (str) -> Release
result = next(iter(r for r in self.releases if r.version == key), None)
if result is None:
raise KeyError(key)
return result
def __len__(self):
# type: () -> int
return len(self.releases)
[docs] def get_latest_lockfile(self):
# type: () -> Dict[str, Union[str, List[str]]]
return self.latest.to_lockfile()
[docs] def wheels(self):
# type: () -> Iterator[ReleaseUrl]
for release in self.sort_releases():
for wheel in release.wheels:
yield wheel
[docs] def sdists(self):
# type: () -> Iterator[ReleaseUrl]
for release in self.sort_releases():
for sdist in release.sdists:
yield sdist
@property
def non_yanked_releases(self):
# type: () -> List[Release]
return list(r for r in self.releases if not r.yanked)
[docs] def sort_releases(self):
# type: () -> List[Release]
return sorted(
self.non_yanked_releases,
key=operator.attrgetter("latest_timestamp"),
reverse=True,
)
@property
def latest(self):
# type: () -> Optional[Release]
return next(iter(r for r in self.sort_releases() if not r.yanked))
[docs] @classmethod
def load(cls, releases, name=None):
# type: (Union[TReleasesDict, List[Release]], Optional[str]) -> "ReleaseCollection"
if not isinstance(releases, list):
releases = get_releases_from_package(releases, name=name)
return cls(releases)
[docs]def convert_releases_to_collection(releases, name=None):
# type: (TReleasesDict, Optional[str]) -> ReleaseCollection
return ReleaseCollection.load(releases, name=name)
[docs]def split_keywords(value):
# type: (Union[str, List]) -> List[str]
if value and isinstance(value, six.string_types):
return value.split(",")
elif isinstance(value, list):
return value
return []
[docs]def create_dependencies(
requires_dist, # type: Optional[List[Dependency]]
parent=None, # type: Optional[Dependency]
):
# type: (...) -> Optional[Set[Dependency]]
if requires_dist is None:
return None
dependencies = set()
for req in requires_dist:
if not isinstance(req, Dependency):
dependencies.add(Dependency.from_str(req, parent=parent))
else:
dependencies.add(req)
return dependencies
[docs]@attr.s(frozen=True)
class PackageInfo(object):
name = attr.ib(type=str)
version = attr.ib(type=str)
package_url = attr.ib(type=str)
summary = attr.ib(type=str, default=None) # type: Optional[str]
author = attr.ib(type=str, default=None) # type: Optional[str]
keywords = attr.ib(factory=list, converter=split_keywords) # type: List[str]
description = attr.ib(type=str, default="")
download_url = attr.ib(type=str, default="")
home_page = attr.ib(type=str, default="")
license = attr.ib(type=str, default="")
maintainer = attr.ib(type=str, default="")
maintainer_email = attr.ib(type=str, default="")
downloads = attr.ib(factory=dict) # type: Dict[str, int]
docs_url = attr.ib(default=None) # type: Optional[str]
platform = attr.ib(type=str, default="")
project_url = attr.ib(type=str, default="")
project_urls = attr.ib(factory=dict) # type: Dict[str, str]
requires_python = attr.ib(default=None) # type: Optional[str]
requires_dist = attr.ib(factory=list) # type: List[Dependency]
release_url = attr.ib(default=None) # type: Optional[str]
description_content_type = attr.ib(type=str, default="text/md")
bugtrack_url = attr.ib(default=None) # type: str
classifiers = attr.ib(factory=list) # type: List[str]
author_email = attr.ib(default=None) # type: Optional[str]
markers = attr.ib(default=None) # type: Optional[str]
dependencies = attr.ib(default=None) # type: Tuple[Dependency]
[docs] @classmethod
def from_json(cls, info_json):
# type: (TPackageInfo) -> "PackageInfo"
return cls(**filter_dict(info_json)) # type: ignore
[docs] def to_dependency(self):
# type: () -> Dependency
return Dependency.from_info(self)
[docs] def create_dependencies(self, force=False):
# type: (bool) -> "PackageInfo"
"""Create values for **self.dependencies**.
:param bool force: Sets **self.dependencies** to an empty tuple if it would be
None, defaults to False.
:return: An updated instance of the current object with **self.dependencies**
updated accordingly.
:rtype: :class:`PackageInfo`
"""
if not self.dependencies and not self.requires_dist:
if force:
return attr.evolve(self, dependencies=tuple())
return self
self_dependency = self.to_dependency()
deps = set()
self_dependencies = tuple() if not self.dependencies else self.dependencies
for dep in self_dependencies:
if dep is None:
continue
new_dep = dep.add_parent(self_dependency)
deps.add(new_dep)
created_deps = create_dependencies(self.requires_dist, parent=self_dependency)
if created_deps is not None:
for dep in created_deps:
if dep is None:
continue
deps.add(dep)
return attr.evolve(self, dependencies=tuple(sorted(deps)))
[docs]def convert_package_info(info_json):
# type: (Union[TPackageInfo, PackageInfo]) -> PackageInfo
if isinstance(info_json, PackageInfo):
return info_json
return PackageInfo.from_json(info_json)
[docs]def add_markers_to_dep(d, marker_str):
# type: (str, Union[str, Marker]) -> str
req = PackagingRequirement(d)
existing_marker = getattr(req, "marker", None)
if isinstance(marker_str, Marker):
marker_str = str(marker_str)
if existing_marker is not None:
marker_str = str(merge_markers(existing_marker, marker_str))
if marker_str:
marker_str = marker_str.replace("'", '"')
req.marker = Marker(marker_str)
return str(req)
[docs]@attr.s
class Package(object):
info = attr.ib(type=PackageInfo, converter=convert_package_info)
last_serial = attr.ib(type=int)
releases = attr.ib(
type=ReleaseCollection,
converter=instance_check_converter( # type: ignore
ReleaseCollection, convert_releases_to_collection
),
)
# XXX: Note: sometimes releases have no urls at the top level (e.g. pyrouge)
urls = attr.ib(
type=ReleaseUrlCollection,
converter=instance_check_converter( # type: ignore
ReleaseUrlCollection, convert_release_urls_to_collection
),
)
@urls.default
def _get_urls_collection(self):
return functools.partial(convert_release_urls_to_collection, urls=[], name=self.name)
@property
def name(self):
# type: () -> str
return self.info.name
@property
def version(self):
# type: () -> str
return self.info.version
@property
def requirement(self):
# type: () -> PackagingRequirement
return self.info.to_dependency().requirement
@property
def latest_sdist(self):
# type: () -> ReleaseUrl
return next(iter(self.urls.sdists))
@property
def latest_wheels(self):
# type: () -> Iterator[ReleaseUrl]
for wheel in self.urls.wheels:
yield wheel
@property
def dependencies(self):
# type: () -> List[Dependency]
if self.info.dependencies is None and list(self.urls):
rval = self.get_dependencies()
return rval.dependencies
return list(self.info.dependencies)
[docs] def get_dependencies(self):
# type: () -> "Package"
urls = [] # type: List[ReleaseUrl]
deps = set() # type: Set[str]
info = self.info
if info.dependencies is None:
for url in self.urls:
try:
url, dep_dict = url.get_dependencies()
except (RuntimeError, TypeError):
# This happens if we are parsing `setup.py` and we fail
if url.is_sdist:
continue
else:
raise
markers = url.markers
dep_list = dep_dict.get("requires_dist", [])
for dep in dep_list:
# XXX: We need to parse these as requirements and "and" the markers
# XXX: together because they may contain "extra" markers which we
# XXX: will need to parse and remove
deps.add(add_markers_to_dep(dep, markers))
urls.append(url)
if None in deps:
deps.remove(None)
info = attr.evolve(
self.info, requires_dist=tuple(sorted(deps))
).create_dependencies(force=True)
return attr.evolve(self, info=info, urls=urls)
[docs] @classmethod
def from_json(cls, package_json):
# type: (Dict[str, Any]) -> "Package"
info = convert_package_info(package_json["info"]).create_dependencies()
releases = convert_releases_to_collection(
package_json["releases"], name=info.name
)
urls = convert_release_urls_to_collection(package_json["urls"], name=info.name)
return cls(
info=info,
releases=releases,
urls=urls,
last_serial=package_json["last_serial"],
)
[docs] def pin_dependencies(self, include_extras=None):
# type: (Optional[List[str]]) -> Tuple[List["Package"], Dict[str, List[SpecifierSet]]]
deps = []
if include_extras:
include_extras = list(sorted(set(include_extras)))
else:
include_extras = []
constraints = defaultdict(list)
for dep in self.dependencies:
if dep.from_extras and dep.from_extras not in include_extras:
continue
if dep.specifier:
constraints[dep.name].append(dep.specifier)
try:
pinned = dep.pin()
except requests.exceptions.HTTPError:
continue
deps.append(pinned)
return deps, constraints
[docs] def get_latest_lockfile(self):
# type: () -> Dict[str, Dict[str, Union[List[str], str]]]
lockfile = {}
constraints = {dep.name: dep.specifier for dep in self.dependencies}
deps, _ = self.pin_dependencies()
for dep in deps:
dep = dep.get_dependencies()
for sub_dep in dep.dependencies:
if sub_dep.name not in constraints:
logger.info(
"Adding {0} (from {1}) {2!s}".format(
sub_dep.name, dep.name, sub_dep.specifier
)
)
constraints[sub_dep.name] = sub_dep.specifier
else:
existing = "{0} (from {1}): {2!s} + ".format(
sub_dep.name, dep.name, constraints[sub_dep.name]
)
new_specifier = sub_dep.specifier
merged = constraints[sub_dep.name] & new_specifier
logger.info(
"Updating: {0}{1!s} = {2!s}".format(
existing, new_specifier, merged
)
)
constraints[sub_dep.name] = merged
lockfile.update({dep.info.name: dep.releases.get_latest_lockfile()})
for sub_dep_name, specset in constraints.items():
try:
sub_dep_pkg = get_package(sub_dep_name)
except requests.exceptions.HTTPError:
continue
logger.info("Getting package: {0} ({1!s})".format(sub_dep, specset))
sorted_releases = list(
sorted(
sub_dep_pkg.releases,
key=operator.attrgetter("parsed_version"),
reverse=True,
)
)
try:
version = next(iter(specset.filter((r.version for r in sorted_releases))))
except StopIteration:
logger.info("No version of {0} matches specifier: {1}".format(sub_dep, specset))
logger.info(
"Available versions: {0}".format(
" ".join([r.version for r in sorted_releases])
)
)
raise
sub_dep_instance = get_package_version(sub_dep_name, version=str(version))
if sub_dep_instance is None:
continue
lockfile.update(
{
sub_dep_instance.info.name: sub_dep_instance.releases.get_latest_lockfile()
}
)
# lockfile.update(dep.get_latest_lockfile())
lockfile.update({self.info.name: self.releases.get_latest_lockfile()})
return lockfile
[docs] def as_dict(self):
# type: () -> Dict[str, Any]
return json.loads(self.serialize())
[docs] def serialize(self):
# type: () -> str
return json.dumps(attr.asdict(self), cls=PackageEncoder, indent=4)
[docs]def get_package(name):
# type: (str) -> Package
url = "https://pypi.org/pypi/{}/json".format(name)
with requests.get(url) as r:
r.raise_for_status()
result = r.json()
package = Package.from_json(result)
return package
[docs]def get_package_version(name, version):
# type: (str, str) -> Package
url = "https://pypi.org/pypi/{0}/{1}/json".format(name, version)
with requests.get(url) as r:
r.raise_for_status()
result = r.json()
package = Package.from_json(result)
return package
[docs]def get_package_from_requirement(req):
# type: (PackagingRequirement) -> Tuple[Package, Set[str]]
versions = set()
if is_pinned_requirement(req):
version = get_pinned_version(req)
versions.add(version)
pkg = get_package_version(req.name, version)
else:
pkg = get_package(req.name)
sorted_releases = list(
sorted(pkg.releases, key=operator.attrgetter("parsed_version"), reverse=True)
)
versions = set(req.specifier.filter((r.version for r in sorted_releases)))
version = next(iter(req.specifier.filter((r.version for r in sorted_releases))))
if pkg.version not in versions:
pkg = get_package_version(pkg.name, version)
return pkg, versions