Source code for requirementslib.models.resolvers

# -*- coding=utf-8 -*-
from contextlib import contextmanager

import attr
import six
from pip_shims.shims import Wheel

from .cache import HashCache
from .utils import format_requirement, is_pinned_requirement, version_from_ireq


[docs]class ResolutionError(Exception): pass
[docs]@attr.s class DependencyResolver(object): pinned_deps = attr.ib(default=attr.Factory(dict)) #: A dictionary of abstract dependencies by name dep_dict = attr.ib(default=attr.Factory(dict)) #: A dictionary of sets of version numbers that are valid for a candidate currently candidate_dict = attr.ib(default=attr.Factory(dict)) #: A historical record of pins pin_history = attr.ib(default=attr.Factory(dict)) #: Whether to allow prerelease dependencies allow_prereleases = attr.ib(default=False) #: Stores hashes for each dependency hashes = attr.ib(default=attr.Factory(dict)) #: A hash cache hash_cache = attr.ib(default=attr.Factory(HashCache)) #: A finder for searching the index finder = attr.ib(default=None) #: Whether to include hashes even from incompatible wheels include_incompatible_hashes = attr.ib(default=True) #: A cache for storing available canddiates when using all wheels _available_candidates_cache = attr.ib(default=attr.Factory(dict))
[docs] @classmethod def create(cls, finder=None, allow_prereleases=False, get_all_hashes=True): if not finder: from .dependencies import get_finder finder_args = [] if allow_prereleases: finder_args.append("--pre") finder = get_finder(*finder_args) creation_kwargs = { "allow_prereleases": allow_prereleases, "include_incompatible_hashes": get_all_hashes, "finder": finder, "hash_cache": HashCache(), } resolver = cls(**creation_kwargs) return resolver
@property def dependencies(self): return list(self.dep_dict.values()) @property def resolution(self): return list(self.pinned_deps.values())
[docs] def add_abstract_dep(self, dep): """Add an abstract dependency by either creating a new entry or merging with an old one. :param dep: An abstract dependency to add :type dep: :class:`~requirementslib.models.dependency.AbstractDependency` :raises ResolutionError: Raised when the given dependency is not compatible with an existing abstract dependency. """ if dep.name in self.dep_dict: compatible_versions = self.dep_dict[dep.name].compatible_versions(dep) if compatible_versions: self.candidate_dict[dep.name] = compatible_versions self.dep_dict[dep.name] = self.dep_dict[dep.name].compatible_abstract_dep( dep ) else: raise ResolutionError else: self.candidate_dict[dep.name] = dep.version_set self.dep_dict[dep.name] = dep
[docs] def pin_deps(self): """Pins the current abstract dependencies and adds them to the history dict. Adds any new dependencies to the abstract dependencies already present by merging them together to form new, compatible abstract dependencies. """ for name in list(self.dep_dict.keys()): candidates = self.dep_dict[name].candidates[:] abs_dep = self.dep_dict[name] while candidates: pin = candidates.pop() # Move on from existing pins if the new pin isn't compatible if name in self.pinned_deps: if self.pinned_deps[name].editable: continue old_version = version_from_ireq(self.pinned_deps[name]) if not pin.editable: new_version = version_from_ireq(pin) if ( new_version != old_version and new_version not in self.candidate_dict[name] ): continue pin.parent = abs_dep.parent pin_subdeps = self.dep_dict[name].get_deps(pin) backup = self.dep_dict.copy(), self.candidate_dict.copy() try: for pin_dep in pin_subdeps: self.add_abstract_dep(pin_dep) except ResolutionError: self.dep_dict, self.candidate_dict = backup continue else: self.pinned_deps[name] = pin break
[docs] def resolve(self, root_nodes, max_rounds=20): """Resolves dependencies using a backtracking resolver and multiple endpoints. Note: this resolver caches aggressively. Runs for *max_rounds* or until any two pinning rounds yield the same outcome. :param root_nodes: A list of the root requirements. :type root_nodes: list[:class:`~requirementslib.models.requirements.Requirement`] :param max_rounds: The max number of resolution rounds, defaults to 20 :param max_rounds: int, optional :raises RuntimeError: Raised when max rounds is exceeded without a resolution. """ if self.dep_dict: raise RuntimeError("Do not use the same resolver more than once") if not self.hash_cache: self.hash_cache = HashCache() # Coerce input into AbstractDependency instances. # We accept str, Requirement, and AbstractDependency as input. from .dependencies import AbstractDependency from ..utils import log for dep in root_nodes: if isinstance(dep, six.string_types): dep = AbstractDependency.from_string(dep) elif not isinstance(dep, AbstractDependency): dep = AbstractDependency.from_requirement(dep) self.add_abstract_dep(dep) for round_ in range(max_rounds): self.pin_deps() self.pin_history[round_] = self.pinned_deps.copy() if round_ > 0: previous_round = set(self.pin_history[round_ - 1].values()) current_values = set(self.pin_history[round_].values()) difference = current_values - previous_round else: difference = set(self.pin_history[round_].values()) log.debug("\n") log.debug("{:=^30}".format(" Round {0} ".format(round_))) log.debug("\n") if difference: log.debug("New Packages: ") for d in difference: log.debug("{:>30}".format(format_requirement(d))) elif round_ >= 3: log.debug("Stable Pins: ") for d in current_values: log.debug("{:>30}".format(format_requirement(d))) return else: log.debug("No New Packages.") # TODO: Raise a better error. raise RuntimeError("cannot resolve after {} rounds".format(max_rounds))
[docs] def get_hashes(self): for dep in self.pinned_deps.values(): if dep.name not in self.hashes: self.hashes[dep.name] = self.get_hashes_for_one(dep) return self.hashes.copy()
[docs] def get_hashes_for_one(self, ireq): if not self.finder: from .dependencies import get_finder finder_args = [] if self.allow_prereleases: finder_args.append("--pre") self.finder = get_finder(*finder_args) if ireq.editable: return set() from pip_shims import VcsSupport vcs = VcsSupport() if ( ireq.link and ireq.link.scheme in vcs.all_schemes and "ssh" in ireq.link.scheme ): return set() if not is_pinned_requirement(ireq): raise TypeError("Expected pinned requirement, got {}".format(ireq)) matching_candidates = set() with self.allow_all_wheels(): from .dependencies import find_all_matches matching_candidates = find_all_matches( self.finder, ireq, pre=self.allow_prereleases ) return { self.hash_cache.get_hash( getattr(candidate, "location", getattr(candidate, "link", None)) ) for candidate in matching_candidates }
[docs] @contextmanager def allow_all_wheels(self): """ Monkey patches pip.Wheel to allow wheels from all platforms and Python versions. This also saves the candidate cache and set a new one, or else the results from the previous non-patched calls will interfere. """ def _wheel_supported(self, tags=None): # Ignore current platform. Support everything. return True def _wheel_support_index_min(self, tags=None): # All wheels are equal priority for sorting. return 0 original_wheel_supported = Wheel.supported original_support_index_min = Wheel.support_index_min Wheel.supported = _wheel_supported Wheel.support_index_min = _wheel_support_index_min try: yield finally: Wheel.supported = original_wheel_supported Wheel.support_index_min = original_support_index_min