225 lines
6.8 KiB
Python
225 lines
6.8 KiB
Python
|
import errno
|
||
|
import fnmatch
|
||
|
import os
|
||
|
import os.path
|
||
|
import random
|
||
|
import shutil
|
||
|
import stat
|
||
|
import sys
|
||
|
from contextlib import contextmanager
|
||
|
from tempfile import NamedTemporaryFile
|
||
|
|
||
|
# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
|
||
|
# why we ignore the type on this import.
|
||
|
from pip._vendor.retrying import retry # type: ignore
|
||
|
from pip._vendor.six import PY2
|
||
|
|
||
|
from pip._internal.utils.compat import get_path_uid
|
||
|
from pip._internal.utils.misc import format_size
|
||
|
from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
|
||
|
|
||
|
if MYPY_CHECK_RUNNING:
    # NOTE(review): presumably this flag is only true while mypy is running,
    # so the names below are type-checking aids rather than runtime objects.
    from typing import Any, BinaryIO, Iterator, List, Union

    class NamedTemporaryFileResult(BinaryIO):
        """Typing stand-in for the object yielded by adjacent_tmp_file().

        It describes a binary file object that additionally exposes the
        wrapped file through a ``file`` property.
        """

        @property
        def file(self):
            # type: () -> BinaryIO
            """The wrapped binary file object (declaration only)."""
|
||
|
|
||
|
|
||
|
def check_path_owner(path):
    # type: (str) -> bool
    """Return whether the current user should be treated as owning *path*.

    Starting at *path*, walk up through the parent directories until an
    entry that exists is found (``os.path.lexists``, so a dangling symlink
    counts as existing). For that entry:

    * when running with effective uid 0, the answer is True only if
      ``get_path_uid`` reports uid 0 for it, and False if ``get_path_uid``
      raises ``OSError``;
    * otherwise the answer is ``os.access(entry, os.W_OK)``.

    True is returned unconditionally on win32 or when ``os.geteuid`` is not
    available. False is returned if the walk reaches the top without
    finding an existing entry. *path* must be absolute (checked with
    ``assert``).
    """
    # With no effective uid to compare against, just assume ownership.
    if sys.platform == "win32" or not hasattr(os, "geteuid"):
        return True

    assert os.path.isabs(path)

    current = path
    last_seen = None
    while current != last_seen:
        if not os.path.lexists(current):
            # Nothing exists here yet; move one level up and try again.
            last_seen, current = current, os.path.dirname(current)
            continue

        if os.geteuid() != 0:
            # Ordinary user: writability decides.
            return os.access(current, os.W_OK)

        # Root gets special treatment so that running under sudo without
        # the -H flag is handled properly: the entry must belong to root.
        try:
            owner_uid = get_path_uid(current)
        except OSError:
            return False
        return owner_uid == 0

    return False  # assume we don't own the path
|
||
|
|
||
|
|
||
|
def copy2_fixed(src, dest):
    # type: (str, str) -> None
    """Copy *src* to *dest* with shutil.copy2(), reporting sockets clearly.

    If the copy fails with OSError/IOError and either *src* or *dest* turns
    out to be a socket, shutil.SpecialFileError is raised instead; any other
    failure is re-raised unchanged.

    See also https://bugs.python.org/issue37700.
    """
    try:
        shutil.copy2(src, dest)
    except (OSError, IOError):
        for candidate in (src, dest):
            try:
                socket_found = is_socket(candidate)
            except OSError:
                # We are already handling a failure; being unable to
                # inspect this path is not worth reporting on its own.
                continue
            if socket_found:
                raise shutil.SpecialFileError(
                    "`{f}` is a socket".format(f=candidate))

        # Neither path is a socket: surface the original copy error.
        raise
|
||
|
|
||
|
|
||
|
def is_socket(path):
    # type: (str) -> bool
    """Return whether *path* itself is a socket.

    Uses os.lstat(), so a symlink is examined rather than followed.
    Raises OSError if *path* cannot be stat'ed.
    """
    mode = os.lstat(path).st_mode
    return stat.S_ISSOCK(mode)
|
||
|
|
||
|
|
||
|
@contextmanager
def adjacent_tmp_file(path, **kwargs):
    # type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
    """Yield a named temporary file created in the same directory as *path*.

    The temporary file's name starts with the basename of *path* and ends
    with ``.tmp``. It is not deleted when closed. When the ``with`` body
    finishes (normally or with an exception) the file is flushed and
    fsync'ed before being closed.

    Any extra keyword arguments are forwarded to
    tempfile.NamedTemporaryFile to control how the file is opened.
    """
    parent_dir = os.path.dirname(path)
    name_prefix = os.path.basename(path)

    with NamedTemporaryFile(
        delete=False,
        dir=parent_dir,
        prefix=name_prefix,
        suffix='.tmp',
        **kwargs
    ) as tmp:
        wrapped = cast('NamedTemporaryFileResult', tmp)
        try:
            yield wrapped
        finally:
            # Push buffered data to the OS, then ask the OS to commit it
            # to disk, before the surrounding ``with`` closes the file.
            underlying = wrapped.file
            underlying.flush()
            os.fsync(underlying.fileno())
|
||
|
|
||
|
|
||
|
# Decorator shared by both variants of replace() below.
# NOTE(review): the `retrying` package documents these values as
# milliseconds (give up after 1000, wait 250 between attempts) -- confirm
# against the vendored version.
_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)

if PY2:
    @_replace_retry
    def replace(src, dest):
        # type: (str, str) -> None
        """Rename *src* to *dest*, removing *dest* first if the rename fails.

        Python 2 fallback used when os.replace is not available: a plain
        os.rename() is tried first; on OSError the destination is removed
        and the rename is attempted once more.
        """
        try:
            os.rename(src, dest)
        except OSError:
            os.remove(dest)
            os.rename(src, dest)

else:
    # os.replace is used directly, wrapped in the same retry policy.
    replace = _replace_retry(os.replace)
|
||
|
|
||
|
|
||
|
# test_writable_dir and _test_writable_dir_win are copied from Flit,
|
||
|
# with the author's agreement to also place them under pip's license.
|
||
|
def test_writable_dir(path):
    # type: (str) -> bool
    """Check if a directory is writable.

    If *path* is not an existing directory, the closest existing parent
    directory is checked instead. A relative path whose leading component
    does not exist resolves to the current directory.

    Uses os.access() on POSIX, tries creating files on Windows.

    :param path: directory (existing or not) to check.
    :return: True if the directory, or its closest existing parent, is
        writable.
    """
    # If the directory doesn't exist, find the closest parent that does.
    while not os.path.isdir(path):
        # dirname() of a bare relative name is '', which is not a usable
        # path; the parent of such a name is the current directory.
        parent = os.path.dirname(path) or os.curdir
        if parent == path:
            break  # Should never get here, but infinite loops are bad
        path = parent

    if os.name == 'posix':
        return os.access(path, os.W_OK)

    return _test_writable_dir_win(path)


# The function name matches pytest's default ``test_*`` discovery pattern, so
# a test module that imports it would otherwise get it collected as a test
# (and fail on the unknown ``path`` fixture). Opt out of collection.
test_writable_dir.__test__ = False  # type: ignore
|
||
|
|
||
|
|
||
|
def _test_writable_dir_win(path):
    # type: (str) -> bool
    """Check writability of directory *path* by creating a scratch file.

    Up to ten randomly named files are tried. Returns True once a file has
    been created (it is closed and removed again), and False if creation is
    refused with EPERM or EACCES. A name collision (EEXIST) just moves on
    to the next name; any other OSError is re-raised.
    """
    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
    # and we can't use tempfile: http://bugs.python.org/issue22107
    prefix = 'accesstest_deleteme_fishfingers_custard_'
    charset = 'abcdefghijklmnopqrstuvwxyz0123456789'
    # Python 2 has no FileExistsError/PermissionError, so errno values are
    # compared instead; these two correspond to PermissionError.
    denied = (errno.EPERM, errno.EACCES)

    for _attempt in range(10):
        suffix = ''.join(random.choice(charset) for _ in range(6))
        probe = os.path.join(path, prefix + suffix)
        try:
            fd = os.open(probe, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                # Name already taken (FileExistsError): pick another one.
                continue
            if exc.errno in denied:
                # This could be because there's a directory with the same
                # name, but that is highly unlikely, so assume the parent
                # dir is not writable. It could as well be that the parent
                # dir is not readable, due to non-privileged user access.
                return False
            raise

        # Creation succeeded: clean up the scratch file and report success.
        os.close(fd)
        os.unlink(probe)
        return True

    # This should never be reached
    raise EnvironmentError(
        'Unexpected condition testing for writable directory'
    )
|
||
|
|
||
|
|
||
|
def find_files(path, pattern):
    # type: (str, str) -> List[str]
    """Return the paths of files beneath *path*, recursively, whose
    filenames match the UNIX-style shell glob *pattern*.

    Each result is the walked directory joined with the filename, so the
    results are absolute only when *path* itself is absolute.
    """
    return [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(path)
        for filename in fnmatch.filter(filenames, pattern)
    ]
|
||
|
|
||
|
|
||
|
def file_size(path):
    # type: (str) -> Union[int, float]
    """Return the size of *path* as reported by os.path.getsize().

    Symbolic links are not followed; they always count as size 0.
    """
    return 0 if os.path.islink(path) else os.path.getsize(path)
|
||
|
|
||
|
|
||
|
def format_file_size(path):
    # type: (str) -> str
    """Return file_size(path) rendered as a string by format_size()."""
    size = file_size(path)
    return format_size(size)
|
||
|
|
||
|
|
||
|
def directory_size(path):
    # type: (str) -> Union[int, float]
    """Return the combined file_size() of every file beneath *path*.

    The tree is traversed with os.walk(). The running total starts at 0.0,
    so the result is a float.
    """
    file_paths = (
        os.path.join(dirpath, name)
        for dirpath, _subdirs, names in os.walk(path)
        for name in names
    )

    total = 0.0
    for file_path in file_paths:
        total += file_size(file_path)
    return total
|
||
|
|
||
|
|
||
|
def format_directory_size(path):
    # type: (str) -> str
    """Return directory_size(path) rendered as a string by format_size()."""
    total = directory_size(path)
    return format_size(total)
|