# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import attr
import pip_shims.shims
from orderedmultidict import omdict
from six.moves.urllib.parse import quote, unquote_plus, unquote as url_unquote
from urllib3 import util as urllib3_util
from urllib3.util import parse_url as urllib3_parse
from urllib3.util.url import Url
from ..environment import MYPY_RUNNING
from ..utils import is_installable_file
from .utils import extras_to_string, parse_extras
if MYPY_RUNNING:
from typing import Dict, List, Optional, Text, Tuple, TypeVar, Union
from pip_shims.shims import Link
from vistir.compat import Path
_T = TypeVar("_T")
STRING_TYPE = Union[bytes, str, Text]
S = TypeVar("S", bytes, str, Text)
def _get_parsed_url(url):
# type: (S) -> Url
"""This is a stand-in function for `urllib3.util.parse_url`
The orignal function doesn't handle special characters very well, this simply splits
out the authentication section, creates the parsed url, then puts the authentication
section back in, bypassing validation.
:return: The new, parsed URL object
:rtype: :class:`~urllib3.util.url.Url`
"""
try:
parsed = urllib3_parse(url)
except ValueError:
scheme, _, url = url.partition("://")
auth, _, url = url.rpartition("@")
url = "{scheme}://{url}".format(scheme=scheme, url=url)
parsed = urllib3_parse(url)._replace(auth=auth)
if parsed.auth:
return parsed._replace(auth=url_unquote(parsed.auth))
return parsed
[docs]def remove_password_from_url(url):
# type: (S) -> S
"""Given a url, remove the password and insert 4 dashes.
:param url: The url to replace the authentication in
:type url: S
:return: The new URL without authentication
:rtype: S
"""
parsed = _get_parsed_url(url)
if parsed.auth:
auth, _, _ = parsed.auth.partition(":")
return parsed._replace(auth="{auth}:----".format(auth=auth)).url
return parsed.url
[docs]@attr.s(hash=True)
class URI(object):
#: The target hostname, e.g. `amazon.com`
host = attr.ib(type=str)
#: The URI Scheme, e.g. `salesforce`
scheme = attr.ib(default="https", type=str)
#: The numeric port of the url if specified
port = attr.ib(default=None, type=int)
#: The url path, e.g. `/path/to/endpoint`
path = attr.ib(default="", type=str)
#: Query parameters, e.g. `?variable=value...`
query = attr.ib(default="", type=str)
#: URL Fragments, e.g. `#fragment=value`
fragment = attr.ib(default="", type=str)
#: Subdirectory fragment, e.g. `&subdirectory=blah...`
subdirectory = attr.ib(default="", type=str)
#: VCS ref this URI points at, if available
ref = attr.ib(default="", type=str)
#: The username if provided, parsed from `user:password@hostname`
username = attr.ib(default="", type=str)
#: Password parsed from `user:password@hostname`
password = attr.ib(default="", type=str, repr=False)
#: An orderedmultidict representing query fragments
query_dict = attr.ib(factory=omdict, type=omdict)
#: The name of the specified package in case it is a VCS URI with an egg fragment
name = attr.ib(default="", type=str)
#: Any extras requested from the requirement
extras = attr.ib(factory=tuple, type=tuple)
#: Whether the url was parsed as a direct pep508-style URL
is_direct_url = attr.ib(default=False, type=bool)
#: Whether the url was an implicit `git+ssh` url (passed as `git+git@`)
is_implicit_ssh = attr.ib(default=False, type=bool)
_auth = attr.ib(default=None, type=str, repr=False)
_fragment_dict = attr.ib(factory=dict, type=dict)
_username_is_quoted = attr.ib(type=bool, default=False)
_password_is_quoted = attr.ib(type=bool, default=False)
def _parse_query(self):
# type: () -> URI
query = self.query if self.query is not None else ""
query_dict = omdict()
queries = query.split("&")
query_items = []
subdirectory = self.subdirectory if self.subdirectory else None
for q in queries:
key, _, val = q.partition("=")
val = unquote_plus(val)
if key == "subdirectory" and not subdirectory:
subdirectory = val
else:
query_items.append((key, val))
query_dict.load(query_items)
return attr.evolve(
self, query_dict=query_dict, subdirectory=subdirectory, query=query
)
def _parse_fragment(self):
# type: () -> URI
subdirectory = self.subdirectory if self.subdirectory else ""
fragment = self.fragment if self.fragment else ""
if self.fragment is None:
return self
fragments = self.fragment.split("&")
fragment_items = {}
name = self.name if self.name else ""
extras = self.extras
for q in fragments:
key, _, val = q.partition("=")
val = unquote_plus(val)
fragment_items[key] = val
if key == "egg":
from .utils import parse_extras
name, stripped_extras = pip_shims.shims._strip_extras(val)
if stripped_extras:
extras = tuple(parse_extras(stripped_extras))
elif key == "subdirectory":
subdirectory = val
return attr.evolve(
self,
fragment_dict=fragment_items,
subdirectory=subdirectory,
fragment=fragment,
extras=extras,
name=name,
)
def _parse_auth(self):
# type: () -> URI
if self._auth:
username, _, password = self._auth.partition(":")
username_is_quoted, password_is_quoted = False, False
quoted_username, quoted_password = "", ""
if password:
quoted_password = quote(password)
password_is_quoted = quoted_password != password
if username:
quoted_username = quote(username)
username_is_quoted = quoted_username != username
return attr.evolve(
self,
username=quoted_username,
password=quoted_password,
username_is_quoted=username_is_quoted,
password_is_quoted=password_is_quoted,
)
return self
[docs] def get_password(self, unquote=False, include_token=True):
# type: (bool, bool) -> str
password = self.password if self.password else ""
if password and unquote and self._password_is_quoted:
password = url_unquote(password)
return password
[docs] def get_username(self, unquote=False):
# type: (bool) -> str
username = self.username if self.username else ""
if username and unquote and self._username_is_quoted:
username = url_unquote(username)
return username
[docs] @staticmethod
def parse_subdirectory(url_part):
# type: (str) -> Tuple[str, Optional[str]]
subdir = None
if "&subdirectory" in url_part:
url_part, _, subdir = url_part.rpartition("&")
if "#egg=" not in url_part:
subdir = "#{0}".format(subdir.strip())
else:
subdir = "&{0}".format(subdir.strip())
return url_part.strip(), subdir
[docs] @classmethod
def get_parsed_url(cls, url):
# if there is a "#" in the auth section, this could break url parsing
parsed_url = _get_parsed_url(url)
if "@" in url and "#" in url:
scheme = "{0}://".format(parsed_url.scheme)
if parsed_url.scheme == "file":
scheme = "{0}/".format(scheme)
url_without_scheme = url.replace(scheme, "")
maybe_auth, _, maybe_url = url_without_scheme.partition("@")
if "#" in maybe_auth and (not parsed_url.host or "." not in parsed_url.host):
new_parsed_url = _get_parsed_url("{0}{1}".format(scheme, maybe_url))
new_parsed_url = new_parsed_url._replace(auth=maybe_auth)
return new_parsed_url
return parsed_url
[docs] @classmethod
def parse(cls, url):
# type: (S) -> URI
from .utils import DIRECT_URL_RE, split_ref_from_uri
is_direct_url = False
name_with_extras = None
is_implicit_ssh = url.strip().startswith("git+git@")
if is_implicit_ssh:
from ..utils import add_ssh_scheme_to_git_uri
url = add_ssh_scheme_to_git_uri(url)
direct_match = DIRECT_URL_RE.match(url)
if direct_match is not None:
is_direct_url = True
name_with_extras, _, url = url.partition("@")
name_with_extras = name_with_extras.strip()
url, ref = split_ref_from_uri(url.strip())
if "file:/" in url and "file:///" not in url:
url = url.replace("file:/", "file:///")
parsed = cls.get_parsed_url(url)
# if there is a "#" in the auth section, this could break url parsing
if not (parsed.scheme and parsed.host):
# check if this is a file uri
if not (
parsed.scheme
and parsed.path
and (parsed.scheme == "file" or parsed.scheme.endswith("+file"))
):
raise ValueError("Failed parsing URL {0!r} - Not a valid url".format(url))
parsed_dict = dict(parsed._asdict()).copy()
parsed_dict["is_direct_url"] = is_direct_url
parsed_dict["is_implicit_ssh"] = is_implicit_ssh
parsed_dict.update(
**update_url_name_and_fragment(name_with_extras, ref, parsed_dict)
) # type: ignore
return cls(**parsed_dict)._parse_auth()._parse_query()._parse_fragment()
[docs] def to_string(
self,
escape_password=True, # type: bool
unquote=True, # type: bool
direct=None, # type: Optional[bool]
strip_ssh=False, # type: bool
strip_ref=False, # type: bool
strip_name=False, # type: bool
strip_subdir=False, # type: bool
):
# type: (...) -> str
"""Converts the current URI to a string, unquoting or escaping the
password as needed.
:param escape_password: Whether to replace password with ``----``, default True
:param escape_password: bool, optional
:param unquote: Whether to unquote url-escapes in the password, default False
:param unquote: bool, optional
:param bool direct: Whether to format as a direct URL
:param bool strip_ssh: Whether to strip the SSH scheme from the url (git only)
:param bool strip_ref: Whether to drop the VCS ref (if present)
:param bool strip_name: Whether to drop the name and extras (if present)
:param bool strip_subdir: Whether to drop the subdirectory (if present)
:return: The reconstructed string representing the URI
:rtype: str
"""
if direct is None:
direct = self.is_direct_url
if escape_password:
password = "----" if self.password else ""
if password:
username = self.get_username(unquote=unquote)
elif self.username:
username = "----"
else:
username = ""
else:
password = self.get_password(unquote=unquote)
username = self.get_username(unquote=unquote)
auth = ""
if username:
if password:
auth = "{username}:{password}@".format(
password=password, username=username
)
else:
auth = "{username}@".format(username=username)
query = ""
if self.query:
query = "{query}?{self.query}".format(query=query, self=self)
subdir_prefix = "#"
if not direct:
if self.name and not strip_name:
fragment = "#egg={self.name_with_extras}".format(self=self)
subdir_prefix = "&"
elif not strip_name and (
self.extras and self.scheme and self.scheme.startswith("file")
):
from .utils import extras_to_string
fragment = extras_to_string(self.extras)
else:
fragment = ""
query = "{query}{fragment}".format(query=query, fragment=fragment)
if self.subdirectory and not strip_subdir:
query = "{query}{subdir_prefix}subdirectory={self.subdirectory}".format(
query=query, subdir_prefix=subdir_prefix, self=self
)
host_port_path = self.get_host_port_path(strip_ref=strip_ref)
url = "{self.scheme}://{auth}{host_port_path}{query}".format(
self=self, auth=auth, host_port_path=host_port_path, query=query
)
if strip_ssh:
from ..utils import strip_ssh_from_git_uri
url = strip_ssh_from_git_uri(url)
if self.name and direct and not strip_name:
return "{self.name_with_extras}@ {url}".format(self=self, url=url)
return url
[docs] def get_host_port_path(self, strip_ref=False):
# type: (bool) -> str
host = self.host if self.host else ""
if self.port is not None:
host = "{host}:{self.port!s}".format(host=host, self=self)
path = "{self.path}".format(self=self) if self.path else ""
if self.ref and not strip_ref:
path = "{path}@{self.ref}".format(path=path, self=self)
return "{host}{path}".format(host=host, path=path)
@property
def hidden_auth(self):
# type: () -> str
auth = ""
if self.username and self.password:
password = "****"
username = self.get_username(unquote=True)
auth = "{username}:{password}".format(username=username, password=password)
elif self.username and not self.password:
auth = "****"
return auth
@property
def name_with_extras(self):
# type: () -> str
from .utils import extras_to_string
if not self.name:
return ""
extras = extras_to_string(self.extras)
return "{self.name}{extras}".format(self=self, extras=extras)
@property
def as_link(self):
# type: () -> Link
link = pip_shims.shims.Link(
self.to_string(escape_password=False, strip_ssh=False, direct=False)
)
return link
@property
def bare_url(self):
# type: () -> str
return self.to_string(
escape_password=False,
strip_ssh=self.is_implicit_ssh,
direct=False,
strip_name=True,
strip_ref=True,
strip_subdir=True,
)
@property
def url_without_fragment_or_ref(self):
# type: () -> str
return self.to_string(
escape_password=False,
strip_ssh=self.is_implicit_ssh,
direct=False,
strip_name=True,
strip_ref=True,
)
@property
def url_without_fragment(self):
# type: () -> str
return self.to_string(
escape_password=False,
strip_ssh=self.is_implicit_ssh,
direct=False,
strip_name=True,
)
@property
def url_without_ref(self):
# type: () -> str
return self.to_string(
escape_password=False,
strip_ssh=self.is_implicit_ssh,
direct=False,
strip_ref=True,
)
@property
def base_url(self):
# type: () -> str
return self.to_string(
escape_password=False,
strip_ssh=self.is_implicit_ssh,
direct=False,
unquote=False,
)
@property
def full_url(self):
# type: () -> str
return self.to_string(escape_password=False, strip_ssh=False, direct=False)
@property
def secret(self):
# type: () -> str
return self.full_url
@property
def safe_string(self):
# type: () -> str
return self.to_string(escape_password=True, unquote=True)
@property
def unsafe_string(self):
# type: () -> str
return self.to_string(escape_password=False, unquote=True)
@property
def uri_escape(self):
# type: () -> str
return self.to_string(escape_password=False, unquote=False)
@property
def is_installable(self):
# type: () -> bool
return self.is_file_url and is_installable_file(self.bare_url)
@property
def is_vcs(self):
# type: () -> bool
from ..utils import VCS_SCHEMES
return self.scheme in VCS_SCHEMES
@property
def is_file_url(self):
# type: () -> bool
return all([self.scheme, self.scheme == "file"])
def __str__(self):
# type: () -> str
return self.to_string(escape_password=True, unquote=True)
[docs]def update_url_name_and_fragment(name_with_extras, ref, parsed_dict):
# type: (Optional[str], Optional[str], Dict[str, Optional[str]]) -> Dict[str, Optional[str]]
if name_with_extras:
fragment = "" # type: Optional[str]
parsed_extras = ()
name, extras = pip_shims.shims._strip_extras(name_with_extras)
if extras:
parsed_extras = parsed_extras + tuple(parse_extras(extras))
if parsed_dict["fragment"] is not None:
fragment = "{0}".format(parsed_dict["fragment"])
if fragment.startswith("egg="):
_, _, fragment_part = fragment.partition("=")
fragment_name, fragment_extras = pip_shims.shims._strip_extras(
fragment_part
)
name = name if name else fragment_name
if fragment_extras:
parsed_extras = parsed_extras + tuple(parse_extras(fragment_extras))
name_with_extras = "{0}{1}".format(name, extras_to_string(parsed_extras))
elif (
parsed_dict.get("path") is not None and "&subdirectory" in parsed_dict["path"]
):
path, fragment = URI.parse_subdirectory(parsed_dict["path"]) # type: ignore
parsed_dict["path"] = path
elif ref is not None and "&subdirectory" in ref:
ref, fragment = URI.parse_subdirectory(ref)
parsed_dict["name"] = name
parsed_dict["extras"] = parsed_extras
if ref:
parsed_dict["ref"] = ref.strip()
return parsed_dict