Source code for requirementslib.models.dependencies

# -*- coding=utf-8 -*-

import atexit
import contextlib
import copy
import functools
import os

import attr
import packaging.markers
import packaging.version
import pip_shims.shims
import requests
from packaging.utils import canonicalize_name
from vistir.compat import JSONDecodeError, fs_str
from vistir.contextmanagers import cd, temp_environ
from vistir.path import create_tracked_tempdir

from ..environment import MYPY_RUNNING
from ..utils import _ensure_dir, prepare_pip_source_args
from .cache import CACHE_DIR, DependencyCache
from .setup_info import SetupInfo
from .utils import (
    clean_requires_python,
    fix_requires_python_marker,
    format_requirement,
    full_groupby,
    is_pinned_requirement,
    key_from_ireq,
    make_install_requirement,
    name_from_req,
    version_from_ireq,
)

try:
    from contextlib import ExitStack
except ImportError:
    from contextlib2 import ExitStack

if MYPY_RUNNING:
    from typing import (
        Any,
        Dict,
        List,
        Generator,
        Optional,
        Union,
        Tuple,
        TypeVar,
        Text,
        Set,
    )
    from pip_shims.shims import (
        InstallRequirement,
        InstallationCandidate,
        PackageFinder,
        Command,
    )
    from packaging.requirements import Requirement as PackagingRequirement
    from packaging.markers import Marker

    TRequirement = TypeVar("TRequirement")
    RequirementType = TypeVar(
        "RequirementType", covariant=True, bound=PackagingRequirement
    )
    MarkerType = TypeVar("MarkerType", covariant=True, bound=Marker)
    STRING_TYPE = Union[str, bytes, Text]
    S = TypeVar("S", bytes, str, Text)


PKGS_DOWNLOAD_DIR = fs_str(os.path.join(CACHE_DIR, "pkgs"))
WHEEL_DOWNLOAD_DIR = fs_str(os.path.join(CACHE_DIR, "wheels"))

DEPENDENCY_CACHE = DependencyCache()


@contextlib.contextmanager
def _get_wheel_cache():
    with pip_shims.shims.global_tempdir_manager():
        yield pip_shims.shims.WheelCache(
            CACHE_DIR, pip_shims.shims.FormatControl(set(), set())
        )


def _get_filtered_versions(ireq, versions, prereleases):
    return set(ireq.specifier.filter(versions, prereleases=prereleases))


[docs]def find_all_matches(finder, ireq, pre=False): # type: (PackageFinder, InstallRequirement, bool) -> List[InstallationCandidate] """Find all matching dependencies using the supplied finder and the given ireq. :param finder: A package finder for discovering matching candidates. :type finder: :class:`~pip._internal.index.PackageFinder` :param ireq: An install requirement. :type ireq: :class:`~pip._internal.req.req_install.InstallRequirement` :return: A list of matching candidates. :rtype: list[:class:`~pip._internal.index.InstallationCandidate`] """ candidates = clean_requires_python(finder.find_all_candidates(ireq.name)) versions = {candidate.version for candidate in candidates} allowed_versions = _get_filtered_versions(ireq, versions, pre) if not pre and not allowed_versions: allowed_versions = _get_filtered_versions(ireq, versions, True) candidates = {c for c in candidates if c.version in allowed_versions} return candidates
[docs]def get_pip_command(): # type: () -> Command # Use pip's parser for pip.conf management and defaults. # General options (find_links, index_url, extra_index_url, trusted_host, # and pre) are defered to pip. pip_command = pip_shims.shims.InstallCommand() return pip_command
[docs]@attr.s class AbstractDependency(object): name = attr.ib() # type: STRING_TYPE specifiers = attr.ib() markers = attr.ib() candidates = attr.ib() requirement = attr.ib() parent = attr.ib() finder = attr.ib() dep_dict = attr.ib(default=attr.Factory(dict)) @property def version_set(self): """Return the set of versions for the candidates in this abstract dependency. :return: A set of matching versions :rtype: set(str) """ if len(self.candidates) == 1: return set() return set(packaging.version.parse(version_from_ireq(c)) for c in self.candidates)
[docs] def compatible_versions(self, other): """Find compatible version numbers between this abstract dependency and another one. :param other: An abstract dependency to compare with. :type other: :class:`~requirementslib.models.dependency.AbstractDependency` :return: A set of compatible version strings :rtype: set(str) """ if len(self.candidates) == 1 and next(iter(self.candidates)).editable: return self elif len(other.candidates) == 1 and next(iter(other.candidates)).editable: return other return self.version_set & other.version_set
[docs] def compatible_abstract_dep(self, other): """Merge this abstract dependency with another one. Return the result of the merge as a new abstract dependency. :param other: An abstract dependency to merge with :type other: :class:`~requirementslib.models.dependency.AbstractDependency` :return: A new, combined abstract dependency :rtype: :class:`~requirementslib.models.dependency.AbstractDependency` """ from .requirements import Requirement if len(self.candidates) == 1 and next(iter(self.candidates)).editable: return self elif len(other.candidates) == 1 and next(iter(other.candidates)).editable: return other new_specifiers = self.specifiers & other.specifiers markers = set(self.markers) if self.markers else set() if other.markers: markers.add(other.markers) new_markers = None if markers: new_markers = packaging.markers.Marker( " or ".join(str(m) for m in sorted(markers)) ) new_ireq = copy.deepcopy(self.requirement.ireq) new_ireq.req.specifier = new_specifiers new_ireq.req.marker = new_markers new_requirement = Requirement.from_line(format_requirement(new_ireq)) compatible_versions = self.compatible_versions(other) if isinstance(compatible_versions, AbstractDependency): return compatible_versions candidates = [ c for c in self.candidates if packaging.version.parse(version_from_ireq(c)) in compatible_versions ] dep_dict = {} candidate_strings = [format_requirement(c) for c in candidates] for c in candidate_strings: if c in self.dep_dict: dep_dict[c] = self.dep_dict.get(c) return AbstractDependency( name=self.name, specifiers=new_specifiers, markers=new_markers, candidates=candidates, requirement=new_requirement, parent=self.parent, dep_dict=dep_dict, finder=self.finder, )
[docs] def get_deps(self, candidate): """Get the dependencies of the supplied candidate. :param candidate: An installrequirement :type candidate: :class:`~pip._internal.req.req_install.InstallRequirement` :return: A list of abstract dependencies :rtype: list[:class:`~requirementslib.models.dependency.AbstractDependency`] """ key = format_requirement(candidate) if key not in self.dep_dict: from .requirements import Requirement req = Requirement.from_line(key) req = req.merge_markers(self.markers) self.dep_dict[key] = req.get_abstract_dependencies() return self.dep_dict[key]
[docs] @classmethod def from_requirement(cls, requirement, parent=None): """Creates a new :class:`~requirementslib.models.dependency.AbstractDependency` from a :class:`~requirementslib.models.requirements.Requirement` object. This class is used to find all candidates matching a given set of specifiers and a given requirement. :param requirement: A requirement for resolution :type requirement: :class:`~requirementslib.models.requirements.Requirement` object. """ name = requirement.normalized_name specifiers = requirement.ireq.specifier if not requirement.editable else "" markers = requirement.ireq.markers extras = requirement.ireq.extras is_pinned = is_pinned_requirement(requirement.ireq) is_constraint = bool(parent) _, finder = get_finder(sources=None) candidates = [] if not is_pinned and not requirement.editable: for r in requirement.find_all_matches(finder=finder): req = make_install_requirement( name, r.version, extras=extras, markers=markers, constraint=is_constraint, ) req.req.link = getattr(r, "location", getattr(r, "link", None)) req.parent = parent candidates.append(req) candidates = sorted( set(candidates), key=lambda k: packaging.version.parse(version_from_ireq(k)), ) else: candidates = [requirement.ireq] return cls( name=name, specifiers=specifiers, markers=markers, candidates=candidates, requirement=requirement, parent=parent, finder=finder, )
[docs] @classmethod def from_string(cls, line, parent=None): from .requirements import Requirement req = Requirement.from_line(line) abstract_dep = cls.from_requirement(req, parent=parent) return abstract_dep
[docs]def get_abstract_dependencies(reqs, sources=None, parent=None): """Get all abstract dependencies for a given list of requirements. Given a set of requirements, convert each requirement to an Abstract Dependency. :param reqs: A list of Requirements :type reqs: list[:class:`~requirementslib.models.requirements.Requirement`] :param sources: Pipfile-formatted sources, defaults to None :param sources: list[dict], optional :param parent: The parent of this list of dependencies, defaults to None :param parent: :class:`~requirementslib.models.requirements.Requirement`, optional :return: A list of Abstract Dependencies :rtype: list[:class:`~requirementslib.models.dependency.AbstractDependency`] """ deps = [] from .requirements import Requirement for req in reqs: if isinstance(req, pip_shims.shims.InstallRequirement): requirement = Requirement.from_line("{0}{1}".format(req.name, req.specifier)) if req.link: requirement.req.link = req.link requirement.markers = req.markers requirement.req.markers = req.markers requirement.extras = req.extras requirement.req.extras = req.extras elif isinstance(req, Requirement): requirement = copy.deepcopy(req) else: requirement = Requirement.from_line(req) dep = AbstractDependency.from_requirement(requirement, parent=parent) deps.append(dep) return deps
[docs]def get_dependencies(ireq, sources=None, parent=None): # type: (Union[InstallRequirement, InstallationCandidate], Optional[List[Dict[S, Union[S, bool]]]], Optional[AbstractDependency]) -> Set[S, ...] """Get all dependencies for a given install requirement. :param ireq: A single InstallRequirement :type ireq: :class:`~pip._internal.req.req_install.InstallRequirement` :param sources: Pipfile-formatted sources, defaults to None :type sources: list[dict], optional :param parent: The parent of this list of dependencies, defaults to None :type parent: :class:`~pip._internal.req.req_install.InstallRequirement` :return: A set of dependency lines for generating new InstallRequirements. :rtype: set(str) """ if not isinstance(ireq, pip_shims.shims.InstallRequirement): name = getattr(ireq, "project_name", getattr(ireq, "project", ireq.name)) version = getattr(ireq, "version", None) if not version: ireq = pip_shims.shims.InstallRequirement.from_line("{0}".format(name)) else: ireq = pip_shims.shims.InstallRequirement.from_line( "{0}=={1}".format(name, version) ) pip_options = get_pip_options(sources=sources) getters = [ get_dependencies_from_cache, get_dependencies_from_wheel_cache, get_dependencies_from_json, functools.partial(get_dependencies_from_index, pip_options=pip_options), ] for getter in getters: deps = getter(ireq) if deps is not None: return deps raise RuntimeError("failed to get dependencies for {}".format(ireq))
[docs]def get_dependencies_from_wheel_cache(ireq): # type: (pip_shims.shims.InstallRequirement) -> Optional[Set[pip_shims.shims.InstallRequirement]] """Retrieves dependencies for the given install requirement from the wheel cache. :param ireq: A single InstallRequirement :type ireq: :class:`~pip._internal.req.req_install.InstallRequirement` :return: A set of dependency lines for generating new InstallRequirements. :rtype: set(str) or None """ if ireq.editable or not is_pinned_requirement(ireq): return with _get_wheel_cache() as wheel_cache: matches = wheel_cache.get(ireq.link, name_from_req(ireq.req)) if matches: matches = set(matches) if not DEPENDENCY_CACHE.get(ireq): DEPENDENCY_CACHE[ireq] = [format_requirement(m) for m in matches] return matches return None
def _marker_contains_extra(ireq): # TODO: Implement better parsing logic avoid false-positives. return "extra" in repr(ireq.markers)
[docs]def get_dependencies_from_json(ireq): """Retrieves dependencies for the given install requirement from the json api. :param ireq: A single InstallRequirement :type ireq: :class:`~pip._internal.req.req_install.InstallRequirement` :return: A set of dependency lines for generating new InstallRequirements. :rtype: set(str) or None """ if ireq.editable or not is_pinned_requirement(ireq): return # It is technically possible to parse extras out of the JSON API's # requirement format, but it is such a chore let's just use the simple API. if ireq.extras: return session = requests.session() atexit.register(session.close) version = str(ireq.req.specifier).lstrip("=") def gen(ireq): info = None try: info = session.get( "https://pypi.org/pypi/{0}/{1}/json".format(ireq.req.name, version) ).json()["info"] finally: session.close() requires_dist = info.get("requires_dist", info.get("requires")) if not requires_dist: # The API can return None for this. return for requires in requires_dist: i = pip_shims.shims.InstallRequirement.from_line(requires) # See above, we don't handle requirements with extras. if not _marker_contains_extra(i): yield format_requirement(i) if ireq not in DEPENDENCY_CACHE: try: reqs = DEPENDENCY_CACHE[ireq] = list(gen(ireq)) except JSONDecodeError: return req_iter = iter(reqs) else: req_iter = gen(ireq) return set(req_iter)
[docs]def get_dependencies_from_cache(ireq): """Retrieves dependencies for the given install requirement from the dependency cache. :param ireq: A single InstallRequirement :type ireq: :class:`~pip._internal.req.req_install.InstallRequirement` :return: A set of dependency lines for generating new InstallRequirements. :rtype: set(str) or None """ if ireq.editable or not is_pinned_requirement(ireq): return if ireq not in DEPENDENCY_CACHE: return cached = set(DEPENDENCY_CACHE[ireq]) # Preserving sanity: Run through the cache and make sure every entry if # valid. If this fails, something is wrong with the cache. Drop it. try: broken = False for line in cached: dep_ireq = pip_shims.shims.InstallRequirement.from_line(line) name = canonicalize_name(dep_ireq.name) if _marker_contains_extra(dep_ireq): broken = True # The "extra =" marker breaks everything. elif name == canonicalize_name(ireq.name): broken = True # A package cannot depend on itself. if broken: break except Exception: broken = True if broken: del DEPENDENCY_CACHE[ireq] return return cached
[docs]def is_python(section): return section.startswith("[") and ":" in section
[docs]def get_dependencies_from_index(dep, sources=None, pip_options=None, wheel_cache=None): """Retrieves dependencies for the given install requirement from the pip resolver. :param dep: A single InstallRequirement :type dep: :class:`~pip._internal.req.req_install.InstallRequirement` :param sources: Pipfile-formatted sources, defaults to None :type sources: list[dict], optional :return: A set of dependency lines for generating new InstallRequirements. :rtype: set(str) or None """ session, finder = get_finder(sources=sources, pip_options=pip_options) dep.is_direct = True requirements = None setup_requires = {} with temp_environ(), ExitStack() as stack: if not wheel_cache: wheel_cache = stack.enter_context(_get_wheel_cache()) os.environ["PIP_EXISTS_ACTION"] = "i" if dep.editable and not dep.prepared and not dep.req: setup_info = SetupInfo.from_ireq(dep) results = setup_info.get_info() setup_requires.update(results["setup_requires"]) requirements = set(results["requires"].values()) else: results = pip_shims.shims.resolve(dep) requirements = [v for v in results.values() if v.name != dep.name] requirements = set([format_requirement(r) for r in requirements]) if not dep.editable and is_pinned_requirement(dep) and requirements is not None: DEPENDENCY_CACHE[dep] = list(requirements) return requirements
[docs]def get_pip_options(args=[], sources=None, pip_command=None): """Build a pip command from a list of sources :param args: positional arguments passed through to the pip parser :param sources: A list of pipfile-formatted sources, defaults to None :param sources: list[dict], optional :param pip_command: A pre-built pip command instance :type pip_command: :class:`~pip._internal.cli.base_command.Command` :return: An instance of pip_options using the supplied arguments plus sane defaults :rtype: :class:`~pip._internal.cli.cmdoptions` """ if not pip_command: pip_command = get_pip_command() if not sources: sources = [{"url": "https://pypi.org/simple", "name": "pypi", "verify_ssl": True}] _ensure_dir(CACHE_DIR) pip_args = args pip_args = prepare_pip_source_args(sources, pip_args) pip_options, _ = pip_command.parser.parse_args(pip_args) pip_options.cache_dir = CACHE_DIR return pip_options
[docs]def get_finder(sources=None, pip_command=None, pip_options=None): # type: (List[Dict[S, Union[S, bool]]], Optional[Command], Any) -> PackageFinder """Get a package finder for looking up candidates to install :param sources: A list of pipfile-formatted sources, defaults to None :param sources: list[dict], optional :param pip_command: A pip command instance, defaults to None :type pip_command: :class:`~pip._internal.cli.base_command.Command` :param pip_options: A pip options, defaults to None :type pip_options: :class:`~pip._internal.cli.cmdoptions` :return: A package finder :rtype: :class:`~pip._internal.index.PackageFinder` """ if not pip_command: pip_command = pip_shims.shims.InstallCommand() if not sources: sources = [{"url": "https://pypi.org/simple", "name": "pypi", "verify_ssl": True}] if not pip_options: pip_options = get_pip_options(sources=sources, pip_command=pip_command) session = pip_command._build_session(pip_options) atexit.register(session.close) finder = pip_shims.shims.get_package_finder( pip_shims.shims.InstallCommand(), options=pip_options, session=session ) return session, finder
[docs]@contextlib.contextmanager def start_resolver(finder=None, session=None, wheel_cache=None): """Context manager to produce a resolver. :param finder: A package finder to use for searching the index :type finder: :class:`~pip._internal.index.PackageFinder` :param :class:`~requests.Session` session: A session instance :param :class:`~pip._internal.cache.WheelCache` wheel_cache: A pip WheelCache instance :return: A 3-tuple of finder, preparer, resolver :rtype: (:class:`~pip._internal.operations.prepare.RequirementPreparer`, :class:`~pip._internal.resolve.Resolver`) """ pip_command = get_pip_command() pip_options = get_pip_options(pip_command=pip_command) session = None if not finder: session, finder = get_finder(pip_command=pip_command, pip_options=pip_options) if not session: session = pip_command._build_session(pip_options) download_dir = PKGS_DOWNLOAD_DIR _ensure_dir(download_dir) _build_dir = create_tracked_tempdir(fs_str("build")) _source_dir = create_tracked_tempdir(fs_str("source")) try: with ExitStack() as ctx: ctx.enter_context(pip_shims.shims.global_tempdir_manager()) if not wheel_cache: wheel_cache = ctx.enter_context(_get_wheel_cache()) _ensure_dir(fs_str(os.path.join(wheel_cache.cache_dir, "wheels"))) preparer = ctx.enter_context( pip_shims.shims.make_preparer( options=pip_options, finder=finder, session=session, build_dir=_build_dir, src_dir=_source_dir, download_dir=download_dir, wheel_download_dir=WHEEL_DOWNLOAD_DIR, progress_bar="off", build_isolation=False, install_cmd=pip_command, ) ) resolver = pip_shims.shims.get_resolver( finder=finder, ignore_dependencies=False, ignore_requires_python=True, preparer=preparer, session=session, options=pip_options, install_cmd=pip_command, wheel_cache=wheel_cache, force_reinstall=True, ignore_installed=True, upgrade_strategy="to-satisfy-only", isolated=False, use_user_site=False, ) yield resolver finally: session.close()
[docs]def get_grouped_dependencies(constraints): # We need to track what contributed a specifierset # as well as which specifiers were required by the root node # in order to resolve any conflicts when we are deciding which thing to backtrack on # then we take the loose match (which _is_ flexible) and start moving backwards in # versions by popping them off of a stack and checking for the conflicting package for _, ireqs in full_groupby(constraints, key=key_from_ireq): ireqs = sorted(ireqs, key=lambda ireq: ireq.editable) editable_ireq = next(iter(ireq for ireq in ireqs if ireq.editable), None) if editable_ireq: yield editable_ireq # only the editable match mattters, ignore all others continue ireqs = iter(ireqs) # deepcopy the accumulator so as to not modify the self.our_constraints invariant combined_ireq = copy.deepcopy(next(ireqs)) for ireq in ireqs: # NOTE we may be losing some info on dropped reqs here try: combined_ireq.req.specifier &= ireq.req.specifier except TypeError: if ireq.req.specifier._specs and not combined_ireq.req.specifier._specs: combined_ireq.req.specifier._specs = ireq.req.specifier._specs combined_ireq.constraint &= ireq.constraint if not combined_ireq.markers: combined_ireq.markers = ireq.markers else: _markers = combined_ireq.markers._markers if not isinstance(_markers[0], (tuple, list)): combined_ireq.markers._markers = [ _markers, "and", ireq.markers._markers, ] # Return a sorted, de-duped tuple of extras combined_ireq.extras = tuple( sorted(set(tuple(combined_ireq.extras) + tuple(ireq.extras))) ) yield combined_ireq