Source code for requirementslib.utils

# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function

import logging
import os
import sys

import pip_shims.shims
import six
import six.moves
import tomlkit
import vistir
from six.moves.urllib.parse import urlparse, urlsplit, urlunparse
from vistir.compat import Path, fs_decode
from vistir.path import ensure_mkdir_p, is_valid_url

from .environment import MYPY_RUNNING

# fmt: off
six.add_move(  # type: ignore
    six.MovedAttribute("Mapping", "collections", "collections.abc")  # type: ignore
)  # noqa  # isort:skip
six.add_move(  # type: ignore
    six.MovedAttribute("Sequence", "collections", "collections.abc")  # type: ignore
)  # noqa  # isort:skip
six.add_move(  # type: ignore
    six.MovedAttribute("Set", "collections", "collections.abc")  # type: ignore
)  # noqa  # isort:skip
six.add_move(  # type: ignore
    six.MovedAttribute("ItemsView", "collections", "collections.abc")  # type: ignore
)  # noqa
from six.moves import ItemsView, Mapping, Sequence, Set  # type: ignore  # noqa  # isort:skip
# fmt: on


if MYPY_RUNNING:
    from typing import Dict, Any, Optional, Union, Tuple, List, Iterable, Text, TypeVar

    STRING_TYPE = Union[bytes, str, Text]
    S = TypeVar("S", bytes, str, Text)
    PipfileEntryType = Union[STRING_TYPE, bool, Tuple[STRING_TYPE], List[STRING_TYPE]]
    PipfileType = Union[STRING_TYPE, Dict[STRING_TYPE, PipfileEntryType]]


VCS_LIST = ("git", "svn", "hg", "bzr")


[docs]def setup_logger(): logger = logging.getLogger("requirementslib") loglevel = logging.DEBUG handler = logging.StreamHandler(stream=sys.stderr) handler.setLevel(loglevel) logger.addHandler(handler) logger.setLevel(loglevel) return logger
log = setup_logger() SCHEME_LIST = ("http://", "https://", "ftp://", "ftps://", "file://") VCS_SCHEMES = [ "git", "git+http", "git+https", "git+ssh", "git+git", "git+file", "hg", "hg+http", "hg+https", "hg+ssh", "hg+static-http", "svn", "svn+ssh", "svn+http", "svn+https", "svn+svn", "bzr", "bzr+http", "bzr+https", "bzr+ssh", "bzr+sftp", "bzr+ftp", "bzr+lp", ]
[docs]def is_installable_dir(path): # type: (STRING_TYPE) -> bool if pip_shims.shims.is_installable_dir(path): return True pyproject_path = os.path.join(path, "pyproject.toml") if os.path.exists(pyproject_path): pyproject = Path(pyproject_path) pyproject_toml = tomlkit.loads(pyproject.read_text()) build_system = pyproject_toml.get("build-system", {}).get("build-backend", "") if build_system: return True return False
[docs]def strip_ssh_from_git_uri(uri): # type: (S) -> S """Return git+ssh:// formatted URI to git+git@ format""" if isinstance(uri, six.string_types): if "git+ssh://" in uri: parsed = urlparse(uri) # split the path on the first separating / so we can put the first segment # into the 'netloc' section with a : separator path_part, _, path = parsed.path.lstrip("/").partition("/") path = "/{0}".format(path) parsed = parsed._replace( netloc="{0}:{1}".format(parsed.netloc, path_part), path=path ) uri = urlunparse(parsed).replace("git+ssh://", "git+", 1) return uri
[docs]def add_ssh_scheme_to_git_uri(uri): # type: (S) -> S """Cleans VCS uris from pip format""" if isinstance(uri, six.string_types): # Add scheme for parsing purposes, this is also what pip does if uri.startswith("git+") and "://" not in uri: uri = uri.replace("git+", "git+ssh://", 1) parsed = urlparse(uri) if ":" in parsed.netloc: netloc, _, path_start = parsed.netloc.rpartition(":") path = "/{0}{1}".format(path_start, parsed.path) uri = urlunparse(parsed._replace(netloc=netloc, path=path)) return uri
[docs]def is_vcs(pipfile_entry): # type: (PipfileType) -> bool """Determine if dictionary entry from Pipfile is for a vcs dependency.""" if isinstance(pipfile_entry, Mapping): return any(key for key in pipfile_entry.keys() if key in VCS_LIST) elif isinstance(pipfile_entry, six.string_types): if not is_valid_url(pipfile_entry) and pipfile_entry.startswith("git+"): pipfile_entry = add_ssh_scheme_to_git_uri(pipfile_entry) parsed_entry = urlsplit(pipfile_entry) return parsed_entry.scheme in VCS_SCHEMES return False
[docs]def is_editable(pipfile_entry): # type: (PipfileType) -> bool if isinstance(pipfile_entry, Mapping): return pipfile_entry.get("editable", False) is True if isinstance(pipfile_entry, six.string_types): return pipfile_entry.startswith("-e ") return False
[docs]def is_star(val): # type: (PipfileType) -> bool return (isinstance(val, six.string_types) and val == "*") or ( isinstance(val, Mapping) and val.get("version", "") == "*" )
[docs]def convert_entry_to_path(path): # type: (Dict[S, Union[S, bool, Tuple[S], List[S]]]) -> S """Convert a pipfile entry to a string""" if not isinstance(path, Mapping): raise TypeError("expecting a mapping, received {0!r}".format(path)) if not any(key in path for key in ["file", "path"]): raise ValueError("missing path-like entry in supplied mapping {0!r}".format(path)) if "file" in path: path = vistir.path.url_to_path(path["file"]) elif "path" in path: path = path["path"] if not os.name == "nt": return fs_decode(path) return Path(fs_decode(path)).as_posix()
[docs]def is_installable_file(path): # type: (PipfileType) -> bool """Determine if a path can potentially be installed""" from packaging import specifiers if isinstance(path, Mapping): path = convert_entry_to_path(path) # If the string starts with a valid specifier operator, test if it is a valid # specifier set before making a path object (to avoid breaking windows) if any(path.startswith(spec) for spec in "!=<>~"): try: specifiers.SpecifierSet(path) # If this is not a valid specifier, just move on and try it as a path except specifiers.InvalidSpecifier: pass else: return False parsed = urlparse(path) is_local = ( not parsed.scheme or parsed.scheme == "file" or (len(parsed.scheme) == 1 and os.name == "nt") ) if parsed.scheme and parsed.scheme == "file": path = vistir.compat.fs_decode(vistir.path.url_to_path(path)) normalized_path = vistir.path.normalize_path(path) if is_local and not os.path.exists(normalized_path): return False is_archive = pip_shims.shims.is_archive_file(normalized_path) is_local_project = os.path.isdir(normalized_path) and is_installable_dir( normalized_path ) if is_local and is_local_project or is_archive: return True if not is_local and pip_shims.shims.is_archive_file(parsed.path): return True return False
[docs]def get_dist_metadata(dist): import pkg_resources from email.parser import FeedParser if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata( "METADATA" ): metadata = dist.get_metadata("METADATA") elif dist.has_metadata("PKG-INFO"): metadata = dist.get_metadata("PKG-INFO") else: metadata = "" feed_parser = FeedParser() feed_parser.feed(metadata) return feed_parser.close()
[docs]def get_setup_paths(base_path, subdirectory=None): # type: (S, Optional[S]) -> Dict[S, Optional[S]] if base_path is None: raise TypeError("must provide a path to derive setup paths from") setup_py = os.path.join(base_path, "setup.py") setup_cfg = os.path.join(base_path, "setup.cfg") pyproject_toml = os.path.join(base_path, "pyproject.toml") if subdirectory is not None: base_path = os.path.join(base_path, subdirectory) subdir_setup_py = os.path.join(subdirectory, "setup.py") subdir_setup_cfg = os.path.join(subdirectory, "setup.cfg") subdir_pyproject_toml = os.path.join(subdirectory, "pyproject.toml") if subdirectory and os.path.exists(subdir_setup_py): setup_py = subdir_setup_py if subdirectory and os.path.exists(subdir_setup_cfg): setup_cfg = subdir_setup_cfg if subdirectory and os.path.exists(subdir_pyproject_toml): pyproject_toml = subdir_pyproject_toml return { "setup_py": setup_py if os.path.exists(setup_py) else None, "setup_cfg": setup_cfg if os.path.exists(setup_cfg) else None, "pyproject_toml": pyproject_toml if os.path.exists(pyproject_toml) else None, }
[docs]def prepare_pip_source_args(sources, pip_args=None): # type: (List[Dict[S, Union[S, bool]]], Optional[List[S]]) -> List[S] if pip_args is None: pip_args = [] if sources: # Add the source to pip9. pip_args.extend(["-i", sources[0]["url"]]) # type: ignore # Trust the host if it's not verified. if not sources[0].get("verify_ssl", True): pip_args.extend( ["--trusted-host", urlparse(sources[0]["url"]).hostname] ) # type: ignore # Add additional sources as extra indexes. if len(sources) > 1: for source in sources[1:]: pip_args.extend(["--extra-index-url", source["url"]]) # type: ignore # Trust the host if it's not verified. if not source.get("verify_ssl", True): pip_args.extend( ["--trusted-host", urlparse(source["url"]).hostname] ) # type: ignore return pip_args
@ensure_mkdir_p(mode=0o777) def _ensure_dir(path): return path _UNSET = object() _REMAP_EXIT = object() # The following functionality is either borrowed or modified from the itertools module # in the boltons library by Mahmoud Hashemi and distributed under the BSD license # the text of which is included below: # (original text from https://github.com/mahmoud/boltons/blob/master/LICENSE) # Copyright (c) 2013, Mahmoud Hashemi # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following # disclaimer in the documentation and/or other materials provided # with the distribution. # # * The names of the contributors may not be used to endorse or # promote products derived from this software without specific # prior written permission. # # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[docs]class PathAccessError(KeyError, IndexError, TypeError): """An amalgamation of KeyError, IndexError, and TypeError, representing what can occur when looking up a path in a nested object. """ def __init__(self, exc, seg, path): self.exc = exc self.seg = seg self.path = path def __repr__(self): cn = self.__class__.__name__ return "%s(%r, %r, %r)" % (cn, self.exc, self.seg, self.path) def __str__(self): return "could not access %r from path %r, got error: %r" % ( self.seg, self.path, self.exc, )
[docs]def get_path(root, path, default=_UNSET): """Retrieve a value from a nested object via a tuple representing the lookup path. >>> root = {'a': {'b': {'c': [[1], [2], [3]]}}} >>> get_path(root, ('a', 'b', 'c', 2, 0)) 3 The path format is intentionally consistent with that of :func:`remap`. One of get_path's chief aims is improved error messaging. EAFP is great, but the error messages are not. For instance, ``root['a']['b']['c'][2][1]`` gives back ``IndexError: list index out of range`` What went out of range where? get_path currently raises ``PathAccessError: could not access 2 from path ('a', 'b', 'c', 2, 1), got error: IndexError('list index out of range',)``, a subclass of IndexError and KeyError. You can also pass a default that covers the entire operation, should the lookup fail at any level. Args: root: The target nesting of dictionaries, lists, or other objects supporting ``__getitem__``. path (tuple): A list of strings and integers to be successively looked up within *root*. default: The value to be returned should any ``PathAccessError`` exceptions be raised. """ if isinstance(path, six.string_types): path = path.split(".") cur = root try: for seg in path: try: cur = cur[seg] except (KeyError, IndexError) as exc: raise PathAccessError(exc, seg, path) except TypeError as exc: # either string index in a list, or a parent that # doesn't support indexing try: seg = int(seg) cur = cur[seg] except (ValueError, KeyError, IndexError, TypeError): if not getattr(cur, "__iter__", None): exc = TypeError("%r object is not indexable" % type(cur).__name__) raise PathAccessError(exc, seg, path) except PathAccessError: if default is _UNSET: raise return default return cur
[docs]def default_visit(path, key, value): return key, value
_orig_default_visit = default_visit # Modified from https://github.com/mahmoud/boltons/blob/master/boltons/iterutils.py
[docs]def dict_path_enter(path, key, value): if isinstance(value, six.string_types): return value, False elif isinstance(value, (Mapping, dict)): return value.__class__(), ItemsView(value) elif isinstance(value, tomlkit.items.Array): return value.__class__([], value.trivia), enumerate(value) elif isinstance(value, (Sequence, list)): return value.__class__(), enumerate(value) elif isinstance(value, (Set, set)): return value.__class__(), enumerate(value) else: return value, False
[docs]def dict_path_exit(path, key, old_parent, new_parent, new_items): ret = new_parent if isinstance(new_parent, (Mapping, dict)): vals = dict(new_items) try: new_parent.update(new_items) except AttributeError: # Handle toml containers specifically try: new_parent.update(vals) # Now use default fallback if needed except AttributeError: ret = new_parent.__class__(vals) elif isinstance(new_parent, tomlkit.items.Array): vals = tomlkit.items.item([v for i, v in new_items]) try: new_parent._value.extend(vals._value) except AttributeError: ret = tomlkit.items.item(vals) elif isinstance(new_parent, (Sequence, list)): vals = [v for i, v in new_items] try: new_parent.extend(vals) except AttributeError: ret = new_parent.__class__(vals) # tuples elif isinstance(new_parent, (Set, set)): vals = [v for i, v in new_items] try: new_parent.update(vals) except AttributeError: ret = new_parent.__class__(vals) # frozensets else: raise RuntimeError("unexpected iterable type: %r" % type(new_parent)) return ret
[docs]def remap( root, visit=default_visit, enter=dict_path_enter, exit=dict_path_exit, **kwargs ): """The remap ("recursive map") function is used to traverse and transform nested structures. Lists, tuples, sets, and dictionaries are just a few of the data structures nested into heterogenous tree-like structures that are so common in programming. Unfortunately, Python's built-in ways to manipulate collections are almost all flat. List comprehensions may be fast and succinct, but they do not recurse, making it tedious to apply quick changes or complex transforms to real-world data. remap goes where list comprehensions cannot. Here's an example of removing all Nones from some data: >>> from pprint import pprint >>> reviews = {'Star Trek': {'TNG': 10, 'DS9': 8.5, 'ENT': None}, ... 'Babylon 5': 6, 'Dr. Who': None} >>> pprint(remap(reviews, lambda p, k, v: v is not None)) {'Babylon 5': 6, 'Star Trek': {'DS9': 8.5, 'TNG': 10}} Notice how both Nones have been removed despite the nesting in the dictionary. Not bad for a one-liner, and that's just the beginning. See `this remap cookbook`_ for more delicious recipes. .. _this remap cookbook: http://sedimental.org/remap.html remap takes four main arguments: the object to traverse and three optional callables which determine how the remapped object will be created. Args: root: The target object to traverse. By default, remap supports iterables like :class:`list`, :class:`tuple`, :class:`dict`, and :class:`set`, but any object traversable by *enter* will work. visit (callable): This function is called on every item in *root*. It must accept three positional arguments, *path*, *key*, and *value*. *path* is simply a tuple of parents' keys. *visit* should return the new key-value pair. It may also return ``True`` as shorthand to keep the old item unmodified, or ``False`` to drop the item from the new structure. *visit* is called after *enter*, on the new parent. The *visit* function is called for every item in root, including duplicate items. For traversable values, it is called on the new parent object, after all its children have been visited. The default visit behavior simply returns the key-value pair unmodified. enter (callable): This function controls which items in *root* are traversed. It accepts the same arguments as *visit*: the path, the key, and the value of the current item. It returns a pair of the blank new parent, and an iterator over the items which should be visited. If ``False`` is returned instead of an iterator, the value will not be traversed. The *enter* function is only called once per unique value. The default enter behavior support mappings, sequences, and sets. Strings and all other iterables will not be traversed. exit (callable): This function determines how to handle items once they have been visited. It gets the same three arguments as the other functions -- *path*, *key*, *value* -- plus two more: the blank new parent object returned from *enter*, and a list of the new items, as remapped by *visit*. Like *enter*, the *exit* function is only called once per unique value. The default exit behavior is to simply add all new items to the new parent, e.g., using :meth:`list.extend` and :meth:`dict.update` to add to the new parent. Immutable objects, such as a :class:`tuple` or :class:`namedtuple`, must be recreated from scratch, but use the same type as the new parent passed back from the *enter* function. reraise_visit (bool): A pragmatic convenience for the *visit* callable. When set to ``False``, remap ignores any errors raised by the *visit* callback. Items causing exceptions are kept. See examples for more details. remap is designed to cover the majority of cases with just the *visit* callable. While passing in multiple callables is very empowering, remap is designed so very few cases should require passing more than one function. When passing *enter* and *exit*, it's common and easiest to build on the default behavior. Simply add ``from boltons.iterutils import default_enter`` (or ``default_exit``), and have your enter/exit function call the default behavior before or after your custom logic. See `this example`_. Duplicate and self-referential objects (aka reference loops) are automatically handled internally, `as shown here`_. .. _this example: http://sedimental.org/remap.html#sort_all_lists .. _as shown here: http://sedimental.org/remap.html#corner_cases """ # TODO: improve argument formatting in sphinx doc # TODO: enter() return (False, items) to continue traverse but cancel copy? if not callable(visit): raise TypeError("visit expected callable, not: %r" % visit) if not callable(enter): raise TypeError("enter expected callable, not: %r" % enter) if not callable(exit): raise TypeError("exit expected callable, not: %r" % exit) reraise_visit = kwargs.pop("reraise_visit", True) if kwargs: raise TypeError("unexpected keyword arguments: %r" % kwargs.keys()) path, registry, stack = (), {}, [(None, root)] new_items_stack = [] while stack: key, value = stack.pop() id_value = id(value) if key is _REMAP_EXIT: key, new_parent, old_parent = value id_value = id(old_parent) path, new_items = new_items_stack.pop() value = exit(path, key, old_parent, new_parent, new_items) registry[id_value] = value if not new_items_stack: continue elif id_value in registry: value = registry[id_value] else: res = enter(path, key, value) try: new_parent, new_items = res except TypeError: # TODO: handle False? raise TypeError( "enter should return a tuple of (new_parent," " items_iterator), not: %r" % res ) if new_items is not False: # traverse unless False is explicitly passed registry[id_value] = new_parent new_items_stack.append((path, [])) if value is not root: path += (key,) stack.append((_REMAP_EXIT, (key, new_parent, value))) if new_items: stack.extend(reversed(list(new_items))) continue if visit is _orig_default_visit: # avoid function call overhead by inlining identity operation visited_item = (key, value) else: try: visited_item = visit(path, key, value) except Exception: if reraise_visit: raise visited_item = True if visited_item is False: continue # drop elif visited_item is True: visited_item = (key, value) # TODO: typecheck? # raise TypeError('expected (key, value) from visit(),' # ' not: %r' % visited_item) try: new_items_stack[-1][1].append(visited_item) except IndexError: raise TypeError("expected remappable root, not: %r" % root) return value
[docs]def merge_items(target_list, sourced=False): if not sourced: target_list = [(id(t), t) for t in target_list] ret = None source_map = {} def remerge_enter(path, key, value): new_parent, new_items = dict_path_enter(path, key, value) if ret and not path and key is None: new_parent = ret try: cur_val = get_path(ret, path + (key,)) except KeyError as ke: pass else: new_parent = cur_val return new_parent, new_items def remerge_exit(path, key, old_parent, new_parent, new_items): return dict_path_exit(path, key, old_parent, new_parent, new_items) for t_name, target in target_list: if sourced: def remerge_visit(path, key, value): source_map[path + (key,)] = t_name return True else: remerge_visit = default_visit ret = remap(target, enter=remerge_enter, visit=remerge_visit, exit=remerge_exit) if not sourced: return ret return ret, source_map