Source code for requirementslib.models.project

# -*- coding=utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

import collections
import io
import os

import attr
import packaging.markers
import packaging.utils
import plette
import plette.models
import six
import tomlkit
from vistir.compat import FileNotFoundError

SectionDifference = collections.namedtuple("SectionDifference", ["inthis", "inthat"])
FileDifference = collections.namedtuple("FileDifference", ["default", "develop"])


def _are_pipfile_entries_equal(a, b):
    a = {k: v for k, v in a.items() if k not in ("markers", "hashes", "hash")}
    b = {k: v for k, v in b.items() if k not in ("markers", "hashes", "hash")}
    if a != b:
        return False
    try:
        marker_eval_a = packaging.markers.Marker(a["markers"]).evaluate()
    except (AttributeError, KeyError, TypeError, ValueError):
        marker_eval_a = True
    try:
        marker_eval_b = packaging.markers.Marker(b["markers"]).evaluate()
    except (AttributeError, KeyError, TypeError, ValueError):
        marker_eval_b = True
    return marker_eval_a == marker_eval_b


DEFAULT_NEWLINES = "\n"


[docs]def preferred_newlines(f): if isinstance(f.newlines, six.text_type): return f.newlines return DEFAULT_NEWLINES
[docs]@attr.s class ProjectFile(object): """A file in the Pipfile project. """ location = attr.ib() line_ending = attr.ib() model = attr.ib()
[docs] @classmethod def read(cls, location, model_cls, invalid_ok=False): if not os.path.exists(location) and not invalid_ok: raise FileNotFoundError(location) try: with io.open(location, encoding="utf-8") as f: model = model_cls.load(f) line_ending = preferred_newlines(f) except Exception: if not invalid_ok: raise model = None line_ending = DEFAULT_NEWLINES return cls(location=location, line_ending=line_ending, model=model)
[docs] def write(self): kwargs = {"encoding": "utf-8", "newline": self.line_ending} with io.open(self.location, "w", **kwargs) as f: self.model.dump(f)
[docs] def dumps(self): strio = six.StringIO() self.model.dump(strio) return strio.getvalue()
[docs]@attr.s class Project(object): root = attr.ib() _p = attr.ib(init=False) _l = attr.ib(init=False) def __attrs_post_init__(self): self.root = root = os.path.abspath(self.root) self._p = ProjectFile.read(os.path.join(root, "Pipfile"), plette.Pipfile) self._l = ProjectFile.read( os.path.join(root, "Pipfile.lock"), plette.Lockfile, invalid_ok=True ) @property def pipfile(self): return self._p.model @property def pipfile_location(self): return self._p.location @property def lockfile(self): return self._l.model @property def lockfile_location(self): return self._l.location @lockfile.setter def lockfile(self, new): self._l.model = new
[docs] def is_synced(self): return self.lockfile and self.lockfile.is_up_to_date(self.pipfile)
def _get_pipfile_section(self, develop, insert=True): name = "dev-packages" if develop else "packages" try: section = self.pipfile[name] except KeyError: section = plette.models.PackageCollection(tomlkit.table()) if insert: self.pipfile[name] = section return section
[docs] def contains_key_in_pipfile(self, key): sections = [ self._get_pipfile_section(develop=False, insert=False), self._get_pipfile_section(develop=True, insert=False), ] return any( ( packaging.utils.canonicalize_name(name) == packaging.utils.canonicalize_name(key) ) for section in sections for name in section )
[docs] def add_line_to_pipfile(self, line, develop): from requirementslib import Requirement requirement = Requirement.from_line(line) section = self._get_pipfile_section(develop=develop) key = requirement.normalized_name entry = next(iter(requirement.as_pipfile().values())) if isinstance(entry, dict): # HACK: TOMLKit prefers to expand tables by default, but we # always want inline tables here. Also tomlkit.inline_table # does not have `update()`. table = tomlkit.inline_table() for k, v in entry.items(): table[k] = v entry = table section[key] = entry
[docs] def remove_keys_from_pipfile(self, keys, default, develop): keys = {packaging.utils.canonicalize_name(key) for key in keys} sections = [] if default: sections.append(self._get_pipfile_section(develop=False, insert=False)) if develop: sections.append(self._get_pipfile_section(develop=True, insert=False)) for section in sections: removals = set() for name in section: if packaging.utils.canonicalize_name(name) in keys: removals.add(name) for key in removals: del section._data[key]
[docs] def remove_keys_from_lockfile(self, keys): keys = {packaging.utils.canonicalize_name(key) for key in keys} removed = False for section_name in ("default", "develop"): try: section = self.lockfile[section_name] except KeyError: continue removals = set() for name in section: if packaging.utils.canonicalize_name(name) in keys: removals.add(name) removed = removed or bool(removals) for key in removals: del section._data[key] if removed: # HACK: The lock file no longer represents the Pipfile at this # point. Set the hash to an arbitrary invalid value. self.lockfile.meta.hash = plette.models.Hash({"__invalid__": ""})
[docs] def difference_lockfile(self, lockfile): """Generate a difference between the current and given lockfiles. Returns a 2-tuple containing differences in default in develop sections. Each element is a 2-tuple of dicts. The first, `inthis`, contains entries only present in the current lockfile; the second, `inthat`, contains entries only present in the given one. If a key exists in both this and that, but the values differ, the key is present in both dicts, pointing to values from each file. """ diff_data = { "default": SectionDifference({}, {}), "develop": SectionDifference({}, {}), } for section_name, section_diff in diff_data.items(): try: this = self.lockfile[section_name]._data except (KeyError, TypeError): this = {} try: that = lockfile[section_name]._data except (KeyError, TypeError): that = {} for key, this_value in this.items(): try: that_value = that[key] except KeyError: section_diff.inthis[key] = this_value continue if not _are_pipfile_entries_equal(this_value, that_value): section_diff.inthis[key] = this_value section_diff.inthat[key] = that_value for key, that_value in that.items(): if key not in this: section_diff.inthat[key] = that_value return FileDifference(**diff_data)