diff --git a/pysnooper/variables.py b/pysnooper/variables.py index f3db008..8468c6b 100644 --- a/pysnooper/variables.py +++ b/pysnooper/variables.py @@ -1,6 +1,9 @@ import itertools import abc -from collections import Mapping, Sequence +try: + from collections.abc import Mapping, Sequence +except ImportError: + from collections import Mapping, Sequence from copy import deepcopy from . import utils diff --git a/tests/mini_toolbox/__init__.py b/tests/mini_toolbox/__init__.py new file mode 100644 index 0000000..97d6f57 --- /dev/null +++ b/tests/mini_toolbox/__init__.py @@ -0,0 +1,255 @@ +# Copyright 2019 Ram Rachum and collaborators. +# This program is distributed under the MIT license. + +import tempfile +import shutil +import io +import sys +from . import pathlib +from . import contextlib + + + +@contextlib.contextmanager +def BlankContextManager(): + yield + +@contextlib.contextmanager +def create_temp_folder(prefix=tempfile.template, suffix='', + parent_folder=None, chmod=None): + ''' + Context manager that creates a temporary folder and deletes it after usage. + + After the suite finishes, the temporary folder and all its files and + subfolders will be deleted. + + Example: + + with create_temp_folder() as temp_folder: + + # We have a temporary folder! + assert temp_folder.is_dir() + + # We can create files in it: + (temp_folder / 'my_file').open('w') + + # The suite is finished, now it's all cleaned: + assert not temp_folder.exists() + + Use the `prefix` and `suffix` string arguments to dictate a prefix and/or a + suffix to the temporary folder's name in the filesystem. + + If you'd like to set the permissions of the temporary folder, pass them to + the optional `chmod` argument, like this: + + create_temp_folder(chmod=0o550) + + ''' + temp_folder = pathlib.Path(tempfile.mkdtemp(prefix=prefix, suffix=suffix, + dir=parent_folder)) + try: + if chmod is not None: + temp_folder.chmod(chmod) + yield temp_folder + finally: + shutil.rmtree(str(temp_folder)) + + +class NotInDict: + '''Object signifying that the key was not found in the dict.''' + + +class TempValueSetter(object): + ''' + Context manager for temporarily setting a value to a variable. + + The value is set to the variable before the suite starts, and gets reset + back to the old value after the suite finishes. + ''' + + def __init__(self, variable, value, assert_no_fiddling=True): + ''' + Construct the `TempValueSetter`. + + `variable` may be either an `(object, attribute_string)`, a `(dict, + key)` pair, or a `(getter, setter)` pair. + + `value` is the temporary value to set to the variable. + ''' + + self.assert_no_fiddling = assert_no_fiddling + + + ####################################################################### + # We let the user input either an `(object, attribute_string)`, a + # `(dict, key)` pair, or a `(getter, setter)` pair. So now it's our job + # to inspect `variable` and figure out which one of these options the + # user chose, and then obtain from that a `(getter, setter)` pair that + # we could use. + + bad_input_exception = Exception( + '`variable` must be either an `(object, attribute_string)` pair, ' + 'a `(dict, key)` pair, or a `(getter, setter)` pair.' + ) + + try: + first, second = variable + except Exception: + raise bad_input_exception + if hasattr(first, '__getitem__') and hasattr(first, 'get') and \ + hasattr(first, '__setitem__') and hasattr(first, '__delitem__'): + # `first` is a dictoid; so we were probably handed a `(dict, key)` + # pair. + self.getter = lambda: first.get(second, NotInDict) + self.setter = lambda value: (first.__setitem__(second, value) if + value is not NotInDict else + first.__delitem__(second)) + ### Finished handling the `(dict, key)` case. ### + + elif callable(second): + # `second` is a callable; so we were probably handed a `(getter, + # setter)` pair. + if not callable(first): + raise bad_input_exception + self.getter, self.setter = first, second + ### Finished handling the `(getter, setter)` case. ### + else: + # All that's left is the `(object, attribute_string)` case. + if not isinstance(second, str): + raise bad_input_exception + + parent, attribute_name = first, second + self.getter = lambda: getattr(parent, attribute_name) + self.setter = lambda value: setattr(parent, attribute_name, value) + ### Finished handling the `(object, attribute_string)` case. ### + + # + # + ### Finished obtaining a `(getter, setter)` pair from `variable`. ##### + + + self.getter = self.getter + '''Getter for getting the current value of the variable.''' + + self.setter = self.setter + '''Setter for Setting the the variable's value.''' + + self.value = value + '''The value to temporarily set to the variable.''' + + self.active = False + + + def __enter__(self): + + self.active = True + + self.old_value = self.getter() + '''The old value of the variable, before entering the suite.''' + + self.setter(self.value) + + # In `__exit__` we'll want to check if anyone changed the value of the + # variable in the suite, which is unallowed. But we can't compare to + # `.value`, because sometimes when you set a value to a variable, some + # mechanism modifies that value for various reasons, resulting in a + # supposedly equivalent, but not identical, value. For example this + # happens when you set the current working directory on Mac OS. + # + # So here we record the value right after setting, and after any + # possible processing the system did to it: + self._value_right_after_setting = self.getter() + + return self + + + def __exit__(self, exc_type, exc_value, exc_traceback): + + if self.assert_no_fiddling: + # Asserting no-one inside the suite changed our variable: + assert self.getter() == self._value_right_after_setting + + self.setter(self.old_value) + + self.active = False + +class OutputCapturer(object): + ''' + Context manager for catching all system output generated during suite. + + Example: + + with OutputCapturer() as output_capturer: + print('woo!') + + assert output_capturer.output == 'woo!\n' + + The boolean arguments `stdout` and `stderr` determine, respectively, + whether the standard-output and the standard-error streams will be + captured. + ''' + def __init__(self, stdout=True, stderr=True): + self.string_io = io.StringIO() + + if stdout: + self._stdout_temp_setter = \ + TempValueSetter((sys, 'stdout'), self.string_io) + else: # not stdout + self._stdout_temp_setter = BlankContextManager() + + if stderr: + self._stderr_temp_setter = \ + TempValueSetter((sys, 'stderr'), self.string_io) + else: # not stderr + self._stderr_temp_setter = BlankContextManager() + + def __enter__(self): + '''Manage the `OutputCapturer`'s context.''' + self._stdout_temp_setter.__enter__() + self._stderr_temp_setter.__enter__() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + # Not doing exception swallowing anywhere here. + self._stderr_temp_setter.__exit__(exc_type, exc_value, exc_traceback) + self._stdout_temp_setter.__exit__(exc_type, exc_value, exc_traceback) + return self + + output = property(lambda self: self.string_io.getvalue(), + doc='''The string of output that was captured.''') + + +class TempSysPathAdder(object): + ''' + Context manager for temporarily adding paths to `sys.path`. + + Removes the path(s) after suite. + + Example: + + with TempSysPathAdder('path/to/fubar/package'): + import fubar + fubar.do_stuff() + + ''' + def __init__(self, addition): + self.addition = [str(addition)] + + + def __enter__(self): + self.entries_not_in_sys_path = [entry for entry in self.addition if + entry not in sys.path] + sys.path += self.entries_not_in_sys_path + return self + + + def __exit__(self, *args, **kwargs): + + for entry in self.entries_not_in_sys_path: + + # We don't allow anyone to remove it except for us: + assert entry in sys.path + + sys.path.remove(entry) + + diff --git a/tests/mini_toolbox/contextlib.py b/tests/mini_toolbox/contextlib.py new file mode 100644 index 0000000..f08df14 --- /dev/null +++ b/tests/mini_toolbox/contextlib.py @@ -0,0 +1,436 @@ +"""contextlib2 - backports and enhancements to the contextlib module""" + +import sys +import warnings +from collections import deque +from functools import wraps + +__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack", + "redirect_stdout", "redirect_stderr", "suppress"] + +# Backwards compatibility +__all__ += ["ContextStack"] + +class ContextDecorator(object): + "A base class or mixin that enables context managers to work as decorators." + + def refresh_cm(self): + """Returns the context manager used to actually wrap the call to the + decorated function. + + The default implementation just returns *self*. + + Overriding this method allows otherwise one-shot context managers + like _GeneratorContextManager to support use as decorators via + implicit recreation. + + DEPRECATED: refresh_cm was never added to the standard library's + ContextDecorator API + """ + warnings.warn("refresh_cm was never added to the standard library", + DeprecationWarning) + return self._recreate_cm() + + def _recreate_cm(self): + """Return a recreated instance of self. + + Allows an otherwise one-shot context manager like + _GeneratorContextManager to support use as + a decorator via implicit recreation. + + This is a private interface just for _GeneratorContextManager. + See issue #11647 for details. + """ + return self + + def __call__(self, func): + @wraps(func) + def inner(*args, **kwds): + with self._recreate_cm(): + return func(*args, **kwds) + return inner + + +class _GeneratorContextManager(ContextDecorator): + """Helper for @contextmanager decorator.""" + + def __init__(self, func, args, kwds): + self.gen = func(*args, **kwds) + self.func, self.args, self.kwds = func, args, kwds + # Issue 19330: ensure context manager instances have good docstrings + doc = getattr(func, "__doc__", None) + if doc is None: + doc = type(self).__doc__ + self.__doc__ = doc + # Unfortunately, this still doesn't provide good help output when + # inspecting the created context manager instances, since pydoc + # currently bypasses the instance docstring and shows the docstring + # for the class instead. + # See http://bugs.python.org/issue19404 for more details. + + def _recreate_cm(self): + # _GCM instances are one-shot context managers, so the + # CM must be recreated each time a decorated function is + # called + return self.__class__(self.func, self.args, self.kwds) + + def __enter__(self): + try: + return next(self.gen) + except StopIteration: + raise RuntimeError("generator didn't yield") + + def __exit__(self, type, value, traceback): + if type is None: + try: + next(self.gen) + except StopIteration: + return + else: + raise RuntimeError("generator didn't stop") + else: + if value is None: + # Need to force instantiation so we can reliably + # tell if we get the same exception back + value = type() + try: + self.gen.throw(type, value, traceback) + raise RuntimeError("generator didn't stop after throw()") + except StopIteration as exc: + # Suppress StopIteration *unless* it's the same exception that + # was passed to throw(). This prevents a StopIteration + # raised inside the "with" statement from being suppressed. + return exc is not value + except RuntimeError as exc: + # Don't re-raise the passed in exception + if exc is value: + return False + # Likewise, avoid suppressing if a StopIteration exception + # was passed to throw() and later wrapped into a RuntimeError + # (see PEP 479). + if _HAVE_EXCEPTION_CHAINING and exc.__cause__ is value: + return False + raise + except: + # only re-raise if it's *not* the exception that was + # passed to throw(), because __exit__() must not raise + # an exception unless __exit__() itself failed. But throw() + # has to raise the exception to signal propagation, so this + # fixes the impedance mismatch between the throw() protocol + # and the __exit__() protocol. + # + if sys.exc_info()[1] is not value: + raise + + +def contextmanager(func): + """@contextmanager decorator. + + Typical usage: + + @contextmanager + def some_generator(): + + try: + yield + finally: + + + This makes this: + + with some_generator() as : + + + equivalent to this: + + + try: + = + + finally: + + + """ + @wraps(func) + def helper(*args, **kwds): + return _GeneratorContextManager(func, args, kwds) + return helper + + +class closing(object): + """Context to automatically close something at the end of a block. + + Code like this: + + with closing(.open()) as f: + + + is equivalent to this: + + f = .open() + try: + + finally: + f.close() + + """ + def __init__(self, thing): + self.thing = thing + def __enter__(self): + return self.thing + def __exit__(self, *exc_info): + self.thing.close() + + +class _RedirectStream(object): + + _stream = None + + def __init__(self, new_target): + self._new_target = new_target + # We use a list of old targets to make this CM re-entrant + self._old_targets = [] + + def __enter__(self): + self._old_targets.append(getattr(sys, self._stream)) + setattr(sys, self._stream, self._new_target) + return self._new_target + + def __exit__(self, exctype, excinst, exctb): + setattr(sys, self._stream, self._old_targets.pop()) + + +class redirect_stdout(_RedirectStream): + """Context manager for temporarily redirecting stdout to another file. + + # How to send help() to stderr + with redirect_stdout(sys.stderr): + help(dir) + + # How to write help() to a file + with open('help.txt', 'w') as f: + with redirect_stdout(f): + help(pow) + """ + + _stream = "stdout" + + +class redirect_stderr(_RedirectStream): + """Context manager for temporarily redirecting stderr to another file.""" + + _stream = "stderr" + + +class suppress(object): + """Context manager to suppress specified exceptions + + After the exception is suppressed, execution proceeds with the next + statement following the with statement. + + with suppress(FileNotFoundError): + os.remove(somefile) + # Execution still resumes here if the file was already removed + """ + + def __init__(self, *exceptions): + self._exceptions = exceptions + + def __enter__(self): + pass + + def __exit__(self, exctype, excinst, exctb): + # Unlike isinstance and issubclass, CPython exception handling + # currently only looks at the concrete type hierarchy (ignoring + # the instance and subclass checking hooks). While Guido considers + # that a bug rather than a feature, it's a fairly hard one to fix + # due to various internal implementation details. suppress provides + # the simpler issubclass based semantics, rather than trying to + # exactly reproduce the limitations of the CPython interpreter. + # + # See http://bugs.python.org/issue12029 for more details + return exctype is not None and issubclass(exctype, self._exceptions) + + +# Context manipulation is Python 3 only +_HAVE_EXCEPTION_CHAINING = sys.version_info[0] >= 3 +if _HAVE_EXCEPTION_CHAINING: + def _make_context_fixer(frame_exc): + def _fix_exception_context(new_exc, old_exc): + # Context may not be correct, so find the end of the chain + while 1: + exc_context = new_exc.__context__ + if exc_context is old_exc: + # Context is already set correctly (see issue 20317) + return + if exc_context is None or exc_context is frame_exc: + break + new_exc = exc_context + # Change the end of the chain to point to the exception + # we expect it to reference + new_exc.__context__ = old_exc + return _fix_exception_context + + def _reraise_with_existing_context(exc_details): + try: + # bare "raise exc_details[1]" replaces our carefully + # set-up context + fixed_ctx = exc_details[1].__context__ + raise exc_details[1] + except BaseException: + exc_details[1].__context__ = fixed_ctx + raise +else: + # No exception context in Python 2 + def _make_context_fixer(frame_exc): + return lambda new_exc, old_exc: None + + # Use 3 argument raise in Python 2, + # but use exec to avoid SyntaxError in Python 3 + def _reraise_with_existing_context(exc_details): + exc_type, exc_value, exc_tb = exc_details + exec ("raise exc_type, exc_value, exc_tb") + +# Handle old-style classes if they exist +try: + from types import InstanceType +except ImportError: + # Python 3 doesn't have old-style classes + _get_type = type +else: + # Need to handle old-style context managers on Python 2 + def _get_type(obj): + obj_type = type(obj) + if obj_type is InstanceType: + return obj.__class__ # Old-style class + return obj_type # New-style class + +# Inspired by discussions on http://bugs.python.org/issue13585 +class ExitStack(object): + """Context manager for dynamic management of a stack of exit callbacks + + For example: + + with ExitStack() as stack: + files = [stack.enter_context(open(fname)) for fname in filenames] + # All opened files will automatically be closed at the end of + # the with statement, even if attempts to open files later + # in the list raise an exception + + """ + def __init__(self): + self._exit_callbacks = deque() + + def pop_all(self): + """Preserve the context stack by transferring it to a new instance""" + new_stack = type(self)() + new_stack._exit_callbacks = self._exit_callbacks + self._exit_callbacks = deque() + return new_stack + + def _push_cm_exit(self, cm, cm_exit): + """Helper to correctly register callbacks to __exit__ methods""" + def _exit_wrapper(*exc_details): + return cm_exit(cm, *exc_details) + _exit_wrapper.__self__ = cm + self.push(_exit_wrapper) + + def push(self, exit): + """Registers a callback with the standard __exit__ method signature + + Can suppress exceptions the same way __exit__ methods can. + + Also accepts any object with an __exit__ method (registering a call + to the method instead of the object itself) + """ + # We use an unbound method rather than a bound method to follow + # the standard lookup behaviour for special methods + _cb_type = _get_type(exit) + try: + exit_method = _cb_type.__exit__ + except AttributeError: + # Not a context manager, so assume its a callable + self._exit_callbacks.append(exit) + else: + self._push_cm_exit(exit, exit_method) + return exit # Allow use as a decorator + + def callback(self, callback, *args, **kwds): + """Registers an arbitrary callback and arguments. + + Cannot suppress exceptions. + """ + def _exit_wrapper(exc_type, exc, tb): + callback(*args, **kwds) + # We changed the signature, so using @wraps is not appropriate, but + # setting __wrapped__ may still help with introspection + _exit_wrapper.__wrapped__ = callback + self.push(_exit_wrapper) + return callback # Allow use as a decorator + + def enter_context(self, cm): + """Enters the supplied context manager + + If successful, also pushes its __exit__ method as a callback and + returns the result of the __enter__ method. + """ + # We look up the special methods on the type to match the with statement + _cm_type = _get_type(cm) + _exit = _cm_type.__exit__ + result = _cm_type.__enter__(cm) + self._push_cm_exit(cm, _exit) + return result + + def close(self): + """Immediately unwind the context stack""" + self.__exit__(None, None, None) + + def __enter__(self): + return self + + def __exit__(self, *exc_details): + received_exc = exc_details[0] is not None + + # We manipulate the exception state so it behaves as though + # we were actually nesting multiple with statements + frame_exc = sys.exc_info()[1] + _fix_exception_context = _make_context_fixer(frame_exc) + + # Callbacks are invoked in LIFO order to match the behaviour of + # nested context managers + suppressed_exc = False + pending_raise = False + while self._exit_callbacks: + cb = self._exit_callbacks.pop() + try: + if cb(*exc_details): + suppressed_exc = True + pending_raise = False + exc_details = (None, None, None) + except: + new_exc_details = sys.exc_info() + # simulate the stack of exceptions by setting the context + _fix_exception_context(new_exc_details[1], exc_details[1]) + pending_raise = True + exc_details = new_exc_details + if pending_raise: + _reraise_with_existing_context(exc_details) + return received_exc and suppressed_exc + +# Preserve backwards compatibility +class ContextStack(ExitStack): + """Backwards compatibility alias for ExitStack""" + + def __init__(self): + warnings.warn("ContextStack has been renamed to ExitStack", + DeprecationWarning) + super(ContextStack, self).__init__() + + def register_exit(self, callback): + return self.push(callback) + + def register(self, callback, *args, **kwds): + return self.callback(callback, *args, **kwds) + + def preserve(self): + return self.pop_all() diff --git a/tests/mini_toolbox/pathlib.py b/tests/mini_toolbox/pathlib.py new file mode 100644 index 0000000..69a6aa4 --- /dev/null +++ b/tests/mini_toolbox/pathlib.py @@ -0,0 +1,1673 @@ +# Copyright (c) 2014-2017 Matthias C. M. Troffaes +# Copyright (c) 2012-2014 Antoine Pitrou and contributors +# Distributed under the terms of the MIT License. + +import ctypes +import fnmatch +import functools +import io +import ntpath +import os +import posixpath +import re +import six +import sys +try: + from collections.abc import Sequence +except ImportError: + from collections import Sequence +from errno import EINVAL, ENOENT, ENOTDIR, EEXIST, EPERM, EACCES +from operator import attrgetter + +from stat import ( + S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO) +try: + from urllib import quote as urlquote_from_bytes +except ImportError: + from urllib.parse import quote_from_bytes as urlquote_from_bytes + + +try: + intern = intern +except NameError: + intern = sys.intern + +supports_symlinks = True +if os.name == 'nt': + import nt + if sys.getwindowsversion()[:2] >= (6, 0) and sys.version_info >= (3, 2): + from nt import _getfinalpathname + else: + supports_symlinks = False + _getfinalpathname = None +else: + nt = None + +try: + from os import scandir as os_scandir +except ImportError: + from scandir import scandir as os_scandir + +__all__ = [ + "PurePath", "PurePosixPath", "PureWindowsPath", + "Path", "PosixPath", "WindowsPath", + ] + +# +# Internals +# + + +def _py2_fsencode(parts): + # py2 => minimal unicode support + assert six.PY2 + return [part.encode('ascii') if isinstance(part, six.text_type) + else part for part in parts] + + +def _try_except_fileexistserror(try_func, except_func, else_func=None): + if sys.version_info >= (3, 3): + try: + try_func() + except FileExistsError as exc: + except_func(exc) + else: + if else_func is not None: + else_func() + else: + try: + try_func() + except EnvironmentError as exc: + if exc.errno != EEXIST: + raise + else: + except_func(exc) + else: + if else_func is not None: + else_func() + + +def _try_except_filenotfounderror(try_func, except_func): + if sys.version_info >= (3, 3): + try: + try_func() + except FileNotFoundError as exc: + except_func(exc) + else: + try: + try_func() + except EnvironmentError as exc: + if exc.errno != ENOENT: + raise + else: + except_func(exc) + + +def _try_except_permissionerror_iter(try_iter, except_iter): + if sys.version_info >= (3, 3): + try: + for x in try_iter(): + yield x + except PermissionError as exc: + for x in except_iter(exc): + yield x + else: + try: + for x in try_iter(): + yield x + except EnvironmentError as exc: + if exc.errno not in (EPERM, EACCES): + raise + else: + for x in except_iter(exc): + yield x + + +def _win32_get_unique_path_id(path): + # get file information, needed for samefile on older Python versions + # see http://timgolden.me.uk/python/win32_how_do_i/ + # see_if_two_files_are_the_same_file.html + from ctypes import POINTER, Structure, WinError + from ctypes.wintypes import DWORD, HANDLE, BOOL + + class FILETIME(Structure): + _fields_ = [("datetime_lo", DWORD), + ("datetime_hi", DWORD), + ] + + class BY_HANDLE_FILE_INFORMATION(Structure): + _fields_ = [("attributes", DWORD), + ("created_at", FILETIME), + ("accessed_at", FILETIME), + ("written_at", FILETIME), + ("volume", DWORD), + ("file_hi", DWORD), + ("file_lo", DWORD), + ("n_links", DWORD), + ("index_hi", DWORD), + ("index_lo", DWORD), + ] + + CreateFile = ctypes.windll.kernel32.CreateFileW + CreateFile.argtypes = [ctypes.c_wchar_p, DWORD, DWORD, ctypes.c_void_p, + DWORD, DWORD, HANDLE] + CreateFile.restype = HANDLE + GetFileInformationByHandle = ( + ctypes.windll.kernel32.GetFileInformationByHandle) + GetFileInformationByHandle.argtypes = [ + HANDLE, POINTER(BY_HANDLE_FILE_INFORMATION)] + GetFileInformationByHandle.restype = BOOL + CloseHandle = ctypes.windll.kernel32.CloseHandle + CloseHandle.argtypes = [HANDLE] + CloseHandle.restype = BOOL + GENERIC_READ = 0x80000000 + FILE_SHARE_READ = 0x00000001 + FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 + OPEN_EXISTING = 3 + if os.path.isdir(path): + flags = FILE_FLAG_BACKUP_SEMANTICS + else: + flags = 0 + hfile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, + None, OPEN_EXISTING, flags, None) + if hfile == 0xffffffff: + if sys.version_info >= (3, 3): + raise FileNotFoundError(path) + else: + exc = OSError("file not found: path") + exc.errno = ENOENT + raise exc + info = BY_HANDLE_FILE_INFORMATION() + success = GetFileInformationByHandle(hfile, info) + CloseHandle(hfile) + if success == 0: + raise WinError() + return info.volume, info.index_hi, info.index_lo + + +def _is_wildcard_pattern(pat): + # Whether this pattern needs actual matching using fnmatch, or can + # be looked up directly as a file. + return "*" in pat or "?" in pat or "[" in pat + + +class _Flavour(object): + + """A flavour implements a particular (platform-specific) set of path + semantics.""" + + def __init__(self): + self.join = self.sep.join + + def parse_parts(self, parts): + if six.PY2: + parts = _py2_fsencode(parts) + parsed = [] + sep = self.sep + altsep = self.altsep + drv = root = '' + it = reversed(parts) + for part in it: + if not part: + continue + if altsep: + part = part.replace(altsep, sep) + drv, root, rel = self.splitroot(part) + if sep in rel: + for x in reversed(rel.split(sep)): + if x and x != '.': + parsed.append(intern(x)) + else: + if rel and rel != '.': + parsed.append(intern(rel)) + if drv or root: + if not drv: + # If no drive is present, try to find one in the previous + # parts. This makes the result of parsing e.g. + # ("C:", "/", "a") reasonably intuitive. + for part in it: + if not part: + continue + if altsep: + part = part.replace(altsep, sep) + drv = self.splitroot(part)[0] + if drv: + break + break + if drv or root: + parsed.append(drv + root) + parsed.reverse() + return drv, root, parsed + + def join_parsed_parts(self, drv, root, parts, drv2, root2, parts2): + """ + Join the two paths represented by the respective + (drive, root, parts) tuples. Return a new (drive, root, parts) tuple. + """ + if root2: + if not drv2 and drv: + return drv, root2, [drv + root2] + parts2[1:] + elif drv2: + if drv2 == drv or self.casefold(drv2) == self.casefold(drv): + # Same drive => second path is relative to the first + return drv, root, parts + parts2[1:] + else: + # Second path is non-anchored (common case) + return drv, root, parts + parts2 + return drv2, root2, parts2 + + +class _WindowsFlavour(_Flavour): + # Reference for Windows paths can be found at + # http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx + + sep = '\\' + altsep = '/' + has_drv = True + pathmod = ntpath + + is_supported = (os.name == 'nt') + + drive_letters = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ') + ext_namespace_prefix = '\\\\?\\' + + reserved_names = ( + set(['CON', 'PRN', 'AUX', 'NUL']) | + set(['COM%d' % i for i in range(1, 10)]) | + set(['LPT%d' % i for i in range(1, 10)]) + ) + + # Interesting findings about extended paths: + # - '\\?\c:\a', '//?/c:\a' and '//?/c:/a' are all supported + # but '\\?\c:/a' is not + # - extended paths are always absolute; "relative" extended paths will + # fail. + + def splitroot(self, part, sep=sep): + first = part[0:1] + second = part[1:2] + if (second == sep and first == sep): + # XXX extended paths should also disable the collapsing of "." + # components (according to MSDN docs). + prefix, part = self._split_extended_path(part) + first = part[0:1] + second = part[1:2] + else: + prefix = '' + third = part[2:3] + if (second == sep and first == sep and third != sep): + # is a UNC path: + # vvvvvvvvvvvvvvvvvvvvv root + # \\machine\mountpoint\directory\etc\... + # directory ^^^^^^^^^^^^^^ + index = part.find(sep, 2) + if index != -1: + index2 = part.find(sep, index + 1) + # a UNC path can't have two slashes in a row + # (after the initial two) + if index2 != index + 1: + if index2 == -1: + index2 = len(part) + if prefix: + return prefix + part[1:index2], sep, part[index2 + 1:] + else: + return part[:index2], sep, part[index2 + 1:] + drv = root = '' + if second == ':' and first in self.drive_letters: + drv = part[:2] + part = part[2:] + first = third + if first == sep: + root = first + part = part.lstrip(sep) + return prefix + drv, root, part + + def casefold(self, s): + return s.lower() + + def casefold_parts(self, parts): + return [p.lower() for p in parts] + + def resolve(self, path, strict=False): + s = str(path) + if not s: + return os.getcwd() + previous_s = None + if _getfinalpathname is not None: + if strict: + return self._ext_to_normal(_getfinalpathname(s)) + else: + # End of the path after the first one not found + tail_parts = [] + while True: + try: + s = self._ext_to_normal(_getfinalpathname(s)) + except FileNotFoundError: + previous_s = s + s, tail = os.path.split(s) + tail_parts.append(tail) + if previous_s == s: + return path + else: + return os.path.join(s, *reversed(tail_parts)) + # Means fallback on absolute + return None + + def _split_extended_path(self, s, ext_prefix=ext_namespace_prefix): + prefix = '' + if s.startswith(ext_prefix): + prefix = s[:4] + s = s[4:] + if s.startswith('UNC\\'): + prefix += s[:3] + s = '\\' + s[3:] + return prefix, s + + def _ext_to_normal(self, s): + # Turn back an extended path into a normal DOS-like path + return self._split_extended_path(s)[1] + + def is_reserved(self, parts): + # NOTE: the rules for reserved names seem somewhat complicated + # (e.g. r"..\NUL" is reserved but not r"foo\NUL"). + # We err on the side of caution and return True for paths which are + # not considered reserved by Windows. + if not parts: + return False + if parts[0].startswith('\\\\'): + # UNC paths are never reserved + return False + return parts[-1].partition('.')[0].upper() in self.reserved_names + + def make_uri(self, path): + # Under Windows, file URIs use the UTF-8 encoding. + drive = path.drive + if len(drive) == 2 and drive[1] == ':': + # It's a path on a local drive => 'file:///c:/a/b' + rest = path.as_posix()[2:].lstrip('/') + return 'file:///%s/%s' % ( + drive, urlquote_from_bytes(rest.encode('utf-8'))) + else: + # It's a path on a network drive => 'file://host/share/a/b' + return 'file:' + urlquote_from_bytes( + path.as_posix().encode('utf-8')) + + def gethomedir(self, username): + if 'HOME' in os.environ: + userhome = os.environ['HOME'] + elif 'USERPROFILE' in os.environ: + userhome = os.environ['USERPROFILE'] + elif 'HOMEPATH' in os.environ: + try: + drv = os.environ['HOMEDRIVE'] + except KeyError: + drv = '' + userhome = drv + os.environ['HOMEPATH'] + else: + raise RuntimeError("Can't determine home directory") + + if username: + # Try to guess user home directory. By default all users + # directories are located in the same place and are named by + # corresponding usernames. If current user home directory points + # to nonstandard place, this guess is likely wrong. + if os.environ['USERNAME'] != username: + drv, root, parts = self.parse_parts((userhome,)) + if parts[-1] != os.environ['USERNAME']: + raise RuntimeError("Can't determine home directory " + "for %r" % username) + parts[-1] = username + if drv or root: + userhome = drv + root + self.join(parts[1:]) + else: + userhome = self.join(parts) + return userhome + + +class _PosixFlavour(_Flavour): + sep = '/' + altsep = '' + has_drv = False + pathmod = posixpath + + is_supported = (os.name != 'nt') + + def splitroot(self, part, sep=sep): + if part and part[0] == sep: + stripped_part = part.lstrip(sep) + # According to POSIX path resolution: + # http://pubs.opengroup.org/onlinepubs/009695399/basedefs/ + # xbd_chap04.html#tag_04_11 + # "A pathname that begins with two successive slashes may be + # interpreted in an implementation-defined manner, although more + # than two leading slashes shall be treated as a single slash". + if len(part) - len(stripped_part) == 2: + return '', sep * 2, stripped_part + else: + return '', sep, stripped_part + else: + return '', '', part + + def casefold(self, s): + return s + + def casefold_parts(self, parts): + return parts + + def resolve(self, path, strict=False): + sep = self.sep + accessor = path._accessor + seen = {} + + def _resolve(path, rest): + if rest.startswith(sep): + path = '' + + for name in rest.split(sep): + if not name or name == '.': + # current dir + continue + if name == '..': + # parent dir + path, _, _ = path.rpartition(sep) + continue + newpath = path + sep + name + if newpath in seen: + # Already seen this path + path = seen[newpath] + if path is not None: + # use cached value + continue + # The symlink is not resolved, so we must have a symlink + # loop. + raise RuntimeError("Symlink loop from %r" % newpath) + # Resolve the symbolic link + try: + target = accessor.readlink(newpath) + except OSError as e: + if e.errno != EINVAL and strict: + raise + # Not a symlink, or non-strict mode. We just leave the path + # untouched. + path = newpath + else: + seen[newpath] = None # not resolved symlink + path = _resolve(path, target) + seen[newpath] = path # resolved symlink + + return path + # NOTE: according to POSIX, getcwd() cannot contain path components + # which are symlinks. + base = '' if path.is_absolute() else os.getcwd() + return _resolve(base, str(path)) or sep + + def is_reserved(self, parts): + return False + + def make_uri(self, path): + # We represent the path using the local filesystem encoding, + # for portability to other applications. + bpath = bytes(path) + return 'file://' + urlquote_from_bytes(bpath) + + def gethomedir(self, username): + if not username: + try: + return os.environ['HOME'] + except KeyError: + import pwd + return pwd.getpwuid(os.getuid()).pw_dir + else: + import pwd + try: + return pwd.getpwnam(username).pw_dir + except KeyError: + raise RuntimeError("Can't determine home directory " + "for %r" % username) + + +_windows_flavour = _WindowsFlavour() +_posix_flavour = _PosixFlavour() + + +class _Accessor: + + """An accessor implements a particular (system-specific or not) way of + accessing paths on the filesystem.""" + + +class _NormalAccessor(_Accessor): + + def _wrap_strfunc(strfunc): + @functools.wraps(strfunc) + def wrapped(pathobj, *args): + return strfunc(str(pathobj), *args) + return staticmethod(wrapped) + + def _wrap_binary_strfunc(strfunc): + @functools.wraps(strfunc) + def wrapped(pathobjA, pathobjB, *args): + return strfunc(str(pathobjA), str(pathobjB), *args) + return staticmethod(wrapped) + + stat = _wrap_strfunc(os.stat) + + lstat = _wrap_strfunc(os.lstat) + + open = _wrap_strfunc(os.open) + + listdir = _wrap_strfunc(os.listdir) + + scandir = _wrap_strfunc(os_scandir) + + chmod = _wrap_strfunc(os.chmod) + + if hasattr(os, "lchmod"): + lchmod = _wrap_strfunc(os.lchmod) + else: + def lchmod(self, pathobj, mode): + raise NotImplementedError("lchmod() not available on this system") + + mkdir = _wrap_strfunc(os.mkdir) + + unlink = _wrap_strfunc(os.unlink) + + rmdir = _wrap_strfunc(os.rmdir) + + rename = _wrap_binary_strfunc(os.rename) + + if sys.version_info >= (3, 3): + replace = _wrap_binary_strfunc(os.replace) + + if nt: + if supports_symlinks: + symlink = _wrap_binary_strfunc(os.symlink) + else: + def symlink(a, b, target_is_directory): + raise NotImplementedError( + "symlink() not available on this system") + else: + # Under POSIX, os.symlink() takes two args + @staticmethod + def symlink(a, b, target_is_directory): + return os.symlink(str(a), str(b)) + + utime = _wrap_strfunc(os.utime) + + # Helper for resolve() + def readlink(self, path): + return os.readlink(path) + + +_normal_accessor = _NormalAccessor() + + +# +# Globbing helpers +# + +def _make_selector(pattern_parts): + pat = pattern_parts[0] + child_parts = pattern_parts[1:] + if pat == '**': + cls = _RecursiveWildcardSelector + elif '**' in pat: + raise ValueError( + "Invalid pattern: '**' can only be an entire path component") + elif _is_wildcard_pattern(pat): + cls = _WildcardSelector + else: + cls = _PreciseSelector + return cls(pat, child_parts) + + +if hasattr(functools, "lru_cache"): + _make_selector = functools.lru_cache()(_make_selector) + + +class _Selector: + + """A selector matches a specific glob pattern part against the children + of a given path.""" + + def __init__(self, child_parts): + self.child_parts = child_parts + if child_parts: + self.successor = _make_selector(child_parts) + self.dironly = True + else: + self.successor = _TerminatingSelector() + self.dironly = False + + def select_from(self, parent_path): + """Iterate over all child paths of `parent_path` matched by this + selector. This can contain parent_path itself.""" + path_cls = type(parent_path) + is_dir = path_cls.is_dir + exists = path_cls.exists + scandir = parent_path._accessor.scandir + if not is_dir(parent_path): + return iter([]) + return self._select_from(parent_path, is_dir, exists, scandir) + + +class _TerminatingSelector: + + def _select_from(self, parent_path, is_dir, exists, scandir): + yield parent_path + + +class _PreciseSelector(_Selector): + + def __init__(self, name, child_parts): + self.name = name + _Selector.__init__(self, child_parts) + + def _select_from(self, parent_path, is_dir, exists, scandir): + def try_iter(): + path = parent_path._make_child_relpath(self.name) + if (is_dir if self.dironly else exists)(path): + for p in self.successor._select_from( + path, is_dir, exists, scandir): + yield p + + def except_iter(exc): + return + yield + + for x in _try_except_permissionerror_iter(try_iter, except_iter): + yield x + + +class _WildcardSelector(_Selector): + + def __init__(self, pat, child_parts): + self.pat = re.compile(fnmatch.translate(pat)) + _Selector.__init__(self, child_parts) + + def _select_from(self, parent_path, is_dir, exists, scandir): + def try_iter(): + cf = parent_path._flavour.casefold + entries = list(scandir(parent_path)) + for entry in entries: + if not self.dironly or entry.is_dir(): + name = entry.name + casefolded = cf(name) + if self.pat.match(casefolded): + path = parent_path._make_child_relpath(name) + for p in self.successor._select_from( + path, is_dir, exists, scandir): + yield p + + def except_iter(exc): + return + yield + + for x in _try_except_permissionerror_iter(try_iter, except_iter): + yield x + + +class _RecursiveWildcardSelector(_Selector): + + def __init__(self, pat, child_parts): + _Selector.__init__(self, child_parts) + + def _iterate_directories(self, parent_path, is_dir, scandir): + yield parent_path + + def try_iter(): + entries = list(scandir(parent_path)) + for entry in entries: + if entry.is_dir() and not entry.is_symlink(): + path = parent_path._make_child_relpath(entry.name) + for p in self._iterate_directories(path, is_dir, scandir): + yield p + + def except_iter(exc): + return + yield + + for x in _try_except_permissionerror_iter(try_iter, except_iter): + yield x + + def _select_from(self, parent_path, is_dir, exists, scandir): + def try_iter(): + yielded = set() + try: + successor_select = self.successor._select_from + for starting_point in self._iterate_directories( + parent_path, is_dir, scandir): + for p in successor_select( + starting_point, is_dir, exists, scandir): + if p not in yielded: + yield p + yielded.add(p) + finally: + yielded.clear() + + def except_iter(exc): + return + yield + + for x in _try_except_permissionerror_iter(try_iter, except_iter): + yield x + + +# +# Public API +# + +class _PathParents(Sequence): + + """This object provides sequence-like access to the logical ancestors + of a path. Don't try to construct it yourself.""" + __slots__ = ('_pathcls', '_drv', '_root', '_parts') + + def __init__(self, path): + # We don't store the instance to avoid reference cycles + self._pathcls = type(path) + self._drv = path._drv + self._root = path._root + self._parts = path._parts + + def __len__(self): + if self._drv or self._root: + return len(self._parts) - 1 + else: + return len(self._parts) + + def __getitem__(self, idx): + if idx < 0 or idx >= len(self): + raise IndexError(idx) + return self._pathcls._from_parsed_parts(self._drv, self._root, + self._parts[:-idx - 1]) + + def __repr__(self): + return "<{0}.parents>".format(self._pathcls.__name__) + + +class PurePath(object): + + """PurePath represents a filesystem path and offers operations which + don't imply any actual filesystem I/O. Depending on your system, + instantiating a PurePath will return either a PurePosixPath or a + PureWindowsPath object. You can also instantiate either of these classes + directly, regardless of your system. + """ + __slots__ = ( + '_drv', '_root', '_parts', + '_str', '_hash', '_pparts', '_cached_cparts', + ) + + def __new__(cls, *args): + """Construct a PurePath from one or several strings and or existing + PurePath objects. The strings and path objects are combined so as + to yield a canonicalized path, which is incorporated into the + new PurePath object. + """ + if cls is PurePath: + cls = PureWindowsPath if os.name == 'nt' else PurePosixPath + return cls._from_parts(args) + + def __reduce__(self): + # Using the parts tuple helps share interned path parts + # when pickling related paths. + return (self.__class__, tuple(self._parts)) + + @classmethod + def _parse_args(cls, args): + # This is useful when you don't want to create an instance, just + # canonicalize some constructor arguments. + parts = [] + for a in args: + if isinstance(a, PurePath): + parts += a._parts + else: + if sys.version_info >= (3, 6): + a = os.fspath(a) + else: + # duck typing for older Python versions + if hasattr(a, "__fspath__"): + a = a.__fspath__() + if isinstance(a, str): + # Force-cast str subclasses to str (issue #21127) + parts.append(str(a)) + # also handle unicode for PY2 (six.text_type = unicode) + elif six.PY2 and isinstance(a, six.text_type): + # cast to str using filesystem encoding + parts.append(a.encode(sys.getfilesystemencoding())) + else: + raise TypeError( + "argument should be a str object or an os.PathLike " + "object returning str, not %r" + % type(a)) + return cls._flavour.parse_parts(parts) + + @classmethod + def _from_parts(cls, args, init=True): + # We need to call _parse_args on the instance, so as to get the + # right flavour. + self = object.__new__(cls) + drv, root, parts = self._parse_args(args) + self._drv = drv + self._root = root + self._parts = parts + if init: + self._init() + return self + + @classmethod + def _from_parsed_parts(cls, drv, root, parts, init=True): + self = object.__new__(cls) + self._drv = drv + self._root = root + self._parts = parts + if init: + self._init() + return self + + @classmethod + def _format_parsed_parts(cls, drv, root, parts): + if drv or root: + return drv + root + cls._flavour.join(parts[1:]) + else: + return cls._flavour.join(parts) + + def _init(self): + # Overridden in concrete Path + pass + + def _make_child(self, args): + drv, root, parts = self._parse_args(args) + drv, root, parts = self._flavour.join_parsed_parts( + self._drv, self._root, self._parts, drv, root, parts) + return self._from_parsed_parts(drv, root, parts) + + def __str__(self): + """Return the string representation of the path, suitable for + passing to system calls.""" + try: + return self._str + except AttributeError: + self._str = self._format_parsed_parts(self._drv, self._root, + self._parts) or '.' + return self._str + + def __fspath__(self): + return str(self) + + def as_posix(self): + """Return the string representation of the path with forward (/) + slashes.""" + f = self._flavour + return str(self).replace(f.sep, '/') + + def __bytes__(self): + """Return the bytes representation of the path. This is only + recommended to use under Unix.""" + if sys.version_info < (3, 2): + raise NotImplementedError("needs Python 3.2 or later") + return os.fsencode(str(self)) + + def __repr__(self): + return "{0}({1!r})".format(self.__class__.__name__, self.as_posix()) + + def as_uri(self): + """Return the path as a 'file' URI.""" + if not self.is_absolute(): + raise ValueError("relative path can't be expressed as a file URI") + return self._flavour.make_uri(self) + + @property + def _cparts(self): + # Cached casefolded parts, for hashing and comparison + try: + return self._cached_cparts + except AttributeError: + self._cached_cparts = self._flavour.casefold_parts(self._parts) + return self._cached_cparts + + def __eq__(self, other): + if not isinstance(other, PurePath): + return NotImplemented + return ( + self._cparts == other._cparts + and self._flavour is other._flavour) + + def __ne__(self, other): + return not self == other + + def __hash__(self): + try: + return self._hash + except AttributeError: + self._hash = hash(tuple(self._cparts)) + return self._hash + + def __lt__(self, other): + if (not isinstance(other, PurePath) + or self._flavour is not other._flavour): + return NotImplemented + return self._cparts < other._cparts + + def __le__(self, other): + if (not isinstance(other, PurePath) + or self._flavour is not other._flavour): + return NotImplemented + return self._cparts <= other._cparts + + def __gt__(self, other): + if (not isinstance(other, PurePath) + or self._flavour is not other._flavour): + return NotImplemented + return self._cparts > other._cparts + + def __ge__(self, other): + if (not isinstance(other, PurePath) + or self._flavour is not other._flavour): + return NotImplemented + return self._cparts >= other._cparts + + drive = property(attrgetter('_drv'), + doc="""The drive prefix (letter or UNC path), if any.""") + + root = property(attrgetter('_root'), + doc="""The root of the path, if any.""") + + @property + def anchor(self): + """The concatenation of the drive and root, or ''.""" + anchor = self._drv + self._root + return anchor + + @property + def name(self): + """The final path component, if any.""" + parts = self._parts + if len(parts) == (1 if (self._drv or self._root) else 0): + return '' + return parts[-1] + + @property + def suffix(self): + """The final component's last suffix, if any.""" + name = self.name + i = name.rfind('.') + if 0 < i < len(name) - 1: + return name[i:] + else: + return '' + + @property + def suffixes(self): + """A list of the final component's suffixes, if any.""" + name = self.name + if name.endswith('.'): + return [] + name = name.lstrip('.') + return ['.' + suffix for suffix in name.split('.')[1:]] + + @property + def stem(self): + """The final path component, minus its last suffix.""" + name = self.name + i = name.rfind('.') + if 0 < i < len(name) - 1: + return name[:i] + else: + return name + + def with_name(self, name): + """Return a new path with the file name changed.""" + if not self.name: + raise ValueError("%r has an empty name" % (self,)) + drv, root, parts = self._flavour.parse_parts((name,)) + if (not name or name[-1] in [self._flavour.sep, self._flavour.altsep] + or drv or root or len(parts) != 1): + raise ValueError("Invalid name %r" % (name)) + return self._from_parsed_parts(self._drv, self._root, + self._parts[:-1] + [name]) + + def with_suffix(self, suffix): + """Return a new path with the file suffix changed (or added, if + none). + """ + # XXX if suffix is None, should the current suffix be removed? + f = self._flavour + if f.sep in suffix or f.altsep and f.altsep in suffix: + raise ValueError("Invalid suffix %r" % (suffix)) + if suffix and not suffix.startswith('.') or suffix == '.': + raise ValueError("Invalid suffix %r" % (suffix)) + name = self.name + if not name: + raise ValueError("%r has an empty name" % (self,)) + old_suffix = self.suffix + if not old_suffix: + name = name + suffix + else: + name = name[:-len(old_suffix)] + suffix + return self._from_parsed_parts(self._drv, self._root, + self._parts[:-1] + [name]) + + def relative_to(self, *other): + """Return the relative path to another path identified by the passed + arguments. If the operation is not possible (because this is not + a subpath of the other path), raise ValueError. + """ + # For the purpose of this method, drive and root are considered + # separate parts, i.e.: + # Path('c:/').relative_to('c:') gives Path('/') + # Path('c:/').relative_to('/') raise ValueError + if not other: + raise TypeError("need at least one argument") + parts = self._parts + drv = self._drv + root = self._root + if root: + abs_parts = [drv, root] + parts[1:] + else: + abs_parts = parts + to_drv, to_root, to_parts = self._parse_args(other) + if to_root: + to_abs_parts = [to_drv, to_root] + to_parts[1:] + else: + to_abs_parts = to_parts + n = len(to_abs_parts) + cf = self._flavour.casefold_parts + if (root or drv) if n == 0 else cf(abs_parts[:n]) != cf(to_abs_parts): + formatted = self._format_parsed_parts(to_drv, to_root, to_parts) + raise ValueError("{0!r} does not start with {1!r}" + .format(str(self), str(formatted))) + return self._from_parsed_parts('', root if n == 1 else '', + abs_parts[n:]) + + @property + def parts(self): + """An object providing sequence-like access to the + components in the filesystem path.""" + # We cache the tuple to avoid building a new one each time .parts + # is accessed. XXX is this necessary? + try: + return self._pparts + except AttributeError: + self._pparts = tuple(self._parts) + return self._pparts + + def joinpath(self, *args): + """Combine this path with one or several arguments, and return a + new path representing either a subpath (if all arguments are relative + paths) or a totally different path (if one of the arguments is + anchored). + """ + return self._make_child(args) + + def __truediv__(self, key): + return self._make_child((key,)) + + def __rtruediv__(self, key): + return self._from_parts([key] + self._parts) + + if six.PY2: + __div__ = __truediv__ + __rdiv__ = __rtruediv__ + + @property + def parent(self): + """The logical parent of the path.""" + drv = self._drv + root = self._root + parts = self._parts + if len(parts) == 1 and (drv or root): + return self + return self._from_parsed_parts(drv, root, parts[:-1]) + + @property + def parents(self): + """A sequence of this path's logical parents.""" + return _PathParents(self) + + def is_absolute(self): + """True if the path is absolute (has both a root and, if applicable, + a drive).""" + if not self._root: + return False + return not self._flavour.has_drv or bool(self._drv) + + def is_reserved(self): + """Return True if the path contains one of the special names reserved + by the system, if any.""" + return self._flavour.is_reserved(self._parts) + + def match(self, path_pattern): + """ + Return True if this path matches the given pattern. + """ + cf = self._flavour.casefold + path_pattern = cf(path_pattern) + drv, root, pat_parts = self._flavour.parse_parts((path_pattern,)) + if not pat_parts: + raise ValueError("empty pattern") + if drv and drv != cf(self._drv): + return False + if root and root != cf(self._root): + return False + parts = self._cparts + if drv or root: + if len(pat_parts) != len(parts): + return False + pat_parts = pat_parts[1:] + elif len(pat_parts) > len(parts): + return False + for part, pat in zip(reversed(parts), reversed(pat_parts)): + if not fnmatch.fnmatchcase(part, pat): + return False + return True + + +# Can't subclass os.PathLike from PurePath and keep the constructor +# optimizations in PurePath._parse_args(). +if sys.version_info >= (3, 6): + os.PathLike.register(PurePath) + + +class PurePosixPath(PurePath): + _flavour = _posix_flavour + __slots__ = () + + +class PureWindowsPath(PurePath): + _flavour = _windows_flavour + __slots__ = () + + +# Filesystem-accessing classes + + +class Path(PurePath): + __slots__ = ( + '_accessor', + '_closed', + ) + + def __new__(cls, *args, **kwargs): + if cls is Path: + cls = WindowsPath if os.name == 'nt' else PosixPath + self = cls._from_parts(args, init=False) + if not self._flavour.is_supported: + raise NotImplementedError("cannot instantiate %r on your system" + % (cls.__name__,)) + self._init() + return self + + def _init(self, + # Private non-constructor arguments + template=None, + ): + self._closed = False + if template is not None: + self._accessor = template._accessor + else: + self._accessor = _normal_accessor + + def _make_child_relpath(self, part): + # This is an optimization used for dir walking. `part` must be + # a single part relative to this path. + parts = self._parts + [part] + return self._from_parsed_parts(self._drv, self._root, parts) + + def __enter__(self): + if self._closed: + self._raise_closed() + return self + + def __exit__(self, t, v, tb): + self._closed = True + + def _raise_closed(self): + raise ValueError("I/O operation on closed path") + + def _opener(self, name, flags, mode=0o666): + # A stub for the opener argument to built-in open() + return self._accessor.open(self, flags, mode) + + def _raw_open(self, flags, mode=0o777): + """ + Open the file pointed by this path and return a file descriptor, + as os.open() does. + """ + if self._closed: + self._raise_closed() + return self._accessor.open(self, flags, mode) + + # Public API + + @classmethod + def cwd(cls): + """Return a new path pointing to the current working directory + (as returned by os.getcwd()). + """ + return cls(os.getcwd()) + + @classmethod + def home(cls): + """Return a new path pointing to the user's home directory (as + returned by os.path.expanduser('~')). + """ + return cls(cls()._flavour.gethomedir(None)) + + def samefile(self, other_path): + """Return whether other_path is the same or not as this file + (as returned by os.path.samefile()). + """ + if hasattr(os.path, "samestat"): + st = self.stat() + try: + other_st = other_path.stat() + except AttributeError: + other_st = os.stat(other_path) + return os.path.samestat(st, other_st) + else: + filename1 = six.text_type(self) + filename2 = six.text_type(other_path) + st1 = _win32_get_unique_path_id(filename1) + st2 = _win32_get_unique_path_id(filename2) + return st1 == st2 + + def iterdir(self): + """Iterate over the files in this directory. Does not yield any + result for the special paths '.' and '..'. + """ + if self._closed: + self._raise_closed() + for name in self._accessor.listdir(self): + if name in ('.', '..'): + # Yielding a path object for these makes little sense + continue + yield self._make_child_relpath(name) + if self._closed: + self._raise_closed() + + def glob(self, pattern): + """Iterate over this subtree and yield all existing files (of any + kind, including directories) matching the given pattern. + """ + if not pattern: + raise ValueError("Unacceptable pattern: {0!r}".format(pattern)) + pattern = self._flavour.casefold(pattern) + drv, root, pattern_parts = self._flavour.parse_parts((pattern,)) + if drv or root: + raise NotImplementedError("Non-relative patterns are unsupported") + selector = _make_selector(tuple(pattern_parts)) + for p in selector.select_from(self): + yield p + + def rglob(self, pattern): + """Recursively yield all existing files (of any kind, including + directories) matching the given pattern, anywhere in this subtree. + """ + pattern = self._flavour.casefold(pattern) + drv, root, pattern_parts = self._flavour.parse_parts((pattern,)) + if drv or root: + raise NotImplementedError("Non-relative patterns are unsupported") + selector = _make_selector(("**",) + tuple(pattern_parts)) + for p in selector.select_from(self): + yield p + + def absolute(self): + """Return an absolute version of this path. This function works + even if the path doesn't point to anything. + + No normalization is done, i.e. all '.' and '..' will be kept along. + Use resolve() to get the canonical path to a file. + """ + # XXX untested yet! + if self._closed: + self._raise_closed() + if self.is_absolute(): + return self + # FIXME this must defer to the specific flavour (and, under Windows, + # use nt._getfullpathname()) + obj = self._from_parts([os.getcwd()] + self._parts, init=False) + obj._init(template=self) + return obj + + def resolve(self, strict=False): + """ + Make the path absolute, resolving all symlinks on the way and also + normalizing it (for example turning slashes into backslashes under + Windows). + """ + if self._closed: + self._raise_closed() + s = self._flavour.resolve(self, strict=strict) + if s is None: + # No symlink resolution => for consistency, raise an error if + # the path doesn't exist or is forbidden + self.stat() + s = str(self.absolute()) + # Now we have no symlinks in the path, it's safe to normalize it. + normed = self._flavour.pathmod.normpath(s) + obj = self._from_parts((normed,), init=False) + obj._init(template=self) + return obj + + def stat(self): + """ + Return the result of the stat() system call on this path, like + os.stat() does. + """ + return self._accessor.stat(self) + + def owner(self): + """ + Return the login name of the file owner. + """ + import pwd + return pwd.getpwuid(self.stat().st_uid).pw_name + + def group(self): + """ + Return the group name of the file gid. + """ + import grp + return grp.getgrgid(self.stat().st_gid).gr_name + + def open(self, mode='r', buffering=-1, encoding=None, + errors=None, newline=None): + """ + Open the file pointed by this path and return a file object, as + the built-in open() function does. + """ + if self._closed: + self._raise_closed() + if sys.version_info >= (3, 3): + return io.open( + str(self), mode, buffering, encoding, errors, newline, + opener=self._opener) + else: + return io.open(str(self), mode, buffering, + encoding, errors, newline) + + def read_bytes(self): + """ + Open the file in bytes mode, read it, and close the file. + """ + with self.open(mode='rb') as f: + return f.read() + + def read_text(self, encoding=None, errors=None): + """ + Open the file in text mode, read it, and close the file. + """ + with self.open(mode='r', encoding=encoding, errors=errors) as f: + return f.read() + + def write_bytes(self, data): + """ + Open the file in bytes mode, write to it, and close the file. + """ + if not isinstance(data, six.binary_type): + raise TypeError( + 'data must be %s, not %s' % + (six.binary_type.__name__, data.__class__.__name__)) + with self.open(mode='wb') as f: + return f.write(data) + + def write_text(self, data, encoding=None, errors=None): + """ + Open the file in text mode, write to it, and close the file. + """ + if not isinstance(data, six.text_type): + raise TypeError( + 'data must be %s, not %s' % + (six.text_type.__name__, data.__class__.__name__)) + with self.open(mode='w', encoding=encoding, errors=errors) as f: + return f.write(data) + + def touch(self, mode=0o666, exist_ok=True): + """ + Create this file with the given access mode, if it doesn't exist. + """ + if self._closed: + self._raise_closed() + if exist_ok: + # First try to bump modification time + # Implementation note: GNU touch uses the UTIME_NOW option of + # the utimensat() / futimens() functions. + try: + self._accessor.utime(self, None) + except OSError: + # Avoid exception chaining + pass + else: + return + flags = os.O_CREAT | os.O_WRONLY + if not exist_ok: + flags |= os.O_EXCL + fd = self._raw_open(flags, mode) + os.close(fd) + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a new directory at this given path. + """ + if self._closed: + self._raise_closed() + + def _try_func(): + self._accessor.mkdir(self, mode) + + def _exc_func(exc): + if not parents or self.parent == self: + raise exc + self.parent.mkdir(parents=True, exist_ok=True) + self.mkdir(mode, parents=False, exist_ok=exist_ok) + + try: + _try_except_filenotfounderror(_try_func, _exc_func) + except OSError: + if not exist_ok or not self.is_dir(): + raise + + def chmod(self, mode): + """ + Change the permissions of the path, like os.chmod(). + """ + if self._closed: + self._raise_closed() + self._accessor.chmod(self, mode) + + def lchmod(self, mode): + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. + """ + if self._closed: + self._raise_closed() + self._accessor.lchmod(self, mode) + + def unlink(self): + """ + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + if self._closed: + self._raise_closed() + self._accessor.unlink(self) + + def rmdir(self): + """ + Remove this directory. The directory must be empty. + """ + if self._closed: + self._raise_closed() + self._accessor.rmdir(self) + + def lstat(self): + """ + Like stat(), except if the path points to a symlink, the symlink's + status information is returned, rather than its target's. + """ + if self._closed: + self._raise_closed() + return self._accessor.lstat(self) + + def rename(self, target): + """ + Rename this path to the given path. + """ + if self._closed: + self._raise_closed() + self._accessor.rename(self, target) + + def replace(self, target): + """ + Rename this path to the given path, clobbering the existing + destination if it exists. + """ + if sys.version_info < (3, 3): + raise NotImplementedError("replace() is only available " + "with Python 3.3 and later") + if self._closed: + self._raise_closed() + self._accessor.replace(self, target) + + def symlink_to(self, target, target_is_directory=False): + """ + Make this path a symlink pointing to the given path. + Note the order of arguments (self, target) is the reverse of + os.symlink's. + """ + if self._closed: + self._raise_closed() + self._accessor.symlink(target, self, target_is_directory) + + # Convenience functions for querying the stat results + + def exists(self): + """ + Whether this path exists. + """ + try: + self.stat() + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + return False + return True + + def is_dir(self): + """ + Whether this path is a directory. + """ + try: + return S_ISDIR(self.stat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_file(self): + """ + Whether this path is a regular file (also True for symlinks pointing + to regular files). + """ + try: + return S_ISREG(self.stat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_symlink(self): + """ + Whether this path is a symbolic link. + """ + try: + return S_ISLNK(self.lstat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist + return False + + def is_block_device(self): + """ + Whether this path is a block device. + """ + try: + return S_ISBLK(self.stat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_char_device(self): + """ + Whether this path is a character device. + """ + try: + return S_ISCHR(self.stat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_fifo(self): + """ + Whether this path is a FIFO. + """ + try: + return S_ISFIFO(self.stat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_socket(self): + """ + Whether this path is a socket. + """ + try: + return S_ISSOCK(self.stat().st_mode) + except OSError as e: + if e.errno not in (ENOENT, ENOTDIR): + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def expanduser(self): + """ Return a new path with expanded ~ and ~user constructs + (as returned by os.path.expanduser) + """ + if (not (self._drv or self._root) + and self._parts and self._parts[0][:1] == '~'): + homedir = self._flavour.gethomedir(self._parts[0][1:]) + return self._from_parts([homedir] + self._parts[1:]) + + return self + + +class PosixPath(Path, PurePosixPath): + __slots__ = () + + +class WindowsPath(Path, PureWindowsPath): + __slots__ = () + + def owner(self): + raise NotImplementedError("Path.owner() is unsupported on this system") + + def group(self): + raise NotImplementedError("Path.group() is unsupported on this system") diff --git a/tests/test_chinese.py b/tests/test_chinese.py index 269c8df..e92b15a 100644 --- a/tests/test_chinese.py +++ b/tests/test_chinese.py @@ -9,7 +9,6 @@ import types import sys from pysnooper.utils import truncate -from python_toolbox import sys_tools, temp_file_tools import pytest import pysnooper @@ -18,11 +17,12 @@ from pysnooper.variables import needs_parentheses from .utils import (assert_output, assert_sample_output, VariableEntry, CallEntry, LineEntry, ReturnEntry, OpcodeEntry, ReturnValueEntry, ExceptionEntry) +from . import mini_toolbox def test_chinese(): - with temp_file_tools.create_temp_folder(prefix='pysnooper') as folder: + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder: path = folder / 'foo.log' @pysnooper.snoop(path) def foo(): diff --git a/tests/test_pysnooper.py b/tests/test_pysnooper.py index f7043f7..7c8ede8 100644 --- a/tests/test_pysnooper.py +++ b/tests/test_pysnooper.py @@ -8,7 +8,6 @@ import types import sys from pysnooper.utils import truncate -from python_toolbox import sys_tools, temp_file_tools import pytest import pysnooper @@ -16,6 +15,7 @@ from pysnooper.variables import needs_parentheses from .utils import (assert_output, assert_sample_output, VariableEntry, CallEntry, LineEntry, ReturnEntry, OpcodeEntry, ReturnValueEntry, ExceptionEntry) +from . import mini_toolbox def test_string_io(): @@ -54,7 +54,7 @@ def test_thread_info(): y = 8 return y + x - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function('baba') assert result == 15 @@ -83,7 +83,7 @@ def test_multi_thread_info(): y = 8 return y + x - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: my_function('baba') t1 = threading.Thread(target=my_function, name="test123",args=['bubu']) @@ -206,7 +206,7 @@ def test_watch(): for i in range(2): foo.square() - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function() assert result is None @@ -252,7 +252,7 @@ def test_watch_explode(): lst = [7, 8, 9] lst.append(10) - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function() assert result is None @@ -306,7 +306,7 @@ def test_variables_classes(): _s = WithSlots() _lst = list(range(1000)) - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function() assert result is None @@ -351,7 +351,7 @@ def test_single_watch_no_comma(): for i in range(2): foo.square() - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function() assert result is None @@ -382,7 +382,7 @@ def test_long_variable(): foo = list(range(1000)) return foo - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function() assert result == list(range(1000)) @@ -410,7 +410,7 @@ def test_repr_exception(): def my_function(): bad = Bad() - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = my_function() assert result is None @@ -500,7 +500,7 @@ def test_method_and_prefix(): baz = Baz() - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = baz.square() assert result is baz @@ -525,7 +525,7 @@ def test_method_and_prefix(): def test_file_output(): - with temp_file_tools.create_temp_folder(prefix='pysnooper') as folder: + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder: path = folder / 'foo.log' @pysnooper.snoop(path) @@ -615,8 +615,8 @@ def test_lambda(): def test_unavailable_source(): - with temp_file_tools.create_temp_folder(prefix='pysnooper') as folder, \ - sys_tools.TempSysPathAdder(str(folder)): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): module_name = 'iaerojajsijf' python_file_path = folder / ('%s.py' % (module_name,)) content = textwrap.dedent(u''' @@ -629,7 +629,7 @@ def test_unavailable_source(): python_file.write(content) module = __import__(module_name) python_file_path.unlink() - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = getattr(module, 'f')(7) assert result == 7 @@ -647,7 +647,7 @@ def test_unavailable_source(): def test_no_overwrite_by_default(): - with temp_file_tools.create_temp_folder(prefix='pysnooper') as folder: + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder: path = folder / 'foo.log' with path.open('w') as output_file: output_file.write(u'lala') @@ -679,7 +679,7 @@ def test_no_overwrite_by_default(): def test_overwrite(): - with temp_file_tools.create_temp_folder(prefix='pysnooper') as folder: + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder: path = folder / 'foo.log' with path.open('w') as output_file: output_file.write(u'lala') @@ -721,7 +721,7 @@ def test_overwrite(): def test_error_in_overwrite_argument(): - with temp_file_tools.create_temp_folder(prefix='pysnooper') as folder: + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder: with pytest.raises(Exception, match='can only be used when writing'): @pysnooper.snoop(overwrite=True) def my_function(foo): @@ -783,7 +783,7 @@ def test_with_block(): def qux(): return 9 # not traced, mustn't show up - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: result = foo(2) assert result == 2 diff --git a/tests/utils.py b/tests/utils.py index 05185f0..349ab00 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ try: except ImportError: from itertools import izip_longest as zip_longest -from python_toolbox import caching, sys_tools +from . import mini_toolbox import pysnooper.pycompat @@ -190,7 +190,7 @@ class _BaseEventEntry(_BaseEntry): self.thread_info_regex = (None if thread_info_regex is None else re.compile(thread_info_regex)) - @caching.CachedProperty + @property def event_name(self): return re.match('^[A-Z][a-z_]*', type(self).__name__).group(0).lower() @@ -270,7 +270,7 @@ def assert_output(output, expected_entries, prefix=None): def assert_sample_output(module): - with sys_tools.OutputCapturer(stdout=False, + with mini_toolbox.OutputCapturer(stdout=False, stderr=True) as output_capturer: module.main() diff --git a/tox.ini b/tox.ini index 94e91ca..0c4b4f3 100644 --- a/tox.ini +++ b/tox.ini @@ -15,11 +15,7 @@ envlist = description = Unit tests deps = pytest - python_toolbox commands = pytest -setenv = - # until python_toolbox is fixed - PYTHONWARNINGS = ignore::DeprecationWarning [testenv:bandit] description = PyCQA security linter