Created starter files for the project.

This commit is contained in:
Batuhan Berk Başoğlu 2020-10-02 21:26:03 -04:00
commit 73f0c0db42
1992 changed files with 769897 additions and 0 deletions

View file

@ -0,0 +1,44 @@
"""
This code wraps the vendored appdirs module to so the return values are
compatible for the current pip code base.
The intention is to rewrite current usages gradually, keeping the tests pass,
and eventually drop this after all usages are changed.
"""
from __future__ import absolute_import
import os
from pip._vendor import appdirs as _appdirs
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List
def user_cache_dir(appname):
# type: (str) -> str
return _appdirs.user_cache_dir(appname, appauthor=False)
def user_config_dir(appname, roaming=True):
# type: (str, bool) -> str
path = _appdirs.user_config_dir(appname, appauthor=False, roaming=roaming)
if _appdirs.system == "darwin" and not os.path.isdir(path):
path = os.path.expanduser('~/.config/')
if appname:
path = os.path.join(path, appname)
return path
# for the discussion regarding site_config_dir locations
# see <https://github.com/pypa/pip/issues/1733>
def site_config_dirs(appname):
# type: (str) -> List[str]
dirval = _appdirs.site_config_dir(appname, appauthor=False, multipath=True)
if _appdirs.system not in ["win32", "darwin"]:
# always look in /etc directly as well
return dirval.split(os.pathsep) + ['/etc']
return [dirval]

View file

@ -0,0 +1,271 @@
"""Stuff that differs in different Python versions and platform
distributions."""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
from __future__ import absolute_import, division
import codecs
import locale
import logging
import os
import shutil
import sys
from pip._vendor.six import PY2, text_type
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Text, Tuple, Union
try:
import ipaddress
except ImportError:
try:
from pip._vendor import ipaddress # type: ignore
except ImportError:
import ipaddr as ipaddress # type: ignore
ipaddress.ip_address = ipaddress.IPAddress # type: ignore
ipaddress.ip_network = ipaddress.IPNetwork # type: ignore
__all__ = [
"ipaddress", "uses_pycache", "console_to_str",
"get_path_uid", "stdlib_pkgs", "WINDOWS", "samefile", "get_terminal_size",
]
logger = logging.getLogger(__name__)
if PY2:
import imp
try:
cache_from_source = imp.cache_from_source # type: ignore
except AttributeError:
# does not use __pycache__
cache_from_source = None
uses_pycache = cache_from_source is not None
else:
uses_pycache = True
from importlib.util import cache_from_source
if PY2:
# In Python 2.7, backslashreplace exists
# but does not support use for decoding.
# We implement our own replace handler for this
# situation, so that we can consistently use
# backslash replacement for all versions.
def backslashreplace_decode_fn(err):
raw_bytes = (err.object[i] for i in range(err.start, err.end))
# Python 2 gave us characters - convert to numeric bytes
raw_bytes = (ord(b) for b in raw_bytes)
return u"".join(map(u"\\x{:x}".format, raw_bytes)), err.end
codecs.register_error(
"backslashreplace_decode",
backslashreplace_decode_fn,
)
backslashreplace_decode = "backslashreplace_decode"
else:
backslashreplace_decode = "backslashreplace"
def has_tls():
# type: () -> bool
try:
import _ssl # noqa: F401 # ignore unused
return True
except ImportError:
pass
from pip._vendor.urllib3.util import IS_PYOPENSSL
return IS_PYOPENSSL
def str_to_display(data, desc=None):
# type: (Union[bytes, Text], Optional[str]) -> Text
"""
For display or logging purposes, convert a bytes object (or text) to
text (e.g. unicode in Python 2) safe for output.
:param desc: An optional phrase describing the input data, for use in
the log message if a warning is logged. Defaults to "Bytes object".
This function should never error out and so can take a best effort
approach. It is okay to be lossy if needed since the return value is
just for display.
We assume the data is in the locale preferred encoding. If it won't
decode properly, we warn the user but decode as best we can.
We also ensure that the output can be safely written to standard output
without encoding errors.
"""
if isinstance(data, text_type):
return data
# Otherwise, data is a bytes object (str in Python 2).
# First, get the encoding we assume. This is the preferred
# encoding for the locale, unless that is not found, or
# it is ASCII, in which case assume UTF-8
encoding = locale.getpreferredencoding()
if (not encoding) or codecs.lookup(encoding).name == "ascii":
encoding = "utf-8"
# Now try to decode the data - if we fail, warn the user and
# decode with replacement.
try:
decoded_data = data.decode(encoding)
except UnicodeDecodeError:
logger.warning(
'%s does not appear to be encoded as %s',
desc or 'Bytes object',
encoding,
)
decoded_data = data.decode(encoding, errors=backslashreplace_decode)
# Make sure we can print the output, by encoding it to the output
# encoding with replacement of unencodable characters, and then
# decoding again.
# We use stderr's encoding because it's less likely to be
# redirected and if we don't find an encoding we skip this
# step (on the assumption that output is wrapped by something
# that won't fail).
# The double getattr is to deal with the possibility that we're
# being called in a situation where sys.__stderr__ doesn't exist,
# or doesn't have an encoding attribute. Neither of these cases
# should occur in normal pip use, but there's no harm in checking
# in case people use pip in (unsupported) unusual situations.
output_encoding = getattr(getattr(sys, "__stderr__", None),
"encoding", None)
if output_encoding:
output_encoded = decoded_data.encode(
output_encoding,
errors="backslashreplace"
)
decoded_data = output_encoded.decode(output_encoding)
return decoded_data
def console_to_str(data):
# type: (bytes) -> Text
"""Return a string, safe for output, of subprocess output.
"""
return str_to_display(data, desc='Subprocess output')
def get_path_uid(path):
# type: (str) -> int
"""
Return path's uid.
Does not follow symlinks:
https://github.com/pypa/pip/pull/935#discussion_r5307003
Placed this function in compat due to differences on AIX and
Jython, that should eventually go away.
:raises OSError: When path is a symlink or can't be read.
"""
if hasattr(os, 'O_NOFOLLOW'):
fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
file_uid = os.fstat(fd).st_uid
os.close(fd)
else: # AIX and Jython
# WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
if not os.path.islink(path):
# older versions of Jython don't have `os.fstat`
file_uid = os.stat(path).st_uid
else:
# raise OSError for parity with os.O_NOFOLLOW above
raise OSError(
"{} is a symlink; Will not return uid for symlinks".format(
path)
)
return file_uid
def expanduser(path):
# type: (str) -> str
"""
Expand ~ and ~user constructions.
Includes a workaround for https://bugs.python.org/issue14768
"""
expanded = os.path.expanduser(path)
if path.startswith('~/') and expanded.startswith('//'):
expanded = expanded[1:]
return expanded
# packages in the stdlib that may have installation metadata, but should not be
# considered 'installed'. this theoretically could be determined based on
# dist.location (py27:`sysconfig.get_paths()['stdlib']`,
# py26:sysconfig.get_config_vars('LIBDEST')), but fear platform variation may
# make this ineffective, so hard-coding
stdlib_pkgs = {"python", "wsgiref", "argparse"}
# windows detection, covers cpython and ironpython
WINDOWS = (sys.platform.startswith("win") or
(sys.platform == 'cli' and os.name == 'nt'))
def samefile(file1, file2):
# type: (str, str) -> bool
"""Provide an alternative for os.path.samefile on Windows/Python2"""
if hasattr(os.path, 'samefile'):
return os.path.samefile(file1, file2)
else:
path1 = os.path.normcase(os.path.abspath(file1))
path2 = os.path.normcase(os.path.abspath(file2))
return path1 == path2
if hasattr(shutil, 'get_terminal_size'):
def get_terminal_size():
# type: () -> Tuple[int, int]
"""
Returns a tuple (x, y) representing the width(x) and the height(y)
in characters of the terminal window.
"""
return tuple(shutil.get_terminal_size()) # type: ignore
else:
def get_terminal_size():
# type: () -> Tuple[int, int]
"""
Returns a tuple (x, y) representing the width(x) and the height(y)
in characters of the terminal window.
"""
def ioctl_GWINSZ(fd):
try:
import fcntl
import termios
import struct
cr = struct.unpack_from(
'hh',
fcntl.ioctl(fd, termios.TIOCGWINSZ, '12345678')
)
except Exception:
return None
if cr == (0, 0):
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
if sys.platform != "win32":
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except Exception:
pass
if not cr:
cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
return int(cr[1]), int(cr[0])

View file

@ -0,0 +1,166 @@
"""Generate and work with PEP 425 Compatibility Tags.
"""
from __future__ import absolute_import
import re
from pip._vendor.packaging.tags import (
Tag,
compatible_tags,
cpython_tags,
generic_tags,
interpreter_name,
interpreter_version,
mac_platforms,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List, Optional, Tuple
from pip._vendor.packaging.tags import PythonVersion
_osx_arch_pat = re.compile(r'(.+)_(\d+)_(\d+)_(.+)')
def version_info_to_nodot(version_info):
# type: (Tuple[int, ...]) -> str
# Only use up to the first two numbers.
return ''.join(map(str, version_info[:2]))
def _mac_platforms(arch):
# type: (str) -> List[str]
match = _osx_arch_pat.match(arch)
if match:
name, major, minor, actual_arch = match.groups()
mac_version = (int(major), int(minor))
arches = [
# Since we have always only checked that the platform starts
# with "macosx", for backwards-compatibility we extract the
# actual prefix provided by the user in case they provided
# something like "macosxcustom_". It may be good to remove
# this as undocumented or deprecate it in the future.
'{}_{}'.format(name, arch[len('macosx_'):])
for arch in mac_platforms(mac_version, actual_arch)
]
else:
# arch pattern didn't match (?!)
arches = [arch]
return arches
def _custom_manylinux_platforms(arch):
# type: (str) -> List[str]
arches = [arch]
arch_prefix, arch_sep, arch_suffix = arch.partition('_')
if arch_prefix == 'manylinux2014':
# manylinux1/manylinux2010 wheels run on most manylinux2014 systems
# with the exception of wheels depending on ncurses. PEP 599 states
# manylinux1/manylinux2010 wheels should be considered
# manylinux2014 wheels:
# https://www.python.org/dev/peps/pep-0599/#backwards-compatibility-with-manylinux2010-wheels
if arch_suffix in {'i686', 'x86_64'}:
arches.append('manylinux2010' + arch_sep + arch_suffix)
arches.append('manylinux1' + arch_sep + arch_suffix)
elif arch_prefix == 'manylinux2010':
# manylinux1 wheels run on most manylinux2010 systems with the
# exception of wheels depending on ncurses. PEP 571 states
# manylinux1 wheels should be considered manylinux2010 wheels:
# https://www.python.org/dev/peps/pep-0571/#backwards-compatibility-with-manylinux1-wheels
arches.append('manylinux1' + arch_sep + arch_suffix)
return arches
def _get_custom_platforms(arch):
# type: (str) -> List[str]
arch_prefix, arch_sep, arch_suffix = arch.partition('_')
if arch.startswith('macosx'):
arches = _mac_platforms(arch)
elif arch_prefix in ['manylinux2014', 'manylinux2010']:
arches = _custom_manylinux_platforms(arch)
else:
arches = [arch]
return arches
def _get_python_version(version):
# type: (str) -> PythonVersion
if len(version) > 1:
return int(version[0]), int(version[1:])
else:
return (int(version[0]),)
def _get_custom_interpreter(implementation=None, version=None):
# type: (Optional[str], Optional[str]) -> str
if implementation is None:
implementation = interpreter_name()
if version is None:
version = interpreter_version()
return "{}{}".format(implementation, version)
def get_supported(
version=None, # type: Optional[str]
platform=None, # type: Optional[str]
impl=None, # type: Optional[str]
abi=None # type: Optional[str]
):
# type: (...) -> List[Tag]
"""Return a list of supported tags for each version specified in
`versions`.
:param version: a string version, of the form "33" or "32",
or None. The version will be assumed to support our ABI.
:param platform: specify the exact platform you want valid
tags for, or None. If None, use the local system platform.
:param impl: specify the exact implementation you want valid
tags for, or None. If None, use the local interpreter impl.
:param abi: specify the exact abi you want valid
tags for, or None. If None, use the local interpreter abi.
"""
supported = [] # type: List[Tag]
python_version = None # type: Optional[PythonVersion]
if version is not None:
python_version = _get_python_version(version)
interpreter = _get_custom_interpreter(impl, version)
abis = None # type: Optional[List[str]]
if abi is not None:
abis = [abi]
platforms = None # type: Optional[List[str]]
if platform is not None:
platforms = _get_custom_platforms(platform)
is_cpython = (impl or interpreter_name()) == "cp"
if is_cpython:
supported.extend(
cpython_tags(
python_version=python_version,
abis=abis,
platforms=platforms,
)
)
else:
supported.extend(
generic_tags(
interpreter=interpreter,
abis=abis,
platforms=platforms,
)
)
supported.extend(
compatible_tags(
python_version=python_version,
interpreter=interpreter,
platforms=platforms,
)
)
return supported

View file

@ -0,0 +1,14 @@
"""For when pip wants to check the date or time.
"""
from __future__ import absolute_import
import datetime
def today_is_later_than(year, month, day):
# type: (int, int, int) -> bool
today = datetime.date.today()
given = datetime.date(year, month, day)
return today > given

View file

@ -0,0 +1,104 @@
"""
A module that implements tooling to enable easy warnings about deprecations.
"""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
from __future__ import absolute_import
import logging
import warnings
from pip._vendor.packaging.version import parse
from pip import __version__ as current_version
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Any, Optional
DEPRECATION_MSG_PREFIX = "DEPRECATION: "
class PipDeprecationWarning(Warning):
pass
_original_showwarning = None # type: Any
# Warnings <-> Logging Integration
def _showwarning(message, category, filename, lineno, file=None, line=None):
if file is not None:
if _original_showwarning is not None:
_original_showwarning(
message, category, filename, lineno, file, line,
)
elif issubclass(category, PipDeprecationWarning):
# We use a specially named logger which will handle all of the
# deprecation messages for pip.
logger = logging.getLogger("pip._internal.deprecations")
logger.warning(message)
else:
_original_showwarning(
message, category, filename, lineno, file, line,
)
def install_warning_logger():
# type: () -> None
# Enable our Deprecation Warnings
warnings.simplefilter("default", PipDeprecationWarning, append=True)
global _original_showwarning
if _original_showwarning is None:
_original_showwarning = warnings.showwarning
warnings.showwarning = _showwarning
def deprecated(reason, replacement, gone_in, issue=None):
# type: (str, Optional[str], Optional[str], Optional[int]) -> None
"""Helper to deprecate existing functionality.
reason:
Textual reason shown to the user about why this functionality has
been deprecated.
replacement:
Textual suggestion shown to the user about what alternative
functionality they can use.
gone_in:
The version of pip does this functionality should get removed in.
Raises errors if pip's current version is greater than or equal to
this.
issue:
Issue number on the tracker that would serve as a useful place for
users to find related discussion and provide feedback.
Always pass replacement, gone_in and issue as keyword arguments for clarity
at the call site.
"""
# Construct a nice message.
# This is eagerly formatted as we want it to get logged as if someone
# typed this entire message out.
sentences = [
(reason, DEPRECATION_MSG_PREFIX + "{}"),
(gone_in, "pip {} will remove support for this functionality."),
(replacement, "A possible replacement is {}."),
(issue, (
"You can find discussion regarding this at "
"https://github.com/pypa/pip/issues/{}."
)),
]
message = " ".join(
template.format(val) for val, template in sentences if val is not None
)
# Raise as an error if it has to be removed.
if gone_in is not None and parse(current_version) >= parse(gone_in):
raise PipDeprecationWarning(message)
warnings.warn(message, category=PipDeprecationWarning, stacklevel=2)

View file

@ -0,0 +1,130 @@
import logging
from pip._internal.models.direct_url import (
DIRECT_URL_METADATA_NAME,
ArchiveInfo,
DirectUrl,
DirectUrlValidationError,
DirInfo,
VcsInfo,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs import vcs
try:
from json import JSONDecodeError
except ImportError:
# PY2
JSONDecodeError = ValueError # type: ignore
if MYPY_CHECK_RUNNING:
from typing import Optional
from pip._internal.models.link import Link
from pip._vendor.pkg_resources import Distribution
logger = logging.getLogger(__name__)
def direct_url_as_pep440_direct_reference(direct_url, name):
# type: (DirectUrl, str) -> str
"""Convert a DirectUrl to a pip requirement string."""
direct_url.validate() # if invalid, this is a pip bug
requirement = name + " @ "
fragments = []
if isinstance(direct_url.info, VcsInfo):
requirement += "{}+{}@{}".format(
direct_url.info.vcs, direct_url.url, direct_url.info.commit_id
)
elif isinstance(direct_url.info, ArchiveInfo):
requirement += direct_url.url
if direct_url.info.hash:
fragments.append(direct_url.info.hash)
else:
assert isinstance(direct_url.info, DirInfo)
# pip should never reach this point for editables, since
# pip freeze inspects the editable project location to produce
# the requirement string
assert not direct_url.info.editable
requirement += direct_url.url
if direct_url.subdirectory:
fragments.append("subdirectory=" + direct_url.subdirectory)
if fragments:
requirement += "#" + "&".join(fragments)
return requirement
def direct_url_from_link(link, source_dir=None, link_is_in_wheel_cache=False):
# type: (Link, Optional[str], bool) -> DirectUrl
if link.is_vcs:
vcs_backend = vcs.get_backend_for_scheme(link.scheme)
assert vcs_backend
url, requested_revision, _ = (
vcs_backend.get_url_rev_and_auth(link.url_without_fragment)
)
# For VCS links, we need to find out and add commit_id.
if link_is_in_wheel_cache:
# If the requested VCS link corresponds to a cached
# wheel, it means the requested revision was an
# immutable commit hash, otherwise it would not have
# been cached. In that case we don't have a source_dir
# with the VCS checkout.
assert requested_revision
commit_id = requested_revision
else:
# If the wheel was not in cache, it means we have
# had to checkout from VCS to build and we have a source_dir
# which we can inspect to find out the commit id.
assert source_dir
commit_id = vcs_backend.get_revision(source_dir)
return DirectUrl(
url=url,
info=VcsInfo(
vcs=vcs_backend.name,
commit_id=commit_id,
requested_revision=requested_revision,
),
subdirectory=link.subdirectory_fragment,
)
elif link.is_existing_dir():
return DirectUrl(
url=link.url_without_fragment,
info=DirInfo(),
subdirectory=link.subdirectory_fragment,
)
else:
hash = None
hash_name = link.hash_name
if hash_name:
hash = "{}={}".format(hash_name, link.hash)
return DirectUrl(
url=link.url_without_fragment,
info=ArchiveInfo(hash=hash),
subdirectory=link.subdirectory_fragment,
)
def dist_get_direct_url(dist):
# type: (Distribution) -> Optional[DirectUrl]
"""Obtain a DirectUrl from a pkg_resource.Distribution.
Returns None if the distribution has no `direct_url.json` metadata,
or if `direct_url.json` is invalid.
"""
if not dist.has_metadata(DIRECT_URL_METADATA_NAME):
return None
try:
return DirectUrl.from_json(dist.get_metadata(DIRECT_URL_METADATA_NAME))
except (
DirectUrlValidationError,
JSONDecodeError,
UnicodeDecodeError
) as e:
logger.warning(
"Error parsing %s for %s: %s",
DIRECT_URL_METADATA_NAME,
dist.project_name,
e,
)
return None

View file

@ -0,0 +1,48 @@
from distutils.errors import DistutilsArgError
from distutils.fancy_getopt import FancyGetopt
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Dict, List
_options = [
("exec-prefix=", None, ""),
("home=", None, ""),
("install-base=", None, ""),
("install-data=", None, ""),
("install-headers=", None, ""),
("install-lib=", None, ""),
("install-platlib=", None, ""),
("install-purelib=", None, ""),
("install-scripts=", None, ""),
("prefix=", None, ""),
("root=", None, ""),
("user", None, ""),
]
# typeshed doesn't permit Tuple[str, None, str], see python/typeshed#3469.
_distutils_getopt = FancyGetopt(_options) # type: ignore
def parse_distutils_args(args):
# type: (List[str]) -> Dict[str, str]
"""Parse provided arguments, returning an object that has the
matched arguments.
Any unknown arguments are ignored.
"""
result = {}
for arg in args:
try:
_, match = _distutils_getopt.getopt(args=[arg])
except DistutilsArgError:
# We don't care about any other options, which here may be
# considered unrecognized since our option list is not
# exhaustive.
pass
else:
result.update(match.__dict__)
return result

View file

@ -0,0 +1,41 @@
import codecs
import locale
import re
import sys
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List, Tuple, Text
BOMS = [
(codecs.BOM_UTF8, 'utf-8'),
(codecs.BOM_UTF16, 'utf-16'),
(codecs.BOM_UTF16_BE, 'utf-16-be'),
(codecs.BOM_UTF16_LE, 'utf-16-le'),
(codecs.BOM_UTF32, 'utf-32'),
(codecs.BOM_UTF32_BE, 'utf-32-be'),
(codecs.BOM_UTF32_LE, 'utf-32-le'),
] # type: List[Tuple[bytes, Text]]
ENCODING_RE = re.compile(br'coding[:=]\s*([-\w.]+)')
def auto_decode(data):
# type: (bytes) -> Text
"""Check a bytes string for a BOM to correctly detect the encoding
Fallback to locale.getpreferredencoding(False) like open() on Python3"""
for bom, encoding in BOMS:
if data.startswith(bom):
return data[len(bom):].decode(encoding)
# Lets check the first two lines as in PEP263
for line in data.split(b'\n')[:2]:
if line[0:1] == b'#' and ENCODING_RE.search(line):
result = ENCODING_RE.search(line)
assert result is not None
encoding = result.groups()[0].decode('ascii')
return data.decode(encoding)
return data.decode(
locale.getpreferredencoding(False) or sys.getdefaultencoding(),
)

View file

@ -0,0 +1,31 @@
import sys
from pip._internal.cli.main import main
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, List
def _wrapper(args=None):
# type: (Optional[List[str]]) -> int
"""Central wrapper for all old entrypoints.
Historically pip has had several entrypoints defined. Because of issues
arising from PATH, sys.path, multiple Pythons, their interactions, and most
of them having a pip installed, users suffer every time an entrypoint gets
moved.
To alleviate this pain, and provide a mechanism for warning users and
directing them to an appropriate place for help, we now define all of
our old entrypoints as wrappers for the current one.
"""
sys.stderr.write(
"WARNING: pip is being invoked by an old script wrapper. This will "
"fail in a future version of pip.\n"
"Please see https://github.com/pypa/pip/issues/5599 for advice on "
"fixing the underlying issue.\n"
"To avoid this problem you can invoke Python with '-m pip' instead of "
"running pip directly.\n"
)
return main(args)

View file

@ -0,0 +1,224 @@
import errno
import fnmatch
import os
import os.path
import random
import shutil
import stat
import sys
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import.
from pip._vendor.retrying import retry # type: ignore
from pip._vendor.six import PY2
from pip._internal.utils.compat import get_path_uid
from pip._internal.utils.misc import format_size
from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
if MYPY_CHECK_RUNNING:
from typing import Any, BinaryIO, Iterator, List, Union
class NamedTemporaryFileResult(BinaryIO):
@property
def file(self):
# type: () -> BinaryIO
pass
def check_path_owner(path):
# type: (str) -> bool
# If we don't have a way to check the effective uid of this process, then
# we'll just assume that we own the directory.
if sys.platform == "win32" or not hasattr(os, "geteuid"):
return True
assert os.path.isabs(path)
previous = None
while path != previous:
if os.path.lexists(path):
# Check if path is writable by current user.
if os.geteuid() == 0:
# Special handling for root user in order to handle properly
# cases where users use sudo without -H flag.
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
previous, path = path, os.path.dirname(path)
return False # assume we don't own the path
def copy2_fixed(src, dest):
# type: (str, str) -> None
"""Wrap shutil.copy2() but map errors copying socket files to
SpecialFileError as expected.
See also https://bugs.python.org/issue37700.
"""
try:
shutil.copy2(src, dest)
except (OSError, IOError):
for f in [src, dest]:
try:
is_socket_file = is_socket(f)
except OSError:
# An error has already occurred. Another error here is not
# a problem and we can ignore it.
pass
else:
if is_socket_file:
raise shutil.SpecialFileError(
"`{f}` is a socket".format(**locals()))
raise
def is_socket(path):
# type: (str) -> bool
return stat.S_ISSOCK(os.lstat(path).st_mode)
@contextmanager
def adjacent_tmp_file(path, **kwargs):
# type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
"""Return a file-like object pointing to a tmp file next to path.
The file is created securely and is ensured to be written to disk
after the context reaches its end.
kwargs will be passed to tempfile.NamedTemporaryFile to control
the way the temporary file will be opened.
"""
with NamedTemporaryFile(
delete=False,
dir=os.path.dirname(path),
prefix=os.path.basename(path),
suffix='.tmp',
**kwargs
) as f:
result = cast('NamedTemporaryFileResult', f)
try:
yield result
finally:
result.file.flush()
os.fsync(result.file.fileno())
_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)
if PY2:
@_replace_retry
def replace(src, dest):
# type: (str, str) -> None
try:
os.rename(src, dest)
except OSError:
os.remove(dest)
os.rename(src, dest)
else:
replace = _replace_retry(os.replace)
# test_writable_dir and _test_writable_dir_win are copied from Flit,
# with the author's agreement to also place them under pip's license.
def test_writable_dir(path):
# type: (str) -> bool
"""Check if a directory is writable.
Uses os.access() on POSIX, tries creating files on Windows.
"""
# If the directory doesn't exist, find the closest parent that does.
while not os.path.isdir(path):
parent = os.path.dirname(path)
if parent == path:
break # Should never get here, but infinite loops are bad
path = parent
if os.name == 'posix':
return os.access(path, os.W_OK)
return _test_writable_dir_win(path)
def _test_writable_dir_win(path):
# type: (str) -> bool
# os.access doesn't work on Windows: http://bugs.python.org/issue2528
# and we can't use tempfile: http://bugs.python.org/issue22107
basename = 'accesstest_deleteme_fishfingers_custard_'
alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
for _ in range(10):
name = basename + ''.join(random.choice(alphabet) for _ in range(6))
file = os.path.join(path, name)
try:
fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)
# Python 2 doesn't support FileExistsError and PermissionError.
except OSError as e:
# exception FileExistsError
if e.errno == errno.EEXIST:
continue
# exception PermissionError
if e.errno == errno.EPERM or e.errno == errno.EACCES:
# This could be because there's a directory with the same name.
# But it's highly unlikely there's a directory called that,
# so we'll assume it's because the parent dir is not writable.
# This could as well be because the parent dir is not readable,
# due to non-privileged user access.
return False
raise
else:
os.close(fd)
os.unlink(file)
return True
# This should never be reached
raise EnvironmentError(
'Unexpected condition testing for writable directory'
)
def find_files(path, pattern):
# type: (str, str) -> List[str]
"""Returns a list of absolute paths of files beneath path, recursively,
with filenames which match the UNIX-style shell glob pattern."""
result = [] # type: List[str]
for root, _, files in os.walk(path):
matches = fnmatch.filter(files, pattern)
result.extend(os.path.join(root, f) for f in matches)
return result
def file_size(path):
# type: (str) -> Union[int, float]
# If it's a symlink, return 0.
if os.path.islink(path):
return 0
return os.path.getsize(path)
def format_file_size(path):
# type: (str) -> str
return format_size(file_size(path))
def directory_size(path):
# type: (str) -> Union[int, float]
size = 0.0
for root, _dirs, files in os.walk(path):
for filename in files:
file_path = os.path.join(root, filename)
size += file_size(file_path)
return size
def format_directory_size(path):
# type: (str) -> str
return format_size(directory_size(path))

View file

@ -0,0 +1,16 @@
"""Filetype information.
"""
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Tuple
WHEEL_EXTENSION = '.whl'
BZ2_EXTENSIONS = ('.tar.bz2', '.tbz') # type: Tuple[str, ...]
XZ_EXTENSIONS = ('.tar.xz', '.txz', '.tlz',
'.tar.lz', '.tar.lzma') # type: Tuple[str, ...]
ZIP_EXTENSIONS = ('.zip', WHEEL_EXTENSION) # type: Tuple[str, ...]
TAR_EXTENSIONS = ('.tar.gz', '.tgz', '.tar') # type: Tuple[str, ...]
ARCHIVE_EXTENSIONS = (
ZIP_EXTENSIONS + BZ2_EXTENSIONS + TAR_EXTENSIONS + XZ_EXTENSIONS
)

View file

@ -0,0 +1,98 @@
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
from __future__ import absolute_import
import os
import sys
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
def glibc_version_string():
# type: () -> Optional[str]
"Returns glibc version string, or None if not using glibc."
return glibc_version_string_confstr() or glibc_version_string_ctypes()
def glibc_version_string_confstr():
# type: () -> Optional[str]
"Primary implementation of glibc_version_string using os.confstr."
# os.confstr is quite a bit faster than ctypes.DLL. It's also less likely
# to be broken or missing. This strategy is used in the standard library
# platform module:
# https://github.com/python/cpython/blob/fcf1d003bf4f0100c9d0921ff3d70e1127ca1b71/Lib/platform.py#L175-L183
if sys.platform == "win32":
return None
try:
# os.confstr("CS_GNU_LIBC_VERSION") returns a string like "glibc 2.17":
_, version = os.confstr("CS_GNU_LIBC_VERSION").split()
except (AttributeError, OSError, ValueError):
# os.confstr() or CS_GNU_LIBC_VERSION not available (or a bad value)...
return None
return version
def glibc_version_string_ctypes():
# type: () -> Optional[str]
"Fallback implementation of glibc_version_string using ctypes."
try:
import ctypes
except ImportError:
return None
# ctypes.CDLL(None) internally calls dlopen(NULL), and as the dlopen
# manpage says, "If filename is NULL, then the returned handle is for the
# main program". This way we can let the linker do the work to figure out
# which libc our process is actually using.
process_namespace = ctypes.CDLL(None)
try:
gnu_get_libc_version = process_namespace.gnu_get_libc_version
except AttributeError:
# Symbol doesn't exist -> therefore, we are not linked to
# glibc.
return None
# Call gnu_get_libc_version, which returns a string like "2.5"
gnu_get_libc_version.restype = ctypes.c_char_p
version_str = gnu_get_libc_version()
# py2 / py3 compatibility:
if not isinstance(version_str, str):
version_str = version_str.decode("ascii")
return version_str
# platform.libc_ver regularly returns completely nonsensical glibc
# versions. E.g. on my computer, platform says:
#
# ~$ python2.7 -c 'import platform; print(platform.libc_ver())'
# ('glibc', '2.7')
# ~$ python3.5 -c 'import platform; print(platform.libc_ver())'
# ('glibc', '2.9')
#
# But the truth is:
#
# ~$ ldd --version
# ldd (Debian GLIBC 2.22-11) 2.22
#
# This is unfortunate, because it means that the linehaul data on libc
# versions that was generated by pip 8.1.2 and earlier is useless and
# misleading. Solution: instead of using platform, use our code that actually
# works.
def libc_ver():
# type: () -> Tuple[str, str]
"""Try to determine the glibc version
Returns a tuple of strings (lib, version) which default to empty strings
in case the lookup fails.
"""
glibc_version = glibc_version_string()
if glibc_version is None:
return ("", "")
else:
return ("glibc", glibc_version)

View file

@ -0,0 +1,145 @@
from __future__ import absolute_import
import hashlib
from pip._vendor.six import iteritems, iterkeys, itervalues
from pip._internal.exceptions import (
HashMismatch,
HashMissing,
InstallationError,
)
from pip._internal.utils.misc import read_chunks
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import (
Dict, List, BinaryIO, NoReturn, Iterator
)
from pip._vendor.six import PY3
if PY3:
from hashlib import _Hash
else:
from hashlib import _hash as _Hash
# The recommended hash algo of the moment. Change this whenever the state of
# the art changes; it won't hurt backward compatibility.
FAVORITE_HASH = 'sha256'
# Names of hashlib algorithms allowed by the --hash option and ``pip hash``
# Currently, those are the ones at least as collision-resistant as sha256.
STRONG_HASHES = ['sha256', 'sha384', 'sha512']
class Hashes(object):
"""A wrapper that builds multiple hashes at once and checks them against
known-good values
"""
def __init__(self, hashes=None):
# type: (Dict[str, List[str]]) -> None
"""
:param hashes: A dict of algorithm names pointing to lists of allowed
hex digests
"""
self._allowed = {} if hashes is None else hashes
def __or__(self, other):
# type: (Hashes) -> Hashes
if not isinstance(other, Hashes):
return NotImplemented
new = self._allowed.copy()
for alg, values in iteritems(other._allowed):
try:
new[alg] += values
except KeyError:
new[alg] = values
return Hashes(new)
@property
def digest_count(self):
# type: () -> int
return sum(len(digests) for digests in self._allowed.values())
def is_hash_allowed(
self,
hash_name, # type: str
hex_digest, # type: str
):
# type: (...) -> bool
"""Return whether the given hex digest is allowed."""
return hex_digest in self._allowed.get(hash_name, [])
def check_against_chunks(self, chunks):
# type: (Iterator[bytes]) -> None
"""Check good hashes against ones built from iterable of chunks of
data.
Raise HashMismatch if none match.
"""
gots = {}
for hash_name in iterkeys(self._allowed):
try:
gots[hash_name] = hashlib.new(hash_name)
except (ValueError, TypeError):
raise InstallationError(
'Unknown hash name: {}'.format(hash_name)
)
for chunk in chunks:
for hash in itervalues(gots):
hash.update(chunk)
for hash_name, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def _raise(self, gots):
# type: (Dict[str, _Hash]) -> NoReturn
raise HashMismatch(self._allowed, gots)
def check_against_file(self, file):
# type: (BinaryIO) -> None
"""Check good hashes against a file-like object
Raise HashMismatch if none match.
"""
return self.check_against_chunks(read_chunks(file))
def check_against_path(self, path):
# type: (str) -> None
with open(path, 'rb') as file:
return self.check_against_file(file)
def __nonzero__(self):
# type: () -> bool
"""Return whether I know any known-good hashes."""
return bool(self._allowed)
def __bool__(self):
# type: () -> bool
return self.__nonzero__()
class MissingHashes(Hashes):
"""A workalike for Hashes used when we're missing a hash for a requirement
It computes the actual hash of the requirement and raises a HashMissing
exception showing it to the user.
"""
def __init__(self):
# type: () -> None
"""Don't offer the ``hashes`` kwarg."""
# Pass our favorite hash in to generate a "gotten hash". With the
# empty list, it will never match, so an error will always raise.
super(MissingHashes, self).__init__(hashes={FAVORITE_HASH: []})
def _raise(self, gots):
# type: (Dict[str, _Hash]) -> NoReturn
raise HashMissing(gots[FAVORITE_HASH].hexdigest())

View file

@ -0,0 +1,36 @@
"""A helper module that injects SecureTransport, on import.
The import should be done as early as possible, to ensure all requests and
sessions (or whatever) are created after injecting SecureTransport.
Note that we only do the injection on macOS, when the linked OpenSSL is too
old to handle TLSv1.2.
"""
import sys
def inject_securetransport():
# type: () -> None
# Only relevant on macOS
if sys.platform != "darwin":
return
try:
import ssl
except ImportError:
return
# Checks for OpenSSL 1.0.1
if ssl.OPENSSL_VERSION_NUMBER >= 0x1000100f:
return
try:
from pip._vendor.urllib3.contrib import securetransport
except (ImportError, OSError):
return
securetransport.inject_into_urllib3()
inject_securetransport()

View file

@ -0,0 +1,399 @@
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
from __future__ import absolute_import
import contextlib
import errno
import logging
import logging.handlers
import os
import sys
from logging import Filter, getLogger
from pip._vendor.six import PY2
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
from pip._internal.utils.misc import ensure_dir
try:
import threading
except ImportError:
import dummy_threading as threading # type: ignore
try:
# Use "import as" and set colorama in the else clause to avoid mypy
# errors and get the following correct revealed type for colorama:
# `Union[_importlib_modulespec.ModuleType, None]`
# Otherwise, we get an error like the following in the except block:
# > Incompatible types in assignment (expression has type "None",
# variable has type Module)
# TODO: eliminate the need to use "import as" once mypy addresses some
# of its issues with conditional imports. Here is an umbrella issue:
# https://github.com/python/mypy/issues/1297
from pip._vendor import colorama as _colorama
# Lots of different errors can come from this, including SystemError and
# ImportError.
except Exception:
colorama = None
else:
# Import Fore explicitly rather than accessing below as colorama.Fore
# to avoid the following error running mypy:
# > Module has no attribute "Fore"
# TODO: eliminate the need to import Fore once mypy addresses some of its
# issues with conditional imports. This particular case could be an
# instance of the following issue (but also see the umbrella issue above):
# https://github.com/python/mypy/issues/3500
from pip._vendor.colorama import Fore
colorama = _colorama
_log_state = threading.local()
subprocess_logger = getLogger('pip.subprocessor')
class BrokenStdoutLoggingError(Exception):
"""
Raised if BrokenPipeError occurs for the stdout stream while logging.
"""
pass
# BrokenPipeError does not exist in Python 2 and, in addition, manifests
# differently in Windows and non-Windows.
if WINDOWS:
# In Windows, a broken pipe can show up as EINVAL rather than EPIPE:
# https://bugs.python.org/issue19612
# https://bugs.python.org/issue30418
if PY2:
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows Python 3 below."""
return (exc_class is IOError and
exc.errno in (errno.EINVAL, errno.EPIPE))
else:
# In Windows, a broken pipe IOError became OSError in Python 3.
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows Python 3 below."""
return ((exc_class is BrokenPipeError) or # noqa: F821
(exc_class is OSError and
exc.errno in (errno.EINVAL, errno.EPIPE)))
elif PY2:
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows Python 3 below."""
return (exc_class is IOError and exc.errno == errno.EPIPE)
else:
# Then we are in the non-Windows Python 3 case.
def _is_broken_pipe_error(exc_class, exc):
"""
Return whether an exception is a broken pipe error.
Args:
exc_class: an exception class.
exc: an exception instance.
"""
return (exc_class is BrokenPipeError) # noqa: F821
@contextlib.contextmanager
def indent_log(num=2):
"""
A context manager which will cause the log output to be indented for any
log messages emitted inside it.
"""
# For thread-safety
_log_state.indentation = get_indentation()
_log_state.indentation += num
try:
yield
finally:
_log_state.indentation -= num
def get_indentation():
return getattr(_log_state, 'indentation', 0)
class IndentingFormatter(logging.Formatter):
def __init__(self, *args, **kwargs):
"""
A logging.Formatter that obeys the indent_log() context manager.
:param add_timestamp: A bool indicating output lines should be prefixed
with their record's timestamp.
"""
self.add_timestamp = kwargs.pop("add_timestamp", False)
super(IndentingFormatter, self).__init__(*args, **kwargs)
def get_message_start(self, formatted, levelno):
"""
Return the start of the formatted log message (not counting the
prefix to add to each line).
"""
if levelno < logging.WARNING:
return ''
if formatted.startswith(DEPRECATION_MSG_PREFIX):
# Then the message already has a prefix. We don't want it to
# look like "WARNING: DEPRECATION: ...."
return ''
if levelno < logging.ERROR:
return 'WARNING: '
return 'ERROR: '
def format(self, record):
"""
Calls the standard formatter, but will indent all of the log message
lines by our current indentation level.
"""
formatted = super(IndentingFormatter, self).format(record)
message_start = self.get_message_start(formatted, record.levelno)
formatted = message_start + formatted
prefix = ''
if self.add_timestamp:
# TODO: Use Formatter.default_time_format after dropping PY2.
t = self.formatTime(record, "%Y-%m-%dT%H:%M:%S")
prefix = '{t},{record.msecs:03.0f} '.format(**locals())
prefix += " " * get_indentation()
formatted = "".join([
prefix + line
for line in formatted.splitlines(True)
])
return formatted
def _color_wrap(*colors):
def wrapped(inp):
return "".join(list(colors) + [inp, colorama.Style.RESET_ALL])
return wrapped
class ColorizedStreamHandler(logging.StreamHandler):
# Don't build up a list of colors if we don't have colorama
if colorama:
COLORS = [
# This needs to be in order from highest logging level to lowest.
(logging.ERROR, _color_wrap(Fore.RED)),
(logging.WARNING, _color_wrap(Fore.YELLOW)),
]
else:
COLORS = []
def __init__(self, stream=None, no_color=None):
logging.StreamHandler.__init__(self, stream)
self._no_color = no_color
if WINDOWS and colorama:
self.stream = colorama.AnsiToWin32(self.stream)
def _using_stdout(self):
"""
Return whether the handler is using sys.stdout.
"""
if WINDOWS and colorama:
# Then self.stream is an AnsiToWin32 object.
return self.stream.wrapped is sys.stdout
return self.stream is sys.stdout
def should_color(self):
# Don't colorize things if we do not have colorama or if told not to
if not colorama or self._no_color:
return False
real_stream = (
self.stream if not isinstance(self.stream, colorama.AnsiToWin32)
else self.stream.wrapped
)
# If the stream is a tty we should color it
if hasattr(real_stream, "isatty") and real_stream.isatty():
return True
# If we have an ANSI term we should color it
if os.environ.get("TERM") == "ANSI":
return True
# If anything else we should not color it
return False
def format(self, record):
msg = logging.StreamHandler.format(self, record)
if self.should_color():
for level, color in self.COLORS:
if record.levelno >= level:
msg = color(msg)
break
return msg
# The logging module says handleError() can be customized.
def handleError(self, record):
exc_class, exc = sys.exc_info()[:2]
# If a broken pipe occurred while calling write() or flush() on the
# stdout stream in logging's Handler.emit(), then raise our special
# exception so we can handle it in main() instead of logging the
# broken pipe error and continuing.
if (exc_class and self._using_stdout() and
_is_broken_pipe_error(exc_class, exc)):
raise BrokenStdoutLoggingError()
return super(ColorizedStreamHandler, self).handleError(record)
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
def _open(self):
ensure_dir(os.path.dirname(self.baseFilename))
return logging.handlers.RotatingFileHandler._open(self)
class MaxLevelFilter(Filter):
def __init__(self, level):
self.level = level
def filter(self, record):
return record.levelno < self.level
class ExcludeLoggerFilter(Filter):
"""
A logging Filter that excludes records from a logger (or its children).
"""
def filter(self, record):
# The base Filter class allows only records from a logger (or its
# children).
return not super(ExcludeLoggerFilter, self).filter(record)
def setup_logging(verbosity, no_color, user_log_file):
"""Configures and sets up all of the logging
Returns the requested logging level, as its integer value.
"""
# Determine the level to be logging at.
if verbosity >= 1:
level = "DEBUG"
elif verbosity == -1:
level = "WARNING"
elif verbosity == -2:
level = "ERROR"
elif verbosity <= -3:
level = "CRITICAL"
else:
level = "INFO"
level_number = getattr(logging, level)
# The "root" logger should match the "console" level *unless* we also need
# to log to a user log file.
include_user_log = user_log_file is not None
if include_user_log:
additional_log_file = user_log_file
root_level = "DEBUG"
else:
additional_log_file = "/dev/null"
root_level = level
# Disable any logging besides WARNING unless we have DEBUG level logging
# enabled for vendored libraries.
vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"
# Shorthands for clarity
log_streams = {
"stdout": "ext://sys.stdout",
"stderr": "ext://sys.stderr",
}
handler_classes = {
"stream": "pip._internal.utils.logging.ColorizedStreamHandler",
"file": "pip._internal.utils.logging.BetterRotatingFileHandler",
}
handlers = ["console", "console_errors", "console_subprocess"] + (
["user_log"] if include_user_log else []
)
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"filters": {
"exclude_warnings": {
"()": "pip._internal.utils.logging.MaxLevelFilter",
"level": logging.WARNING,
},
"restrict_to_subprocess": {
"()": "logging.Filter",
"name": subprocess_logger.name,
},
"exclude_subprocess": {
"()": "pip._internal.utils.logging.ExcludeLoggerFilter",
"name": subprocess_logger.name,
},
},
"formatters": {
"indent": {
"()": IndentingFormatter,
"format": "%(message)s",
},
"indent_with_timestamp": {
"()": IndentingFormatter,
"format": "%(message)s",
"add_timestamp": True,
},
},
"handlers": {
"console": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stdout"],
"filters": ["exclude_subprocess", "exclude_warnings"],
"formatter": "indent",
},
"console_errors": {
"level": "WARNING",
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["exclude_subprocess"],
"formatter": "indent",
},
# A handler responsible for logging to the console messages
# from the "subprocessor" logger.
"console_subprocess": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["restrict_to_subprocess"],
"formatter": "indent",
},
"user_log": {
"level": "DEBUG",
"class": handler_classes["file"],
"filename": additional_log_file,
"delay": True,
"formatter": "indent_with_timestamp",
},
},
"root": {
"level": root_level,
"handlers": handlers,
},
"loggers": {
"pip._vendor": {
"level": vendored_log_level
}
},
})
return level_number

View file

@ -0,0 +1,959 @@
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
# mypy: disallow-untyped-defs=False
from __future__ import absolute_import
import contextlib
import errno
import getpass
import hashlib
import io
import logging
import os
import posixpath
import shutil
import stat
import sys
from collections import deque
from itertools import tee
from pip._vendor import pkg_resources
from pip._vendor.packaging.utils import canonicalize_name
# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import.
from pip._vendor.retrying import retry # type: ignore
from pip._vendor.six import PY2, text_type
from pip._vendor.six.moves import filter, filterfalse, input, map, zip_longest
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote
from pip import __version__
from pip._internal.exceptions import CommandError
from pip._internal.locations import (
get_major_minor_version,
site_packages,
user_site,
)
from pip._internal.utils.compat import (
WINDOWS,
expanduser,
stdlib_pkgs,
str_to_display,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
from pip._internal.utils.virtualenv import (
running_under_virtualenv,
virtualenv_no_global,
)
if PY2:
from io import BytesIO as StringIO
else:
from io import StringIO
if MYPY_CHECK_RUNNING:
from typing import (
Any, AnyStr, Callable, Container, Iterable, Iterator, List, Optional,
Text, Tuple, TypeVar, Union,
)
from pip._vendor.pkg_resources import Distribution
VersionInfo = Tuple[int, int, int]
T = TypeVar("T")
__all__ = ['rmtree', 'display_path', 'backup_dir',
'ask', 'splitext',
'format_size', 'is_installable_dir',
'normalize_path',
'renames', 'get_prog',
'captured_stdout', 'ensure_dir',
'get_installed_version', 'remove_auth_from_url']
logger = logging.getLogger(__name__)
def get_pip_version():
# type: () -> str
pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
pip_pkg_dir = os.path.abspath(pip_pkg_dir)
return (
'pip {} from {} (python {})'.format(
__version__, pip_pkg_dir, get_major_minor_version(),
)
)
def normalize_version_info(py_version_info):
# type: (Tuple[int, ...]) -> Tuple[int, int, int]
"""
Convert a tuple of ints representing a Python version to one of length
three.
:param py_version_info: a tuple of ints representing a Python version,
or None to specify no version. The tuple can have any length.
:return: a tuple of length three if `py_version_info` is non-None.
Otherwise, return `py_version_info` unchanged (i.e. None).
"""
if len(py_version_info) < 3:
py_version_info += (3 - len(py_version_info)) * (0,)
elif len(py_version_info) > 3:
py_version_info = py_version_info[:3]
return cast('VersionInfo', py_version_info)
def ensure_dir(path):
# type: (AnyStr) -> None
"""os.path.makedirs without EEXIST."""
try:
os.makedirs(path)
except OSError as e:
# Windows can raise spurious ENOTEMPTY errors. See #6426.
if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:
raise
def get_prog():
# type: () -> str
try:
prog = os.path.basename(sys.argv[0])
if prog in ('__main__.py', '-c'):
return "{} -m pip".format(sys.executable)
else:
return prog
except (AttributeError, TypeError, IndexError):
pass
return 'pip'
# Retry every half second for up to 3 seconds
@retry(stop_max_delay=3000, wait_fixed=500)
def rmtree(dir, ignore_errors=False):
# type: (Text, bool) -> None
shutil.rmtree(dir, ignore_errors=ignore_errors,
onerror=rmtree_errorhandler)
def rmtree_errorhandler(func, path, exc_info):
"""On Windows, the files in .svn are read-only, so when rmtree() tries to
remove them, an exception is thrown. We catch that here, remove the
read-only attribute, and hopefully continue without problems."""
try:
has_attr_readonly = not (os.stat(path).st_mode & stat.S_IWRITE)
except (IOError, OSError):
# it's equivalent to os.path.exists
return
if has_attr_readonly:
# convert to read/write
os.chmod(path, stat.S_IWRITE)
# use the original function to repeat the operation
func(path)
return
else:
raise
def path_to_display(path):
# type: (Optional[Union[str, Text]]) -> Optional[Text]
"""
Convert a bytes (or text) path to text (unicode in Python 2) for display
and logging purposes.
This function should never error out. Also, this function is mainly needed
for Python 2 since in Python 3 str paths are already text.
"""
if path is None:
return None
if isinstance(path, text_type):
return path
# Otherwise, path is a bytes object (str in Python 2).
try:
display_path = path.decode(sys.getfilesystemencoding(), 'strict')
except UnicodeDecodeError:
# Include the full bytes to make troubleshooting easier, even though
# it may not be very human readable.
if PY2:
# Convert the bytes to a readable str representation using
# repr(), and then convert the str to unicode.
# Also, we add the prefix "b" to the repr() return value both
# to make the Python 2 output look like the Python 3 output, and
# to signal to the user that this is a bytes representation.
display_path = str_to_display('b{!r}'.format(path))
else:
# Silence the "F821 undefined name 'ascii'" flake8 error since
# in Python 3 ascii() is a built-in.
display_path = ascii(path) # noqa: F821
return display_path
def display_path(path):
# type: (Union[str, Text]) -> str
"""Gives the display value for a given path, making it relative to cwd
if possible."""
path = os.path.normcase(os.path.abspath(path))
if sys.version_info[0] == 2:
path = path.decode(sys.getfilesystemencoding(), 'replace')
path = path.encode(sys.getdefaultencoding(), 'replace')
if path.startswith(os.getcwd() + os.path.sep):
path = '.' + path[len(os.getcwd()):]
return path
def backup_dir(dir, ext='.bak'):
# type: (str, str) -> str
"""Figure out the name of a directory to back up the given dir to
(adding .bak, .bak2, etc)"""
n = 1
extension = ext
while os.path.exists(dir + extension):
n += 1
extension = ext + str(n)
return dir + extension
def ask_path_exists(message, options):
# type: (str, Iterable[str]) -> str
for action in os.environ.get('PIP_EXISTS_ACTION', '').split():
if action in options:
return action
return ask(message, options)
def _check_no_input(message):
# type: (str) -> None
"""Raise an error if no input is allowed."""
if os.environ.get('PIP_NO_INPUT'):
raise Exception(
'No input was expected ($PIP_NO_INPUT set); question: {}'.format(
message)
)
def ask(message, options):
# type: (str, Iterable[str]) -> str
"""Ask the message interactively, with the given possible responses"""
while 1:
_check_no_input(message)
response = input(message)
response = response.strip().lower()
if response not in options:
print(
'Your response ({!r}) was not one of the expected responses: '
'{}'.format(response, ', '.join(options))
)
else:
return response
def ask_input(message):
# type: (str) -> str
"""Ask for input interactively."""
_check_no_input(message)
return input(message)
def ask_password(message):
# type: (str) -> str
"""Ask for a password interactively."""
_check_no_input(message)
return getpass.getpass(message)
def format_size(bytes):
# type: (float) -> str
if bytes > 1000 * 1000:
return '{:.1f} MB'.format(bytes / 1000.0 / 1000)
elif bytes > 10 * 1000:
return '{} kB'.format(int(bytes / 1000))
elif bytes > 1000:
return '{:.1f} kB'.format(bytes / 1000.0)
else:
return '{} bytes'.format(int(bytes))
def tabulate(rows):
# type: (Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]
"""Return a list of formatted rows and a list of column sizes.
For example::
>>> tabulate([['foobar', 2000], [0xdeadbeef]])
(['foobar 2000', '3735928559'], [10, 4])
"""
rows = [tuple(map(str, row)) for row in rows]
sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue='')]
table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
return table, sizes
def is_installable_dir(path):
# type: (str) -> bool
"""Is path is a directory containing setup.py or pyproject.toml?
"""
if not os.path.isdir(path):
return False
setup_py = os.path.join(path, 'setup.py')
if os.path.isfile(setup_py):
return True
pyproject_toml = os.path.join(path, 'pyproject.toml')
if os.path.isfile(pyproject_toml):
return True
return False
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
"""Yield pieces of data from a file-like object until EOF."""
while True:
chunk = file.read(size)
if not chunk:
break
yield chunk
def normalize_path(path, resolve_symlinks=True):
# type: (str, bool) -> str
"""
Convert a path to its canonical, case-normalized, absolute version.
"""
path = expanduser(path)
if resolve_symlinks:
path = os.path.realpath(path)
else:
path = os.path.abspath(path)
return os.path.normcase(path)
def splitext(path):
# type: (str) -> Tuple[str, str]
"""Like os.path.splitext, but take off .tar too"""
base, ext = posixpath.splitext(path)
if base.lower().endswith('.tar'):
ext = base[-4:] + ext
base = base[:-4]
return base, ext
def renames(old, new):
# type: (str, str) -> None
"""Like os.renames(), but handles renaming across devices."""
# Implementation borrowed from os.renames().
head, tail = os.path.split(new)
if head and tail and not os.path.exists(head):
os.makedirs(head)
shutil.move(old, new)
head, tail = os.path.split(old)
if head and tail:
try:
os.removedirs(head)
except OSError:
pass
def is_local(path):
# type: (str) -> bool
"""
Return True if path is within sys.prefix, if we're running in a virtualenv.
If we're not in a virtualenv, all paths are considered "local."
Caution: this function assumes the head of path has been normalized
with normalize_path.
"""
if not running_under_virtualenv():
return True
return path.startswith(normalize_path(sys.prefix))
def dist_is_local(dist):
# type: (Distribution) -> bool
"""
Return True if given Distribution object is installed locally
(i.e. within current virtualenv).
Always True if we're not in a virtualenv.
"""
return is_local(dist_location(dist))
def dist_in_usersite(dist):
# type: (Distribution) -> bool
"""
Return True if given Distribution is installed in user site.
"""
return dist_location(dist).startswith(normalize_path(user_site))
def dist_in_site_packages(dist):
# type: (Distribution) -> bool
"""
Return True if given Distribution is installed in
sysconfig.get_python_lib().
"""
return dist_location(dist).startswith(normalize_path(site_packages))
def dist_is_editable(dist):
# type: (Distribution) -> bool
"""
Return True if given Distribution is an editable install.
"""
for path_item in sys.path:
egg_link = os.path.join(path_item, dist.project_name + '.egg-link')
if os.path.isfile(egg_link):
return True
return False
def get_installed_distributions(
local_only=True, # type: bool
skip=stdlib_pkgs, # type: Container[str]
include_editables=True, # type: bool
editables_only=False, # type: bool
user_only=False, # type: bool
paths=None # type: Optional[List[str]]
):
# type: (...) -> List[Distribution]
"""
Return a list of installed Distribution objects.
If ``local_only`` is True (default), only return installations
local to the current virtualenv, if in a virtualenv.
``skip`` argument is an iterable of lower-case project names to
ignore; defaults to stdlib_pkgs
If ``include_editables`` is False, don't report editables.
If ``editables_only`` is True , only report editables.
If ``user_only`` is True , only report installations in the user
site directory.
If ``paths`` is set, only report the distributions present at the
specified list of locations.
"""
if paths:
working_set = pkg_resources.WorkingSet(paths)
else:
working_set = pkg_resources.working_set
if local_only:
local_test = dist_is_local
else:
def local_test(d):
return True
if include_editables:
def editable_test(d):
return True
else:
def editable_test(d):
return not dist_is_editable(d)
if editables_only:
def editables_only_test(d):
return dist_is_editable(d)
else:
def editables_only_test(d):
return True
if user_only:
user_test = dist_in_usersite
else:
def user_test(d):
return True
return [d for d in working_set
if local_test(d) and
d.key not in skip and
editable_test(d) and
editables_only_test(d) and
user_test(d)
]
def _search_distribution(req_name):
# type: (str) -> Optional[Distribution]
"""Find a distribution matching the ``req_name`` in the environment.
This searches from *all* distributions available in the environment, to
match the behavior of ``pkg_resources.get_distribution()``.
"""
# Canonicalize the name before searching in the list of
# installed distributions and also while creating the package
# dictionary to get the Distribution object
req_name = canonicalize_name(req_name)
packages = get_installed_distributions(
local_only=False,
skip=(),
include_editables=True,
editables_only=False,
user_only=False,
paths=None,
)
pkg_dict = {canonicalize_name(p.key): p for p in packages}
return pkg_dict.get(req_name)
def get_distribution(req_name):
# type: (str) -> Optional[Distribution]
"""Given a requirement name, return the installed Distribution object.
This searches from *all* distributions available in the environment, to
match the behavior of ``pkg_resources.get_distribution()``.
"""
# Search the distribution by looking through the working set
dist = _search_distribution(req_name)
# If distribution could not be found, call working_set.require
# to update the working set, and try to find the distribution
# again.
# This might happen for e.g. when you install a package
# twice, once using setup.py develop and again using setup.py install.
# Now when run pip uninstall twice, the package gets removed
# from the working set in the first uninstall, so we have to populate
# the working set again so that pip knows about it and the packages
# gets picked up and is successfully uninstalled the second time too.
if not dist:
try:
pkg_resources.working_set.require(req_name)
except pkg_resources.DistributionNotFound:
return None
return _search_distribution(req_name)
def egg_link_path(dist):
# type: (Distribution) -> Optional[str]
"""
Return the path for the .egg-link file if it exists, otherwise, None.
There's 3 scenarios:
1) not in a virtualenv
try to find in site.USER_SITE, then site_packages
2) in a no-global virtualenv
try to find in site_packages
3) in a yes-global virtualenv
try to find in site_packages, then site.USER_SITE
(don't look in global location)
For #1 and #3, there could be odd cases, where there's an egg-link in 2
locations.
This method will just return the first one found.
"""
sites = []
if running_under_virtualenv():
sites.append(site_packages)
if not virtualenv_no_global() and user_site:
sites.append(user_site)
else:
if user_site:
sites.append(user_site)
sites.append(site_packages)
for site in sites:
egglink = os.path.join(site, dist.project_name) + '.egg-link'
if os.path.isfile(egglink):
return egglink
return None
def dist_location(dist):
# type: (Distribution) -> str
"""
Get the site-packages location of this distribution. Generally
this is dist.location, except in the case of develop-installed
packages, where dist.location is the source code location, and we
want to know where the egg-link file is.
The returned location is normalized (in particular, with symlinks removed).
"""
egg_link = egg_link_path(dist)
if egg_link:
return normalize_path(egg_link)
return normalize_path(dist.location)
def write_output(msg, *args):
# type: (Any, Any) -> None
logger.info(msg, *args)
class FakeFile(object):
"""Wrap a list of lines in an object with readline() to make
ConfigParser happy."""
def __init__(self, lines):
self._gen = iter(lines)
def readline(self):
try:
return next(self._gen)
except StopIteration:
return ''
def __iter__(self):
return self._gen
class StreamWrapper(StringIO):
@classmethod
def from_stream(cls, orig_stream):
cls.orig_stream = orig_stream
return cls()
# compileall.compile_dir() needs stdout.encoding to print to stdout
@property
def encoding(self):
return self.orig_stream.encoding
@contextlib.contextmanager
def captured_output(stream_name):
"""Return a context manager used by captured_stdout/stdin/stderr
that temporarily replaces the sys stream *stream_name* with a StringIO.
Taken from Lib/support/__init__.py in the CPython repo.
"""
orig_stdout = getattr(sys, stream_name)
setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
try:
yield getattr(sys, stream_name)
finally:
setattr(sys, stream_name, orig_stdout)
def captured_stdout():
"""Capture the output of sys.stdout:
with captured_stdout() as stdout:
print('hello')
self.assertEqual(stdout.getvalue(), 'hello\n')
Taken from Lib/support/__init__.py in the CPython repo.
"""
return captured_output('stdout')
def captured_stderr():
"""
See captured_stdout().
"""
return captured_output('stderr')
def get_installed_version(dist_name, working_set=None):
"""Get the installed version of dist_name avoiding pkg_resources cache"""
# Create a requirement that we'll look for inside of setuptools.
req = pkg_resources.Requirement.parse(dist_name)
if working_set is None:
# We want to avoid having this cached, so we need to construct a new
# working set each time.
working_set = pkg_resources.WorkingSet()
# Get the installed distribution from our working set
dist = working_set.find(req)
# Check to see if we got an installed distribution or not, if we did
# we want to return it's version.
return dist.version if dist else None
def consume(iterator):
"""Consume an iterable at C speed."""
deque(iterator, maxlen=0)
# Simulates an enum
def enum(*sequential, **named):
enums = dict(zip(sequential, range(len(sequential))), **named)
reverse = {value: key for key, value in enums.items()}
enums['reverse_mapping'] = reverse
return type('Enum', (), enums)
def build_netloc(host, port):
# type: (str, Optional[int]) -> str
"""
Build a netloc from a host-port pair
"""
if port is None:
return host
if ':' in host:
# Only wrap host with square brackets when it is IPv6
host = '[{}]'.format(host)
return '{}:{}'.format(host, port)
def build_url_from_netloc(netloc, scheme='https'):
# type: (str, str) -> str
"""
Build a full URL from a netloc.
"""
if netloc.count(':') >= 2 and '@' not in netloc and '[' not in netloc:
# It must be a bare IPv6 address, so wrap it with brackets.
netloc = '[{}]'.format(netloc)
return '{}://{}'.format(scheme, netloc)
def parse_netloc(netloc):
# type: (str) -> Tuple[str, Optional[int]]
"""
Return the host-port pair from a netloc.
"""
url = build_url_from_netloc(netloc)
parsed = urllib_parse.urlparse(url)
return parsed.hostname, parsed.port
def split_auth_from_netloc(netloc):
"""
Parse out and remove the auth information from a netloc.
Returns: (netloc, (username, password)).
"""
if '@' not in netloc:
return netloc, (None, None)
# Split from the right because that's how urllib.parse.urlsplit()
# behaves if more than one @ is present (which can be checked using
# the password attribute of urlsplit()'s return value).
auth, netloc = netloc.rsplit('@', 1)
if ':' in auth:
# Split from the left because that's how urllib.parse.urlsplit()
# behaves if more than one : is present (which again can be checked
# using the password attribute of the return value)
user_pass = auth.split(':', 1)
else:
user_pass = auth, None
user_pass = tuple(
None if x is None else urllib_unquote(x) for x in user_pass
)
return netloc, user_pass
def redact_netloc(netloc):
# type: (str) -> str
"""
Replace the sensitive data in a netloc with "****", if it exists.
For example:
- "user:pass@example.com" returns "user:****@example.com"
- "accesstoken@example.com" returns "****@example.com"
"""
netloc, (user, password) = split_auth_from_netloc(netloc)
if user is None:
return netloc
if password is None:
user = '****'
password = ''
else:
user = urllib_parse.quote(user)
password = ':****'
return '{user}{password}@{netloc}'.format(user=user,
password=password,
netloc=netloc)
def _transform_url(url, transform_netloc):
"""Transform and replace netloc in a url.
transform_netloc is a function taking the netloc and returning a
tuple. The first element of this tuple is the new netloc. The
entire tuple is returned.
Returns a tuple containing the transformed url as item 0 and the
original tuple returned by transform_netloc as item 1.
"""
purl = urllib_parse.urlsplit(url)
netloc_tuple = transform_netloc(purl.netloc)
# stripped url
url_pieces = (
purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment
)
surl = urllib_parse.urlunsplit(url_pieces)
return surl, netloc_tuple
def _get_netloc(netloc):
return split_auth_from_netloc(netloc)
def _redact_netloc(netloc):
return (redact_netloc(netloc),)
def split_auth_netloc_from_url(url):
# type: (str) -> Tuple[str, str, Tuple[str, str]]
"""
Parse a url into separate netloc, auth, and url with no auth.
Returns: (url_without_auth, netloc, (username, password))
"""
url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
return url_without_auth, netloc, auth
def remove_auth_from_url(url):
# type: (str) -> str
"""Return a copy of url with 'username:password@' removed."""
# username/pass params are passed to subversion through flags
# and are not recognized in the url.
return _transform_url(url, _get_netloc)[0]
def redact_auth_from_url(url):
# type: (str) -> str
"""Replace the password in a given url with ****."""
return _transform_url(url, _redact_netloc)[0]
class HiddenText(object):
def __init__(
self,
secret, # type: str
redacted, # type: str
):
# type: (...) -> None
self.secret = secret
self.redacted = redacted
def __repr__(self):
# type: (...) -> str
return '<HiddenText {!r}>'.format(str(self))
def __str__(self):
# type: (...) -> str
return self.redacted
# This is useful for testing.
def __eq__(self, other):
# type: (Any) -> bool
if type(self) != type(other):
return False
# The string being used for redaction doesn't also have to match,
# just the raw, original string.
return (self.secret == other.secret)
# We need to provide an explicit __ne__ implementation for Python 2.
# TODO: remove this when we drop PY2 support.
def __ne__(self, other):
# type: (Any) -> bool
return not self == other
def hide_value(value):
# type: (str) -> HiddenText
return HiddenText(value, redacted='****')
def hide_url(url):
# type: (str) -> HiddenText
redacted = redact_auth_from_url(url)
return HiddenText(url, redacted=redacted)
def protect_pip_from_modification_on_windows(modifying_pip):
# type: (bool) -> None
"""Protection of pip.exe from modification on Windows
On Windows, any operation modifying pip should be run as:
python -m pip ...
"""
pip_names = [
"pip.exe",
"pip{}.exe".format(sys.version_info[0]),
"pip{}.{}.exe".format(*sys.version_info[:2])
]
# See https://github.com/pypa/pip/issues/1299 for more discussion
should_show_use_python_msg = (
modifying_pip and
WINDOWS and
os.path.basename(sys.argv[0]) in pip_names
)
if should_show_use_python_msg:
new_command = [
sys.executable, "-m", "pip"
] + sys.argv[1:]
raise CommandError(
'To modify pip, please run the following command:\n{}'
.format(" ".join(new_command))
)
def is_console_interactive():
# type: () -> bool
"""Is this console interactive?
"""
return sys.stdin is not None and sys.stdin.isatty()
def hash_file(path, blocksize=1 << 20):
# type: (Text, int) -> Tuple[Any, int]
"""Return (hash, length) for path using hashlib.sha256()
"""
h = hashlib.sha256()
length = 0
with open(path, 'rb') as f:
for block in read_chunks(f, size=blocksize):
length += len(block)
h.update(block)
return h, length
def is_wheel_installed():
"""
Return whether the wheel package is installed.
"""
try:
import wheel # noqa: F401
except ImportError:
return False
return True
def pairwise(iterable):
# type: (Iterable[Any]) -> Iterator[Tuple[Any, Any]]
"""
Return paired elements.
For example:
s -> (s0, s1), (s2, s3), (s4, s5), ...
"""
iterable = iter(iterable)
return zip_longest(iterable, iterable)
def partition(
pred, # type: Callable[[T], bool]
iterable, # type: Iterable[T]
):
# type: (...) -> Tuple[Iterable[T], Iterable[T]]
"""
Use a predicate to partition entries into false entries and true entries,
like
partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
"""
t1, t2 = tee(iterable)
return filterfalse(pred, t1), filter(pred, t2)

View file

@ -0,0 +1,44 @@
"""Utilities for defining models
"""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import operator
class KeyBasedCompareMixin(object):
"""Provides comparison capabilities that is based on a key
"""
__slots__ = ['_compare_key', '_defining_class']
def __init__(self, key, defining_class):
self._compare_key = key
self._defining_class = defining_class
def __hash__(self):
return hash(self._compare_key)
def __lt__(self, other):
return self._compare(other, operator.__lt__)
def __le__(self, other):
return self._compare(other, operator.__le__)
def __gt__(self, other):
return self._compare(other, operator.__gt__)
def __ge__(self, other):
return self._compare(other, operator.__ge__)
def __eq__(self, other):
return self._compare(other, operator.__eq__)
def __ne__(self, other):
return self._compare(other, operator.__ne__)
def _compare(self, other, method):
if not isinstance(other, self._defining_class):
return NotImplemented
return method(self._compare_key, other._compare_key)

View file

@ -0,0 +1,94 @@
from __future__ import absolute_import
import logging
from email.parser import FeedParser
from pip._vendor import pkg_resources
from pip._vendor.packaging import specifiers, version
from pip._internal.exceptions import NoneMetadataError
from pip._internal.utils.misc import display_path
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
from email.message import Message
from pip._vendor.pkg_resources import Distribution
logger = logging.getLogger(__name__)
def check_requires_python(requires_python, version_info):
# type: (Optional[str], Tuple[int, ...]) -> bool
"""
Check if the given Python version matches a "Requires-Python" specifier.
:param version_info: A 3-tuple of ints representing a Python
major-minor-micro version to check (e.g. `sys.version_info[:3]`).
:return: `True` if the given Python version satisfies the requirement.
Otherwise, return `False`.
:raises InvalidSpecifier: If `requires_python` has an invalid format.
"""
if requires_python is None:
# The package provides no information
return True
requires_python_specifier = specifiers.SpecifierSet(requires_python)
python_version = version.parse('.'.join(map(str, version_info)))
return python_version in requires_python_specifier
def get_metadata(dist):
# type: (Distribution) -> Message
"""
:raises NoneMetadataError: if the distribution reports `has_metadata()`
True but `get_metadata()` returns None.
"""
metadata_name = 'METADATA'
if (isinstance(dist, pkg_resources.DistInfoDistribution) and
dist.has_metadata(metadata_name)):
metadata = dist.get_metadata(metadata_name)
elif dist.has_metadata('PKG-INFO'):
metadata_name = 'PKG-INFO'
metadata = dist.get_metadata(metadata_name)
else:
logger.warning("No metadata found in %s", display_path(dist.location))
metadata = ''
if metadata is None:
raise NoneMetadataError(dist, metadata_name)
feed_parser = FeedParser()
# The following line errors out if with a "NoneType" TypeError if
# passed metadata=None.
feed_parser.feed(metadata)
return feed_parser.close()
def get_requires_python(dist):
# type: (pkg_resources.Distribution) -> Optional[str]
"""
Return the "Requires-Python" metadata for a distribution, or None
if not present.
"""
pkg_info_dict = get_metadata(dist)
requires_python = pkg_info_dict.get('Requires-Python')
if requires_python is not None:
# Convert to a str to satisfy the type checker, since requires_python
# can be a Header object.
requires_python = str(requires_python)
return requires_python
def get_installer(dist):
# type: (Distribution) -> str
if dist.has_metadata('INSTALLER'):
for line in dist.get_metadata_lines('INSTALLER'):
if line.strip():
return line.strip()
return ''

View file

@ -0,0 +1,107 @@
"""Convenient parallelization of higher order functions.
This module provides two helper functions, with appropriate fallbacks on
Python 2 and on systems lacking support for synchronization mechanisms:
- map_multiprocess
- map_multithread
These helpers work like Python 3's map, with two differences:
- They don't guarantee the order of processing of
the elements of the iterable.
- The underlying process/thread pools chop the iterable into
a number of chunks, so that for very long iterables using
a large value for chunksize can make the job complete much faster
than using the default value of 1.
"""
__all__ = ['map_multiprocess', 'map_multithread']
from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
from pip._vendor.six import PY2
from pip._vendor.six.moves import map
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Callable, Iterable, Iterator, Union, TypeVar
from multiprocessing import pool
Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar('S')
T = TypeVar('T')
# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.
try:
import multiprocessing.synchronize # noqa
except ImportError:
LACK_SEM_OPEN = True
else:
LACK_SEM_OPEN = False
# Incredibly large timeout to work around bpo-8296 on Python 2.
TIMEOUT = 2000000
@contextmanager
def closing(pool):
# type: (Pool) -> Iterator[Pool]
"""Return a context manager making sure the pool closes properly."""
try:
yield pool
finally:
# For Pool.imap*, close and join are needed
# for the returned iterator to begin yielding.
pool.close()
pool.join()
pool.terminate()
def _map_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.
This function is the sequential fallback either on Python 2
where Pool.imap* doesn't react to KeyboardInterrupt
or when sem_open is unavailable.
"""
return map(func, iterable)
def _map_multiprocess(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a process pool.
For very long iterables using a large value for chunksize can make
the job complete much faster than using the default value of 1.
Return an unordered iterator of the results.
"""
with closing(ProcessPool()) as pool:
return pool.imap_unordered(func, iterable, chunksize)
def _map_multithread(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a thread pool.
For very long iterables using a large value for chunksize can make
the job complete much faster than using the default value of 1.
Return an unordered iterator of the results.
"""
with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
return pool.imap_unordered(func, iterable, chunksize)
if LACK_SEM_OPEN or PY2:
map_multiprocess = map_multithread = _map_fallback
else:
map_multiprocess = _map_multiprocess
map_multithread = _map_multithread

View file

@ -0,0 +1,44 @@
from pip._vendor.pkg_resources import yield_lines
from pip._vendor.six import ensure_str
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Dict, Iterable, List
class DictMetadata(object):
"""IMetadataProvider that reads metadata files from a dictionary.
"""
def __init__(self, metadata):
# type: (Dict[str, bytes]) -> None
self._metadata = metadata
def has_metadata(self, name):
# type: (str) -> bool
return name in self._metadata
def get_metadata(self, name):
# type: (str) -> str
try:
return ensure_str(self._metadata[name])
except UnicodeDecodeError as e:
# Mirrors handling done in pkg_resources.NullProvider.
e.reason += " in {} file".format(name)
raise
def get_metadata_lines(self, name):
# type: (str) -> Iterable[str]
return yield_lines(self.get_metadata(name))
def metadata_isdir(self, name):
# type: (str) -> bool
return False
def metadata_listdir(self, name):
# type: (str) -> List[str]
return []
def run_script(self, script_name, namespace):
# type: (str, str) -> None
pass

View file

@ -0,0 +1,181 @@
import sys
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List, Optional, Sequence
# Shim to wrap setup.py invocation with setuptools
#
# We set sys.argv[0] to the path to the underlying setup.py file so
# setuptools / distutils don't take the path to the setup.py to be "-c" when
# invoking via the shim. This avoids e.g. the following manifest_maker
# warning: "warning: manifest_maker: standard file '-c' not found".
_SETUPTOOLS_SHIM = (
"import sys, setuptools, tokenize; sys.argv[0] = {0!r}; __file__={0!r};"
"f=getattr(tokenize, 'open', open)(__file__);"
"code=f.read().replace('\\r\\n', '\\n');"
"f.close();"
"exec(compile(code, __file__, 'exec'))"
)
def make_setuptools_shim_args(
setup_py_path, # type: str
global_options=None, # type: Sequence[str]
no_user_config=False, # type: bool
unbuffered_output=False # type: bool
):
# type: (...) -> List[str]
"""
Get setuptools command arguments with shim wrapped setup file invocation.
:param setup_py_path: The path to setup.py to be wrapped.
:param global_options: Additional global options.
:param no_user_config: If True, disables personal user configuration.
:param unbuffered_output: If True, adds the unbuffered switch to the
argument list.
"""
args = [sys.executable]
if unbuffered_output:
args += ["-u"]
args += ["-c", _SETUPTOOLS_SHIM.format(setup_py_path)]
if global_options:
args += global_options
if no_user_config:
args += ["--no-user-cfg"]
return args
def make_setuptools_bdist_wheel_args(
setup_py_path, # type: str
global_options, # type: Sequence[str]
build_options, # type: Sequence[str]
destination_dir, # type: str
):
# type: (...) -> List[str]
# NOTE: Eventually, we'd want to also -S to the flags here, when we're
# isolating. Currently, it breaks Python in virtualenvs, because it
# relies on site.py to find parts of the standard library outside the
# virtualenv.
args = make_setuptools_shim_args(
setup_py_path,
global_options=global_options,
unbuffered_output=True
)
args += ["bdist_wheel", "-d", destination_dir]
args += build_options
return args
def make_setuptools_clean_args(
setup_py_path, # type: str
global_options, # type: Sequence[str]
):
# type: (...) -> List[str]
args = make_setuptools_shim_args(
setup_py_path,
global_options=global_options,
unbuffered_output=True
)
args += ["clean", "--all"]
return args
def make_setuptools_develop_args(
setup_py_path, # type: str
global_options, # type: Sequence[str]
install_options, # type: Sequence[str]
no_user_config, # type: bool
prefix, # type: Optional[str]
home, # type: Optional[str]
use_user_site, # type: bool
):
# type: (...) -> List[str]
assert not (use_user_site and prefix)
args = make_setuptools_shim_args(
setup_py_path,
global_options=global_options,
no_user_config=no_user_config,
)
args += ["develop", "--no-deps"]
args += install_options
if prefix:
args += ["--prefix", prefix]
if home is not None:
args += ["--home", home]
if use_user_site:
args += ["--user", "--prefix="]
return args
def make_setuptools_egg_info_args(
setup_py_path, # type: str
egg_info_dir, # type: Optional[str]
no_user_config, # type: bool
):
# type: (...) -> List[str]
args = make_setuptools_shim_args(
setup_py_path, no_user_config=no_user_config
)
args += ["egg_info"]
if egg_info_dir:
args += ["--egg-base", egg_info_dir]
return args
def make_setuptools_install_args(
setup_py_path, # type: str
global_options, # type: Sequence[str]
install_options, # type: Sequence[str]
record_filename, # type: str
root, # type: Optional[str]
prefix, # type: Optional[str]
header_dir, # type: Optional[str]
home, # type: Optional[str]
use_user_site, # type: bool
no_user_config, # type: bool
pycompile # type: bool
):
# type: (...) -> List[str]
assert not (use_user_site and prefix)
assert not (use_user_site and root)
args = make_setuptools_shim_args(
setup_py_path,
global_options=global_options,
no_user_config=no_user_config,
unbuffered_output=True
)
args += ["install", "--record", record_filename]
args += ["--single-version-externally-managed"]
if root is not None:
args += ["--root", root]
if prefix is not None:
args += ["--prefix", prefix]
if home is not None:
args += ["--home", home]
if use_user_site:
args += ["--user", "--prefix="]
if pycompile:
args += ["--compile"]
else:
args += ["--no-compile"]
if header_dir:
args += ["--install-headers", header_dir]
args += install_options
return args

View file

@ -0,0 +1,280 @@
from __future__ import absolute_import
import logging
import os
import subprocess
from pip._vendor.six.moves import shlex_quote
from pip._internal.cli.spinners import SpinnerInterface, open_spinner
from pip._internal.exceptions import InstallationError
from pip._internal.utils.compat import console_to_str, str_to_display
from pip._internal.utils.logging import subprocess_logger
from pip._internal.utils.misc import HiddenText, path_to_display
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import (
Any, Callable, Iterable, List, Mapping, Optional, Text, Union,
)
CommandArgs = List[Union[str, HiddenText]]
LOG_DIVIDER = '----------------------------------------'
def make_command(*args):
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
"""
Create a CommandArgs object.
"""
command_args = [] # type: CommandArgs
for arg in args:
# Check for list instead of CommandArgs since CommandArgs is
# only known during type-checking.
if isinstance(arg, list):
command_args.extend(arg)
else:
# Otherwise, arg is str or HiddenText.
command_args.append(arg)
return command_args
def format_command_args(args):
# type: (Union[List[str], CommandArgs]) -> str
"""
Format command arguments for display.
"""
# For HiddenText arguments, display the redacted form by calling str().
# Also, we don't apply str() to arguments that aren't HiddenText since
# this can trigger a UnicodeDecodeError in Python 2 if the argument
# has type unicode and includes a non-ascii character. (The type
# checker doesn't ensure the annotations are correct in all cases.)
return ' '.join(
shlex_quote(str(arg)) if isinstance(arg, HiddenText)
else shlex_quote(arg) for arg in args
)
def reveal_command_args(args):
# type: (Union[List[str], CommandArgs]) -> List[str]
"""
Return the arguments in their raw, unredacted form.
"""
return [
arg.secret if isinstance(arg, HiddenText) else arg for arg in args
]
def make_subprocess_output_error(
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[Text]
exit_status, # type: int
):
# type: (...) -> Text
"""
Create and return the error message to use to log a subprocess error
with command output.
:param lines: A list of lines, each ending with a newline.
"""
command = format_command_args(cmd_args)
# Convert `command` and `cwd` to text (unicode in Python 2) so we can use
# them as arguments in the unicode format string below. This avoids
# "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2
# if either contains a non-ascii character.
command_display = str_to_display(command, desc='command bytes')
cwd_display = path_to_display(cwd)
# We know the joined output value ends in a newline.
output = ''.join(lines)
msg = (
# Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
# codec can't encode character ..." in Python 2 when a format
# argument (e.g. `output`) has a non-ascii character.
u'Command errored out with exit status {exit_status}:\n'
' command: {command_display}\n'
' cwd: {cwd_display}\n'
'Complete output ({line_count} lines):\n{output}{divider}'
).format(
exit_status=exit_status,
command_display=command_display,
cwd_display=cwd_display,
line_count=len(lines),
output=output,
divider=LOG_DIVIDER,
)
return msg
def call_subprocess(
cmd, # type: Union[List[str], CommandArgs]
show_stdout=False, # type: bool
cwd=None, # type: Optional[str]
on_returncode='raise', # type: str
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
command_desc=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
unset_environ=None, # type: Optional[Iterable[str]]
spinner=None, # type: Optional[SpinnerInterface]
log_failed_cmd=True # type: Optional[bool]
):
# type: (...) -> Text
"""
Args:
show_stdout: if true, use INFO to log the subprocess's stderr and
stdout streams. Otherwise, use DEBUG. Defaults to False.
extra_ok_returncodes: an iterable of integer return codes that are
acceptable, in addition to 0. Defaults to None, which means [].
unset_environ: an iterable of environment variable names to unset
prior to calling subprocess.Popen().
log_failed_cmd: if false, failed commands are not logged, only raised.
"""
if extra_ok_returncodes is None:
extra_ok_returncodes = []
if unset_environ is None:
unset_environ = []
# Most places in pip use show_stdout=False. What this means is--
#
# - We connect the child's output (combined stderr and stdout) to a
# single pipe, which we read.
# - We log this output to stderr at DEBUG level as it is received.
# - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
# requested), then we show a spinner so the user can still see the
# subprocess is in progress.
# - If the subprocess exits with an error, we log the output to stderr
# at ERROR level if it hasn't already been displayed to the console
# (e.g. if --verbose logging wasn't enabled). This way we don't log
# the output to the console twice.
#
# If show_stdout=True, then the above is still done, but with DEBUG
# replaced by INFO.
if show_stdout:
# Then log the subprocess output at INFO level.
log_subprocess = subprocess_logger.info
used_level = logging.INFO
else:
# Then log the subprocess output using DEBUG. This also ensures
# it will be logged to the log file (aka user_log), if enabled.
log_subprocess = subprocess_logger.debug
used_level = logging.DEBUG
# Whether the subprocess will be visible in the console.
showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level
# Only use the spinner if we're not showing the subprocess output
# and we have a spinner.
use_spinner = not showing_subprocess and spinner is not None
if command_desc is None:
command_desc = format_command_args(cmd)
log_subprocess("Running command %s", command_desc)
env = os.environ.copy()
if extra_environ:
env.update(extra_environ)
for name in unset_environ:
env.pop(name, None)
try:
proc = subprocess.Popen(
# Convert HiddenText objects to the underlying str.
reveal_command_args(cmd),
stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, cwd=cwd, env=env,
)
assert proc.stdin
assert proc.stdout
proc.stdin.close()
except Exception as exc:
if log_failed_cmd:
subprocess_logger.critical(
"Error %s while executing command %s", exc, command_desc,
)
raise
all_output = []
while True:
# The "line" value is a unicode string in Python 2.
line = console_to_str(proc.stdout.readline())
if not line:
break
line = line.rstrip()
all_output.append(line + '\n')
# Show the line immediately.
log_subprocess(line)
# Update the spinner.
if use_spinner:
assert spinner
spinner.spin()
try:
proc.wait()
finally:
if proc.stdout:
proc.stdout.close()
proc_had_error = (
proc.returncode and proc.returncode not in extra_ok_returncodes
)
if use_spinner:
assert spinner
if proc_had_error:
spinner.finish("error")
else:
spinner.finish("done")
if proc_had_error:
if on_returncode == 'raise':
if not showing_subprocess and log_failed_cmd:
# Then the subprocess streams haven't been logged to the
# console yet.
msg = make_subprocess_output_error(
cmd_args=cmd,
cwd=cwd,
lines=all_output,
exit_status=proc.returncode,
)
subprocess_logger.error(msg)
exc_msg = (
'Command errored out with exit status {}: {} '
'Check the logs for full command output.'
).format(proc.returncode, command_desc)
raise InstallationError(exc_msg)
elif on_returncode == 'warn':
subprocess_logger.warning(
'Command "%s" had error code %s in %s',
command_desc,
proc.returncode,
cwd,
)
elif on_returncode == 'ignore':
pass
else:
raise ValueError('Invalid value: on_returncode={!r}'.format(
on_returncode))
return ''.join(all_output)
def runner_with_spinner_message(message):
# type: (str) -> Callable[..., None]
"""Provide a subprocess_runner that shows a spinner message.
Intended for use with for pep517's Pep517HookCaller. Thus, the runner has
an API that matches what's expected by Pep517HookCaller.subprocess_runner.
"""
def runner(
cmd, # type: List[str]
cwd=None, # type: Optional[str]
extra_environ=None # type: Optional[Mapping[str, Any]]
):
# type: (...) -> None
with open_spinner(message) as spinner:
call_subprocess(
cmd,
cwd=cwd,
extra_environ=extra_environ,
spinner=spinner,
)
return runner

View file

@ -0,0 +1,274 @@
from __future__ import absolute_import
import errno
import itertools
import logging
import os.path
import tempfile
from contextlib import contextmanager
from pip._vendor.contextlib2 import ExitStack
from pip._vendor.six import ensure_text
from pip._internal.utils.misc import enum, rmtree
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Any, Dict, Iterator, Optional, TypeVar, Union
_T = TypeVar('_T', bound='TempDirectory')
logger = logging.getLogger(__name__)
# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
tempdir_kinds = enum(
BUILD_ENV="build-env",
EPHEM_WHEEL_CACHE="ephem-wheel-cache",
REQ_BUILD="req-build",
)
_tempdir_manager = None # type: Optional[ExitStack]
@contextmanager
def global_tempdir_manager():
# type: () -> Iterator[None]
global _tempdir_manager
with ExitStack() as stack:
old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
try:
yield
finally:
_tempdir_manager = old_tempdir_manager
class TempDirectoryTypeRegistry(object):
"""Manages temp directory behavior
"""
def __init__(self):
# type: () -> None
self._should_delete = {} # type: Dict[str, bool]
def set_delete(self, kind, value):
# type: (str, bool) -> None
"""Indicate whether a TempDirectory of the given kind should be
auto-deleted.
"""
self._should_delete[kind] = value
def get_delete(self, kind):
# type: (str) -> bool
"""Get configured auto-delete flag for a given TempDirectory type,
default True.
"""
return self._should_delete.get(kind, True)
_tempdir_registry = None # type: Optional[TempDirectoryTypeRegistry]
@contextmanager
def tempdir_registry():
# type: () -> Iterator[TempDirectoryTypeRegistry]
"""Provides a scoped global tempdir registry that can be used to dictate
whether directories should be deleted.
"""
global _tempdir_registry
old_tempdir_registry = _tempdir_registry
_tempdir_registry = TempDirectoryTypeRegistry()
try:
yield _tempdir_registry
finally:
_tempdir_registry = old_tempdir_registry
class _Default(object):
pass
_default = _Default()
class TempDirectory(object):
"""Helper class that owns and cleans up a temporary directory.
This class can be used as a context manager or as an OO representation of a
temporary directory.
Attributes:
path
Location to the created temporary directory
delete
Whether the directory should be deleted when exiting
(when used as a contextmanager)
Methods:
cleanup()
Deletes the temporary directory
When used as a context manager, if the delete attribute is True, on
exiting the context the temporary directory is deleted.
"""
def __init__(
self,
path=None, # type: Optional[str]
delete=_default, # type: Union[bool, None, _Default]
kind="temp", # type: str
globally_managed=False, # type: bool
):
super(TempDirectory, self).__init__()
if delete is _default:
if path is not None:
# If we were given an explicit directory, resolve delete option
# now.
delete = False
else:
# Otherwise, we wait until cleanup and see what
# tempdir_registry says.
delete = None
if path is None:
path = self._create(kind)
self._path = path
self._deleted = False
self.delete = delete
self.kind = kind
if globally_managed:
assert _tempdir_manager is not None
_tempdir_manager.enter_context(self)
@property
def path(self):
# type: () -> str
assert not self._deleted, (
"Attempted to access deleted path: {}".format(self._path)
)
return self._path
def __repr__(self):
# type: () -> str
return "<{} {!r}>".format(self.__class__.__name__, self.path)
def __enter__(self):
# type: (_T) -> _T
return self
def __exit__(self, exc, value, tb):
# type: (Any, Any, Any) -> None
if self.delete is not None:
delete = self.delete
elif _tempdir_registry:
delete = _tempdir_registry.get_delete(self.kind)
else:
delete = True
if delete:
self.cleanup()
def _create(self, kind):
# type: (str) -> str
"""Create a temporary directory and store its path in self.path
"""
# We realpath here because some systems have their default tmpdir
# symlinked to another directory. This tends to confuse build
# scripts, so we canonicalize the path by traversing potential
# symlinks here.
path = os.path.realpath(
tempfile.mkdtemp(prefix="pip-{}-".format(kind))
)
logger.debug("Created temporary directory: %s", path)
return path
def cleanup(self):
# type: () -> None
"""Remove the temporary directory created and reset state
"""
self._deleted = True
if os.path.exists(self._path):
# Make sure to pass unicode on Python 2 to make the contents also
# use unicode, ensuring non-ASCII names and can be represented.
rmtree(ensure_text(self._path))
class AdjacentTempDirectory(TempDirectory):
"""Helper class that creates a temporary directory adjacent to a real one.
Attributes:
original
The original directory to create a temp directory for.
path
After calling create() or entering, contains the full
path to the temporary directory.
delete
Whether the directory should be deleted when exiting
(when used as a contextmanager)
"""
# The characters that may be used to name the temp directory
# We always prepend a ~ and then rotate through these until
# a usable name is found.
# pkg_resources raises a different error for .dist-info folder
# with leading '-' and invalid metadata
LEADING_CHARS = "-~.=%0123456789"
def __init__(self, original, delete=None):
# type: (str, Optional[bool]) -> None
self.original = original.rstrip('/\\')
super(AdjacentTempDirectory, self).__init__(delete=delete)
@classmethod
def _generate_names(cls, name):
# type: (str) -> Iterator[str]
"""Generates a series of temporary names.
The algorithm replaces the leading characters in the name
with ones that are valid filesystem characters, but are not
valid package names (for both Python and pip definitions of
package).
"""
for i in range(1, len(name)):
for candidate in itertools.combinations_with_replacement(
cls.LEADING_CHARS, i - 1):
new_name = '~' + ''.join(candidate) + name[i:]
if new_name != name:
yield new_name
# If we make it this far, we will have to make a longer name
for i in range(len(cls.LEADING_CHARS)):
for candidate in itertools.combinations_with_replacement(
cls.LEADING_CHARS, i):
new_name = '~' + ''.join(candidate) + name
if new_name != name:
yield new_name
def _create(self, kind):
# type: (str) -> str
root, name = os.path.split(self.original)
for candidate in self._generate_names(name):
path = os.path.join(root, candidate)
try:
os.mkdir(path)
except OSError as ex:
# Continue if the name exists already
if ex.errno != errno.EEXIST:
raise
else:
path = os.path.realpath(path)
break
else:
# Final fallback on the default behavior.
path = os.path.realpath(
tempfile.mkdtemp(prefix="pip-{}-".format(kind))
)
logger.debug("Created temporary directory: %s", path)
return path

View file

@ -0,0 +1,38 @@
"""For neatly implementing static typing in pip.
`mypy` - the static type analysis tool we use - uses the `typing` module, which
provides core functionality fundamental to mypy's functioning.
Generally, `typing` would be imported at runtime and used in that fashion -
it acts as a no-op at runtime and does not have any run-time overhead by
design.
As it turns out, `typing` is not vendorable - it uses separate sources for
Python 2/Python 3. Thus, this codebase can not expect it to be present.
To work around this, mypy allows the typing import to be behind a False-y
optional to prevent it from running at runtime and type-comments can be used
to remove the need for the types to be accessible directly during runtime.
This module provides the False-y guard in a nicely named fashion so that a
curious maintainer can reach here to read this.
In pip, all static-typing related imports should be guarded as follows:
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import ...
Ref: https://github.com/python/mypy/issues/3216
"""
MYPY_CHECK_RUNNING = False
if MYPY_CHECK_RUNNING:
from typing import cast
else:
# typing's cast() is needed at runtime, but we don't want to import typing.
# Thus, we use a dummy no-op version, which we tell mypy to ignore.
def cast(type_, value): # type: ignore
return value

View file

@ -0,0 +1,281 @@
"""Utilities related archives.
"""
from __future__ import absolute_import
import logging
import os
import shutil
import stat
import tarfile
import zipfile
from pip._internal.exceptions import InstallationError
from pip._internal.utils.filetypes import (
BZ2_EXTENSIONS,
TAR_EXTENSIONS,
XZ_EXTENSIONS,
ZIP_EXTENSIONS,
)
from pip._internal.utils.misc import ensure_dir
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Iterable, List, Optional, Text, Union
from zipfile import ZipInfo
logger = logging.getLogger(__name__)
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS
try:
import bz2 # noqa
SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
logger.debug('bz2 module is not available')
try:
# Only for Python 3.3+
import lzma # noqa
SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
logger.debug('lzma module is not available')
def current_umask():
# type: () -> int
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def split_leading_dir(path):
# type: (Union[str, Text]) -> List[Union[str, Text]]
path = path.lstrip('/').lstrip('\\')
if (
'/' in path and (
('\\' in path and path.find('/') < path.find('\\')) or
'\\' not in path
)
):
return path.split('/', 1)
elif '\\' in path:
return path.split('\\', 1)
else:
return [path, '']
def has_leading_dir(paths):
# type: (Iterable[Union[str, Text]]) -> bool
"""Returns true if all the paths have the same leading path name
(i.e., everything is in one subdirectory in an archive)"""
common_prefix = None
for path in paths:
prefix, rest = split_leading_dir(path)
if not prefix:
return False
elif common_prefix is None:
common_prefix = prefix
elif prefix != common_prefix:
return False
return True
def is_within_directory(directory, target):
# type: ((Union[str, Text]), (Union[str, Text])) -> bool
"""
Return true if the absolute path of target is within the directory
"""
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)
prefix = os.path.commonprefix([abs_directory, abs_target])
return prefix == abs_directory
def set_extracted_file_to_default_mode_plus_executable(path):
# type: (Union[str, Text]) -> None
"""
Make file present at path have execute for user/group/world
(chmod +x) is no-op on windows per python docs
"""
os.chmod(path, (0o777 & ~current_umask() | 0o111))
def zip_item_is_executable(info):
# type: (ZipInfo) -> bool
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
return bool(mode and stat.S_ISREG(mode) and mode & 0o111)
def unzip_file(filename, location, flatten=True):
# type: (str, str, bool) -> None
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if not is_within_directory(location, fn):
message = (
'The zip file ({}) has a file ({}) trying to install '
'outside target directory ({})'
)
raise InstallationError(message.format(filename, fn, location))
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
# Don't use read() to avoid allocating an arbitrarily large
# chunk of memory for the file's content
fp = zip.open(name)
try:
with open(fn, 'wb') as destfp:
shutil.copyfileobj(fp, destfp)
finally:
fp.close()
if zip_item_is_executable(info):
set_extracted_file_to_default_mode_plus_executable(fn)
finally:
zipfp.close()
def untar_file(filename, location):
# type: (str, str) -> None
"""
Untar the file (with path `filename`) to the destination `location`.
All files are written based on system defaults and umask (i.e. permissions
are not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
if filename.lower().endswith('.gz') or filename.lower().endswith('.tgz'):
mode = 'r:gz'
elif filename.lower().endswith(BZ2_EXTENSIONS):
mode = 'r:bz2'
elif filename.lower().endswith(XZ_EXTENSIONS):
mode = 'r:xz'
elif filename.lower().endswith('.tar'):
mode = 'r'
else:
logger.warning(
'Cannot determine compression type for file %s', filename,
)
mode = 'r:*'
tar = tarfile.open(filename, mode)
try:
leading = has_leading_dir([
member.name for member in tar.getmembers()
])
for member in tar.getmembers():
fn = member.name
if leading:
# https://github.com/python/mypy/issues/1174
fn = split_leading_dir(fn)[1] # type: ignore
path = os.path.join(location, fn)
if not is_within_directory(location, path):
message = (
'The tar file ({}) has a file ({}) trying to install '
'outside target directory ({})'
)
raise InstallationError(
message.format(filename, path, location)
)
if member.isdir():
ensure_dir(path)
elif member.issym():
try:
# https://github.com/python/typeshed/issues/2673
tar._extract_member(member, path) # type: ignore
except Exception as exc:
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
logger.warning(
'In the tar file %s the member %s is invalid: %s',
filename, member.name, exc,
)
continue
else:
try:
fp = tar.extractfile(member)
except (KeyError, AttributeError) as exc:
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
logger.warning(
'In the tar file %s the member %s is invalid: %s',
filename, member.name, exc,
)
continue
ensure_dir(os.path.dirname(path))
assert fp is not None
with open(path, 'wb') as destfp:
shutil.copyfileobj(fp, destfp)
fp.close()
# Update the timestamp (useful for cython compiled files)
# https://github.com/python/typeshed/issues/2673
tar.utime(member, path) # type: ignore
# member have any execute permissions for user/group/world?
if member.mode & 0o111:
set_extracted_file_to_default_mode_plus_executable(path)
finally:
tar.close()
def unpack_file(
filename, # type: str
location, # type: str
content_type=None, # type: Optional[str]
):
# type: (...) -> None
filename = os.path.realpath(filename)
if (
content_type == 'application/zip' or
filename.lower().endswith(ZIP_EXTENSIONS) or
zipfile.is_zipfile(filename)
):
unzip_file(
filename,
location,
flatten=not filename.endswith('.whl')
)
elif (
content_type == 'application/x-gzip' or
tarfile.is_tarfile(filename) or
filename.lower().endswith(
TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS
)
):
untar_file(filename, location)
else:
# FIXME: handle?
# FIXME: magic signatures?
logger.critical(
'Cannot unpack file %s (downloaded from %s, content-type: %s); '
'cannot detect archive format',
filename, location, content_type,
)
raise InstallationError(
'Cannot determine archive format of {}'.format(location)
)

View file

@ -0,0 +1,55 @@
import os
import sys
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.six.moves.urllib import request as urllib_request
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Text, Union
def get_url_scheme(url):
# type: (Union[str, Text]) -> Optional[Text]
if ':' not in url:
return None
return url.split(':', 1)[0].lower()
def path_to_url(path):
# type: (Union[str, Text]) -> str
"""
Convert a path to a file: URL. The path will be made absolute and have
quoted path parts.
"""
path = os.path.normpath(os.path.abspath(path))
url = urllib_parse.urljoin('file:', urllib_request.pathname2url(path))
return url
def url_to_path(url):
# type: (str) -> str
"""
Convert a file: URL to a path.
"""
assert url.startswith('file:'), (
"You can only turn file: urls into filenames (not {url!r})"
.format(**locals()))
_, netloc, path, _, _ = urllib_parse.urlsplit(url)
if not netloc or netloc == 'localhost':
# According to RFC 8089, same as empty authority.
netloc = ''
elif sys.platform == 'win32':
# If we have a UNC path, prepend UNC share notation.
netloc = '\\\\' + netloc
else:
raise ValueError(
'non-local file URIs are not supported on this platform: {url!r}'
.format(**locals())
)
path = urllib_request.url2pathname(netloc + path)
return path

View file

@ -0,0 +1,119 @@
from __future__ import absolute_import
import io
import logging
import os
import re
import site
import sys
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List, Optional
logger = logging.getLogger(__name__)
_INCLUDE_SYSTEM_SITE_PACKAGES_REGEX = re.compile(
r"include-system-site-packages\s*=\s*(?P<value>true|false)"
)
def _running_under_venv():
# type: () -> bool
"""Checks if sys.base_prefix and sys.prefix match.
This handles PEP 405 compliant virtual environments.
"""
return sys.prefix != getattr(sys, "base_prefix", sys.prefix)
def _running_under_regular_virtualenv():
# type: () -> bool
"""Checks if sys.real_prefix is set.
This handles virtual environments created with pypa's virtualenv.
"""
# pypa/virtualenv case
return hasattr(sys, 'real_prefix')
def running_under_virtualenv():
# type: () -> bool
"""Return True if we're running inside a virtualenv, False otherwise.
"""
return _running_under_venv() or _running_under_regular_virtualenv()
def _get_pyvenv_cfg_lines():
# type: () -> Optional[List[str]]
"""Reads {sys.prefix}/pyvenv.cfg and returns its contents as list of lines
Returns None, if it could not read/access the file.
"""
pyvenv_cfg_file = os.path.join(sys.prefix, 'pyvenv.cfg')
try:
# Although PEP 405 does not specify, the built-in venv module always
# writes with UTF-8. (pypa/pip#8717)
with io.open(pyvenv_cfg_file, encoding='utf-8') as f:
return f.read().splitlines() # avoids trailing newlines
except IOError:
return None
def _no_global_under_venv():
# type: () -> bool
"""Check `{sys.prefix}/pyvenv.cfg` for system site-packages inclusion
PEP 405 specifies that when system site-packages are not supposed to be
visible from a virtual environment, `pyvenv.cfg` must contain the following
line:
include-system-site-packages = false
Additionally, log a warning if accessing the file fails.
"""
cfg_lines = _get_pyvenv_cfg_lines()
if cfg_lines is None:
# We're not in a "sane" venv, so assume there is no system
# site-packages access (since that's PEP 405's default state).
logger.warning(
"Could not access 'pyvenv.cfg' despite a virtual environment "
"being active. Assuming global site-packages is not accessible "
"in this environment."
)
return True
for line in cfg_lines:
match = _INCLUDE_SYSTEM_SITE_PACKAGES_REGEX.match(line)
if match is not None and match.group('value') == 'false':
return True
return False
def _no_global_under_regular_virtualenv():
# type: () -> bool
"""Check if "no-global-site-packages.txt" exists beside site.py
This mirrors logic in pypa/virtualenv for determining whether system
site-packages are visible in the virtual environment.
"""
site_mod_dir = os.path.dirname(os.path.abspath(site.__file__))
no_global_site_packages_file = os.path.join(
site_mod_dir, 'no-global-site-packages.txt',
)
return os.path.exists(no_global_site_packages_file)
def virtualenv_no_global():
# type: () -> bool
"""Returns a boolean, whether running in venv with no system site-packages.
"""
# PEP 405 compliance needs to be checked first since virtualenv >=20 would
# return True for both checks, but is only able to use the PEP 405 config.
if _running_under_venv():
return _no_global_under_venv()
if _running_under_regular_virtualenv():
return _no_global_under_regular_virtualenv()
return False

View file

@ -0,0 +1,225 @@
"""Support functions for working with wheel files.
"""
from __future__ import absolute_import
import logging
from email.parser import Parser
from zipfile import ZipFile
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.pkg_resources import DistInfoDistribution
from pip._vendor.six import PY2, ensure_str
from pip._internal.exceptions import UnsupportedWheel
from pip._internal.utils.pkg_resources import DictMetadata
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from email.message import Message
from typing import Dict, Tuple
from pip._vendor.pkg_resources import Distribution
if PY2:
from zipfile import BadZipfile as BadZipFile
else:
from zipfile import BadZipFile
VERSION_COMPATIBLE = (1, 0)
logger = logging.getLogger(__name__)
class WheelMetadata(DictMetadata):
"""Metadata provider that maps metadata decoding exceptions to our
internal exception type.
"""
def __init__(self, metadata, wheel_name):
# type: (Dict[str, bytes], str) -> None
super(WheelMetadata, self).__init__(metadata)
self._wheel_name = wheel_name
def get_metadata(self, name):
# type: (str) -> str
try:
return super(WheelMetadata, self).get_metadata(name)
except UnicodeDecodeError as e:
# Augment the default error with the origin of the file.
raise UnsupportedWheel(
"Error decoding metadata for {}: {}".format(
self._wheel_name, e
)
)
def pkg_resources_distribution_for_wheel(wheel_zip, name, location):
# type: (ZipFile, str, str) -> Distribution
"""Get a pkg_resources distribution given a wheel.
:raises UnsupportedWheel: on any errors
"""
info_dir, _ = parse_wheel(wheel_zip, name)
metadata_files = [
p for p in wheel_zip.namelist() if p.startswith("{}/".format(info_dir))
]
metadata_text = {} # type: Dict[str, bytes]
for path in metadata_files:
# If a flag is set, namelist entries may be unicode in Python 2.
# We coerce them to native str type to match the types used in the rest
# of the code. This cannot fail because unicode can always be encoded
# with UTF-8.
full_path = ensure_str(path)
_, metadata_name = full_path.split("/", 1)
try:
metadata_text[metadata_name] = read_wheel_metadata_file(
wheel_zip, full_path
)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)
metadata = WheelMetadata(metadata_text, location)
return DistInfoDistribution(
location=location, metadata=metadata, project_name=name
)
def parse_wheel(wheel_zip, name):
# type: (ZipFile, str) -> Tuple[str, Message]
"""Extract information from the provided wheel, ensuring it meets basic
standards.
Returns the name of the .dist-info directory and the parsed WHEEL metadata.
"""
try:
info_dir = wheel_dist_info_dir(wheel_zip, name)
metadata = wheel_metadata(wheel_zip, info_dir)
version = wheel_version(metadata)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)
check_compatibility(version, name)
return info_dir, metadata
def wheel_dist_info_dir(source, name):
# type: (ZipFile, str) -> str
"""Returns the name of the contained .dist-info directory.
Raises AssertionError or UnsupportedWheel if not found, >1 found, or
it doesn't match the provided name.
"""
# Zip file path separators must be /
subdirs = set(p.split("/", 1)[0] for p in source.namelist())
info_dirs = [s for s in subdirs if s.endswith('.dist-info')]
if not info_dirs:
raise UnsupportedWheel(".dist-info directory not found")
if len(info_dirs) > 1:
raise UnsupportedWheel(
"multiple .dist-info directories found: {}".format(
", ".join(info_dirs)
)
)
info_dir = info_dirs[0]
info_dir_name = canonicalize_name(info_dir)
canonical_name = canonicalize_name(name)
if not info_dir_name.startswith(canonical_name):
raise UnsupportedWheel(
".dist-info directory {!r} does not start with {!r}".format(
info_dir, canonical_name
)
)
# Zip file paths can be unicode or str depending on the zip entry flags,
# so normalize it.
return ensure_str(info_dir)
def read_wheel_metadata_file(source, path):
# type: (ZipFile, str) -> bytes
try:
return source.read(path)
# BadZipFile for general corruption, KeyError for missing entry,
# and RuntimeError for password-protected files
except (BadZipFile, KeyError, RuntimeError) as e:
raise UnsupportedWheel(
"could not read {!r} file: {!r}".format(path, e)
)
def wheel_metadata(source, dist_info_dir):
# type: (ZipFile, str) -> Message
"""Return the WHEEL metadata of an extracted wheel, if possible.
Otherwise, raise UnsupportedWheel.
"""
path = "{}/WHEEL".format(dist_info_dir)
# Zip file path separators must be /
wheel_contents = read_wheel_metadata_file(source, path)
try:
wheel_text = ensure_str(wheel_contents)
except UnicodeDecodeError as e:
raise UnsupportedWheel("error decoding {!r}: {!r}".format(path, e))
# FeedParser (used by Parser) does not raise any exceptions. The returned
# message may have .defects populated, but for backwards-compatibility we
# currently ignore them.
return Parser().parsestr(wheel_text)
def wheel_version(wheel_data):
# type: (Message) -> Tuple[int, ...]
"""Given WHEEL metadata, return the parsed Wheel-Version.
Otherwise, raise UnsupportedWheel.
"""
version_text = wheel_data["Wheel-Version"]
if version_text is None:
raise UnsupportedWheel("WHEEL is missing Wheel-Version")
version = version_text.strip()
try:
return tuple(map(int, version.split('.')))
except ValueError:
raise UnsupportedWheel("invalid Wheel-Version: {!r}".format(version))
def check_compatibility(version, name):
# type: (Tuple[int, ...], str) -> None
"""Raises errors or warns if called with an incompatible Wheel-Version.
pip should refuse to install a Wheel-Version that's a major series
ahead of what it's compatible with (e.g 2.0 > 1.1); and warn when
installing a version only minor version ahead (e.g 1.2 > 1.1).
version: a 2-tuple representing a Wheel-Version (Major, Minor)
name: name of wheel or package to raise exception about
:raises UnsupportedWheel: when an incompatible Wheel-Version is given
"""
if version[0] > VERSION_COMPATIBLE[0]:
raise UnsupportedWheel(
"{}'s Wheel-Version ({}) is not compatible with this version "
"of pip".format(name, '.'.join(map(str, version)))
)
elif version > VERSION_COMPATIBLE:
logger.warning(
'Installing from a newer Wheel-Version (%s)',
'.'.join(map(str, version)),
)