# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import copy
import itertools
import os
import attr
import plette.lockfiles
import six
from vistir.compat import FileNotFoundError, JSONDecodeError, Path
from ..exceptions import LockfileCorruptException, MissingParameter, PipfileNotFound
from ..utils import is_editable, is_vcs, merge_items
from .project import ProjectFile
from .requirements import Requirement
from .utils import optional_instance_of
DEFAULT_NEWLINES = six.text_type("\n")
[docs]def preferred_newlines(f):
if isinstance(f.newlines, six.text_type):
return f.newlines
return DEFAULT_NEWLINES
is_lockfile = optional_instance_of(plette.lockfiles.Lockfile)
is_projectfile = optional_instance_of(ProjectFile)
[docs]@attr.s(slots=True)
class Lockfile(object):
path = attr.ib(validator=optional_instance_of(Path), type=Path)
_requirements = attr.ib(default=attr.Factory(list), type=list)
_dev_requirements = attr.ib(default=attr.Factory(list), type=list)
projectfile = attr.ib(validator=is_projectfile, type=ProjectFile)
_lockfile = attr.ib(validator=is_lockfile, type=plette.lockfiles.Lockfile)
newlines = attr.ib(default=DEFAULT_NEWLINES, type=six.text_type)
@path.default
def _get_path(self):
return Path(os.curdir).joinpath("Pipfile.lock").absolute()
@projectfile.default
def _get_projectfile(self):
return self.load_projectfile(self.path)
@_lockfile.default
def _get_lockfile(self):
return self.projectfile.model
@property
def lockfile(self):
return self._lockfile
@property
def section_keys(self):
return ["default", "develop"]
@property
def extended_keys(self):
return [k for k in itertools.product(self.section_keys, ["", "vcs", "editable"])]
[docs] def get(self, k):
return self.__getitem__(k)
def __contains__(self, k):
check_lockfile = k in self.extended_keys or self.lockfile.__contains__(k)
if check_lockfile:
return True
return super(Lockfile, self).__contains__(k)
def __setitem__(self, k, v):
lockfile = self._lockfile
lockfile.__setitem__(k, v)
def __getitem__(self, k, *args, **kwargs):
retval = None
lockfile = self._lockfile
section = None
pkg_type = None
try:
retval = lockfile[k]
except KeyError:
if "-" in k:
section, _, pkg_type = k.rpartition("-")
vals = getattr(lockfile.get(section, {}), "_data", {})
if pkg_type == "vcs":
retval = {k: v for k, v in vals.items() if is_vcs(v)}
elif pkg_type == "editable":
retval = {k: v for k, v in vals.items() if is_editable(v)}
if retval is None:
raise
else:
retval = getattr(retval, "_data", retval)
return retval
def __getattr__(self, k, *args, **kwargs):
retval = None
lockfile = super(Lockfile, self).__getattribute__("_lockfile")
try:
return super(Lockfile, self).__getattribute__(k)
except AttributeError:
retval = getattr(lockfile, k, None)
if retval is not None:
return retval
return super(Lockfile, self).__getattribute__(k, *args, **kwargs)
[docs] def get_deps(self, dev=False, only=True):
deps = {}
if dev:
deps.update(self.develop._data)
if only:
return deps
deps = merge_items([deps, self.default._data])
return deps
[docs] @classmethod
def read_projectfile(cls, path):
"""Read the specified project file and provide an interface for writing/updating.
:param str path: Path to the target file.
:return: A project file with the model and location for interaction
:rtype: :class:`~requirementslib.models.project.ProjectFile`
"""
pf = ProjectFile.read(path, plette.lockfiles.Lockfile, invalid_ok=True)
return pf
[docs] @classmethod
def lockfile_from_pipfile(cls, pipfile_path):
from .pipfile import Pipfile
if os.path.isfile(pipfile_path):
if not os.path.isabs(pipfile_path):
pipfile_path = os.path.abspath(pipfile_path)
pipfile = Pipfile.load(os.path.dirname(pipfile_path))
return plette.lockfiles.Lockfile.with_meta_from(pipfile._pipfile)
raise PipfileNotFound(pipfile_path)
[docs] @classmethod
def load_projectfile(cls, path, create=True, data=None):
"""Given a path, load or create the necessary lockfile.
:param str path: Path to the project root or lockfile
:param bool create: Whether to create the lockfile if not found, defaults to True
:raises OSError: Thrown if the project root directory doesn't exist
:raises FileNotFoundError: Thrown if the lockfile doesn't exist and ``create=False``
:return: A project file instance for the supplied project
:rtype: :class:`~requirementslib.models.project.ProjectFile`
"""
if not path:
path = os.curdir
path = Path(path).absolute()
project_path = path if path.is_dir() else path.parent
lockfile_path = path if path.is_file() else project_path / "Pipfile.lock"
if not project_path.exists():
raise OSError("Project does not exist: %s" % project_path.as_posix())
elif not lockfile_path.exists() and not create:
raise FileNotFoundError(
"Lockfile does not exist: %s" % lockfile_path.as_posix()
)
projectfile = cls.read_projectfile(lockfile_path.as_posix())
if not lockfile_path.exists():
if not data:
path_str = lockfile_path.as_posix()
if path_str[-5:] == ".lock":
pipfile = Path(path_str[:-5])
else:
pipfile = project_path.joinpath("Pipfile")
lf = cls.lockfile_from_pipfile(pipfile)
else:
lf = plette.lockfiles.Lockfile(data)
projectfile.model = lf
return projectfile
[docs] @classmethod
def from_data(cls, path, data, meta_from_project=True):
"""Create a new lockfile instance from a dictionary.
:param str path: Path to the project root.
:param dict data: Data to load into the lockfile.
:param bool meta_from_project: Attempt to populate the meta section from the
project root, default True.
"""
if path is None:
raise MissingParameter("path")
if data is None:
raise MissingParameter("data")
if not isinstance(data, dict):
raise TypeError("Expecting a dictionary for parameter 'data'")
path = os.path.abspath(str(path))
if os.path.isdir(path):
project_path = path
elif not os.path.isdir(path) and os.path.isdir(os.path.dirname(path)):
project_path = os.path.dirname(path)
pipfile_path = os.path.join(project_path, "Pipfile")
lockfile_path = os.path.join(project_path, "Pipfile.lock")
if meta_from_project:
lockfile = cls.lockfile_from_pipfile(pipfile_path)
lockfile.update(data)
else:
lockfile = plette.lockfiles.Lockfile(data)
projectfile = ProjectFile(
line_ending=DEFAULT_NEWLINES, location=lockfile_path, model=lockfile
)
return cls(
projectfile=projectfile,
lockfile=lockfile,
newlines=projectfile.line_ending,
path=Path(projectfile.location),
)
[docs] @classmethod
def load(cls, path, create=True):
"""Create a new lockfile instance.
:param project_path: Path to project root or lockfile
:type project_path: str or :class:`pathlib.Path`
:param str lockfile_name: Name of the lockfile in the project root directory
:param pipfile_path: Path to the project pipfile
:type pipfile_path: :class:`pathlib.Path`
:returns: A new lockfile representing the supplied project paths
:rtype: :class:`~requirementslib.models.lockfile.Lockfile`
"""
try:
projectfile = cls.load_projectfile(path, create=create)
except JSONDecodeError:
path = os.path.abspath(path)
path = Path(
os.path.join(path, "Pipfile.lock") if os.path.isdir(path) else path
)
formatted_path = path.as_posix()
backup_path = "%s.bak" % formatted_path
LockfileCorruptException.show(formatted_path, backup_path=backup_path)
path.rename(backup_path)
cls.load(formatted_path, create=True)
lockfile_path = Path(projectfile.location)
creation_args = {
"projectfile": projectfile,
"lockfile": projectfile.model,
"newlines": projectfile.line_ending,
"path": lockfile_path,
}
return cls(**creation_args)
[docs] @classmethod
def create(cls, path, create=True):
return cls.load(path, create=create)
@property
def develop(self):
return self._lockfile.develop
@property
def default(self):
return self._lockfile.default
[docs] def get_requirements(self, dev=True, only=False):
"""Produces a generator which generates requirements from the desired section.
:param bool dev: Indicates whether to use dev requirements, defaults to False
:return: Requirements from the relevant the relevant pipfile
:rtype: :class:`~requirementslib.models.requirements.Requirement`
"""
deps = self.get_deps(dev=dev, only=only)
for k, v in deps.items():
yield Requirement.from_pipfile(k, v)
@property
def dev_requirements(self):
if not self._dev_requirements:
self._dev_requirements = list(self.get_requirements(dev=True, only=True))
return self._dev_requirements
@property
def requirements(self):
if not self._requirements:
self._requirements = list(self.get_requirements(dev=False, only=True))
return self._requirements
@property
def dev_requirements_list(self):
return [{name: entry._data} for name, entry in self._lockfile.develop.items()]
@property
def requirements_list(self):
return [{name: entry._data} for name, entry in self._lockfile.default.items()]
[docs] def write(self):
self.projectfile.model = copy.deepcopy(self._lockfile)
self.projectfile.write()
[docs] def as_requirements(self, include_hashes=False, dev=False):
"""Returns a list of requirements in pip-style format"""
lines = []
section = self.dev_requirements if dev else self.requirements
for req in section:
kwargs = {"include_hashes": include_hashes}
if req.editable:
kwargs["include_markers"] = False
r = req.as_line(**kwargs)
lines.append(r.strip())
return lines