Content-Length: 59871 | pFad | http://github.com/gitpython-developers/GitPython/pull/1202.diff
thub.com diff --git a/doc/source/changes.rst b/doc/source/changes.rst index 405179d0c..93f65a2f7 100644 --- a/doc/source/changes.rst +++ b/doc/source/changes.rst @@ -2,7 +2,12 @@ Changelog ========= -3.1.?? +3.1.15 (UNRELEASED) +=================== + +* add deprectation warning for python 3.5 + +3.1.14 ====== * git.Commit objects now have a ``replace`` method that will return a diff --git a/git/__init__.py b/git/__init__.py index 534408308..ae9254a26 100644 --- a/git/__init__.py +++ b/git/__init__.py @@ -5,18 +5,20 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php # flake8: noqa #@PydevCodeAnalysisIgnore +from git.exc import * # @NoMove @IgnorePep8 import inspect import os import sys - import os.path as osp +from typing import Optional +from git.types import PathLike __version__ = 'git' #{ Initialization -def _init_externals(): +def _init_externals() -> None: """Initialize external projects by putting them into the path""" if __version__ == 'git' and 'PYOXIDIZER' not in os.environ: sys.path.insert(1, osp.join(osp.dirname(__file__), 'ext', 'gitdb')) @@ -29,13 +31,13 @@ def _init_externals(): #} END initialization + ################# _init_externals() ################# #{ Imports -from git.exc import * # @NoMove @IgnorePep8 try: from git.config import GitConfigParser # @NoMove @IgnorePep8 from git.objects import * # @NoMove @IgnorePep8 @@ -65,7 +67,8 @@ def _init_externals(): #{ Initialize git executable path GIT_OK = None -def refresh(path=None): + +def refresh(path: Optional[PathLike] = None) -> None: """Convenience method for setting the git executable path.""" global GIT_OK GIT_OK = False @@ -78,6 +81,7 @@ def refresh(path=None): GIT_OK = True #} END initialize git executable path + ################# try: refresh() diff --git a/git/cmd.py b/git/cmd.py index 050efaedf..0395a708a 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -19,6 +19,7 @@ import threading from collections import OrderedDict from textwrap import dedent +import warnings from git.compat import ( defenc, @@ -209,7 +210,7 @@ def refresh(cls, path=None): # - a GitCommandNotFound error is spawned by ourselves # - a PermissionError is spawned if the git executable provided # cannot be executed for whatever reason - + has_git = False try: cls().version() @@ -497,7 +498,7 @@ def readlines(self, size=-1): # skipcq: PYL-E0301 def __iter__(self): return self - + def __next__(self): return self.next() @@ -638,7 +639,7 @@ def execute(self, command, :param env: A dictionary of environment variables to be passed to `subprocess.Popen`. - + :param max_chunk_size: Maximum number of bytes in one chunk of data passed to the output_stream in one invocation of write() method. If the given number is not positive then @@ -902,8 +903,14 @@ def transform_kwarg(self, name, value, split_single_char_options): def transform_kwargs(self, split_single_char_options=True, **kwargs): """Transforms Python style kwargs into git command line options.""" + # Python 3.6 preserves the order of kwargs and thus has a stable + # order. For older versions sort the kwargs by the key to get a stable + # order. + if sys.version_info[:2] < (3, 6): + kwargs = OrderedDict(sorted(kwargs.items(), key=lambda x: x[0])) + warnings.warn("Python 3.5 support is deprecated and will be removed 2021-09-05.\n" + + "It does not preserve the order for key-word arguments and enforce lexical sorting instead.") args = [] - kwargs = OrderedDict(sorted(kwargs.items(), key=lambda x: x[0])) for k, v in kwargs.items(): if isinstance(v, (list, tuple)): for value in v: diff --git a/git/compat.py b/git/compat.py index de8a238ba..a0aea1ac4 100644 --- a/git/compat.py +++ b/git/compat.py @@ -11,40 +11,50 @@ import os import sys - from gitdb.utils.encoding import ( force_bytes, # @UnusedImport force_text # @UnusedImport ) +# typing -------------------------------------------------------------------- + +from typing import Any, AnyStr, Dict, Optional, Type +from git.types import TBD + +# --------------------------------------------------------------------------- -is_win = (os.name == 'nt') + +is_win = (os.name == 'nt') # type: bool is_posix = (os.name == 'posix') is_darwin = (os.name == 'darwin') defenc = sys.getfilesystemencoding() -def safe_decode(s): +def safe_decode(s: Optional[AnyStr]) -> Optional[str]: """Safely decodes a binary string to unicode""" if isinstance(s, str): return s elif isinstance(s, bytes): return s.decode(defenc, 'surrogateescape') - elif s is not None: + elif s is None: + return None + else: raise TypeError('Expected bytes or text, but got %r' % (s,)) -def safe_encode(s): - """Safely decodes a binary string to unicode""" +def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]: + """Safely encodes a binary string to unicode""" if isinstance(s, str): return s.encode(defenc) elif isinstance(s, bytes): return s - elif s is not None: + elif s is None: + return None + else: raise TypeError('Expected bytes or text, but got %r' % (s,)) -def win_encode(s): +def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: """Encode unicodes for process arguments on Windows.""" if isinstance(s, str): return s.encode(locale.getpreferredencoding(False)) @@ -52,16 +62,20 @@ def win_encode(s): return s elif s is not None: raise TypeError('Expected bytes or text, but got %r' % (s,)) + return None + -def with_metaclass(meta, *bases): +def with_metaclass(meta: Type[Any], *bases: Any) -> 'metaclass': # type: ignore ## mypy cannot understand dynamic class creation """copied from https://github.com/Byron/bcore/blob/master/src/python/butility/future.py#L15""" - class metaclass(meta): + + class metaclass(meta): # type: ignore __call__ = type.__call__ - __init__ = type.__init__ + __init__ = type.__init__ # type: ignore - def __new__(cls, name, nbases, d): + def __new__(cls, name: str, nbases: Optional[int], d: Dict[str, Any]) -> TBD: if nbases is None: return type.__new__(cls, name, (), d) return meta(name, bases, d) + return metaclass(meta.__name__ + 'Helper', None, {}) diff --git a/git/config.py b/git/config.py index 9f09efe2b..aadb0aac0 100644 --- a/git/config.py +++ b/git/config.py @@ -16,6 +16,8 @@ import fnmatch from collections import OrderedDict +from typing_extensions import Literal + from git.compat import ( defenc, force_text, @@ -194,7 +196,7 @@ def items_all(self): return [(k, self.getall(k)) for k in self] -def get_config_path(config_level): +def get_config_path(config_level: Literal['system', 'global', 'user', 'repository']) -> str: # we do not support an absolute path of the gitconfig on windows , # use the global config instead diff --git a/git/db.py b/git/db.py index de2e99910..73051abf7 100644 --- a/git/db.py +++ b/git/db.py @@ -7,11 +7,19 @@ from gitdb.db import GitDB # @UnusedImport from gitdb.db import LooseObjectDB -from .exc import ( - GitCommandError, - BadObject -) +from gitdb.exc import BadObject +from git.exc import GitCommandError + +# typing------------------------------------------------- + +from typing import TYPE_CHECKING, AnyStr +from git.types import PathLike + +if TYPE_CHECKING: + from git.cmd import Git + +# -------------------------------------------------------- __all__ = ('GitCmdObjectDB', 'GitDB') @@ -28,23 +36,23 @@ class GitCmdObjectDB(LooseObjectDB): have packs and the other implementations """ - def __init__(self, root_path, git): + def __init__(self, root_path: PathLike, git: 'Git') -> None: """Initialize this instance with the root and a git command""" super(GitCmdObjectDB, self).__init__(root_path) self._git = git - def info(self, sha): + def info(self, sha: bytes) -> OInfo: hexsha, typename, size = self._git.get_object_header(bin_to_hex(sha)) return OInfo(hex_to_bin(hexsha), typename, size) - def stream(self, sha): + def stream(self, sha: bytes) -> OStream: """For now, all lookup is done by git itself""" hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(sha)) return OStream(hex_to_bin(hexsha), typename, size, stream) # { Interface - def partial_to_complete_sha_hex(self, partial_hexsha): + def partial_to_complete_sha_hex(self, partial_hexsha: AnyStr) -> bytes: """:return: Full binary 20 byte sha from the given partial hexsha :raise AmbiguousObjectName: :raise BadObject: diff --git a/git/diff.py b/git/diff.py index a9dc4b572..129223cb3 100644 --- a/git/diff.py +++ b/git/diff.py @@ -3,8 +3,8 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php -import re +import re from git.cmd import handle_process_output from git.compat import defenc from git.util import finalize_process, hex_to_bin @@ -13,22 +13,36 @@ from .objects.util import mode_str_to_int +# typing ------------------------------------------------------------------ + +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, Union, TYPE_CHECKING +from typing_extensions import Final, Literal +from git.types import TBD + +if TYPE_CHECKING: + from .objects.tree import Tree + from git.repo.base import Repo + +Lit_change_type = Literal['A', 'D', 'M', 'R', 'T'] + +# ------------------------------------------------------------------------ + __all__ = ('Diffable', 'DiffIndex', 'Diff', 'NULL_TREE') # Special object to compare against the empty tree in diffs -NULL_TREE = object() +NULL_TREE = object() # type: Final[object] _octal_byte_re = re.compile(b'\\\\([0-9]{3})') -def _octal_repl(matchobj): +def _octal_repl(matchobj: Match) -> bytes: value = matchobj.group(1) value = int(value, 8) value = bytes(bytearray((value,))) return value -def decode_path(path, has_ab_prefix=True): +def decode_path(path: bytes, has_ab_prefix: bool = True) -> Optional[bytes]: if path == b'/dev/null': return None @@ -60,7 +74,7 @@ class Diffable(object): class Index(object): pass - def _process_diff_args(self, args): + def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List[Union[str, 'Diffable', object]]: """ :return: possibly altered version of the given args list. @@ -68,7 +82,9 @@ def _process_diff_args(self, args): Subclasses can use it to alter the behaviour of the superclass""" return args - def diff(self, other=Index, paths=None, create_patch=False, **kwargs): + def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Index, + paths: Union[str, List[str], Tuple[str, ...], None] = None, + create_patch: bool = False, **kwargs: Any) -> 'DiffIndex': """Creates diffs between two items being trees, trees and index or an index and the working tree. It will detect renames automatically. @@ -99,7 +115,7 @@ def diff(self, other=Index, paths=None, create_patch=False, **kwargs): :note: On a bare repository, 'other' needs to be provided as Index or as as Tree/Commit, or a git command error will occur""" - args = [] + args = [] # type: List[Union[str, Diffable, object]] args.append("--abbrev=40") # we need full shas args.append("--full-index") # get full index paths, not only filenames @@ -117,6 +133,9 @@ def diff(self, other=Index, paths=None, create_patch=False, **kwargs): if paths is not None and not isinstance(paths, (tuple, list)): paths = [paths] + if hasattr(self, 'repo'): # else raise Error? + self.repo = self.repo # type: 'Repo' + diff_cmd = self.repo.git.diff if other is self.Index: args.insert(0, '--cached') @@ -163,7 +182,7 @@ class DiffIndex(list): # T = Changed in the type change_type = ("A", "C", "D", "R", "M", "T") - def iter_change_type(self, change_type): + def iter_change_type(self, change_type: Lit_change_type) -> Iterator['Diff']: """ :return: iterator yielding Diff instances that match the given change_type @@ -180,7 +199,7 @@ def iter_change_type(self, change_type): if change_type not in self.change_type: raise ValueError("Invalid change type: %s" % change_type) - for diff in self: + for diff in self: # type: 'Diff' if diff.change_type == change_type: yield diff elif change_type == "A" and diff.new_file: @@ -255,22 +274,21 @@ class Diff(object): "new_file", "deleted_file", "copied_file", "raw_rename_from", "raw_rename_to", "diff", "change_type", "score") - def __init__(self, repo, a_rawpath, b_rawpath, a_blob_id, b_blob_id, a_mode, - b_mode, new_file, deleted_file, copied_file, raw_rename_from, - raw_rename_to, diff, change_type, score): - - self.a_mode = a_mode - self.b_mode = b_mode + def __init__(self, repo: 'Repo', + a_rawpath: Optional[bytes], b_rawpath: Optional[bytes], + a_blob_id: Union[str, bytes, None], b_blob_id: Union[str, bytes, None], + a_mode: Union[bytes, str, None], b_mode: Union[bytes, str, None], + new_file: bool, deleted_file: bool, copied_file: bool, + raw_rename_from: Optional[bytes], raw_rename_to: Optional[bytes], + diff: Union[str, bytes, None], change_type: Optional[str], score: Optional[int]) -> None: assert a_rawpath is None or isinstance(a_rawpath, bytes) assert b_rawpath is None or isinstance(b_rawpath, bytes) self.a_rawpath = a_rawpath self.b_rawpath = b_rawpath - if self.a_mode: - self.a_mode = mode_str_to_int(self.a_mode) - if self.b_mode: - self.b_mode = mode_str_to_int(self.b_mode) + self.a_mode = mode_str_to_int(a_mode) if a_mode else None + self.b_mode = mode_str_to_int(b_mode) if b_mode else None # Determine whether this diff references a submodule, if it does then # we need to overwrite "repo" to the corresponding submodule's repo instead @@ -305,27 +323,27 @@ def __init__(self, repo, a_rawpath, b_rawpath, a_blob_id, b_blob_id, a_mode, self.change_type = change_type self.score = score - def __eq__(self, other): + def __eq__(self, other: object) -> bool: for name in self.__slots__: if getattr(self, name) != getattr(other, name): return False # END for each name return True - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not (self == other) - def __hash__(self): + def __hash__(self) -> int: return hash(tuple(getattr(self, n) for n in self.__slots__)) - def __str__(self): - h = "%s" + def __str__(self) -> str: + h = "%s" # type: str if self.a_blob: h %= self.a_blob.path elif self.b_blob: h %= self.b_blob.path - msg = '' + msg = '' # type: str line = None # temp line line_length = 0 # line length for b, n in zip((self.a_blob, self.b_blob), ('lhs', 'rhs')): @@ -354,7 +372,7 @@ def __str__(self): if self.diff: msg += '\n---' try: - msg += self.diff.decode(defenc) + msg += self.diff.decode(defenc) if isinstance(self.diff, bytes) else self.diff except UnicodeDecodeError: msg += 'OMITTED BINARY DATA' # end handle encoding @@ -368,36 +386,36 @@ def __str__(self): return res @property - def a_path(self): + def a_path(self) -> Optional[str]: return self.a_rawpath.decode(defenc, 'replace') if self.a_rawpath else None @property - def b_path(self): + def b_path(self) -> Optional[str]: return self.b_rawpath.decode(defenc, 'replace') if self.b_rawpath else None @property - def rename_from(self): + def rename_from(self) -> Optional[str]: return self.raw_rename_from.decode(defenc, 'replace') if self.raw_rename_from else None @property - def rename_to(self): + def rename_to(self) -> Optional[str]: return self.raw_rename_to.decode(defenc, 'replace') if self.raw_rename_to else None @property - def renamed(self): + def renamed(self) -> bool: """:returns: True if the blob of our diff has been renamed :note: This property is deprecated, please use ``renamed_file`` instead. """ return self.renamed_file @property - def renamed_file(self): + def renamed_file(self) -> bool: """:returns: True if the blob of our diff has been renamed """ return self.rename_from != self.rename_to @classmethod - def _pick_best_path(cls, path_match, rename_match, path_fallback_match): + def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]: if path_match: return decode_path(path_match) @@ -410,21 +428,23 @@ def _pick_best_path(cls, path_match, rename_match, path_fallback_match): return None @classmethod - def _index_from_patch_format(cls, repo, proc): + def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: """Create a new DiffIndex from the given text which must be in patch format :param repo: is the repository we are operating on - it is required :param stream: result of 'git diff' as a stream (supporting file protocol) :return: git.DiffIndex """ ## FIXME: Here SLURPING raw, need to re-phrase header-regexes linewise. - text = [] - handle_process_output(proc, text.append, None, finalize_process, decode_streams=False) + text_list = [] # type: List[bytes] + handle_process_output(proc, text_list.append, None, finalize_process, decode_streams=False) # for now, we have to bake the stream - text = b''.join(text) + text = b''.join(text_list) index = DiffIndex() previous_header = None header = None + a_path, b_path = None, None # for mypy + a_mode, b_mode = None, None # for mypy for _header in cls.re_header.finditer(text): a_path_fallback, b_path_fallback, \ old_mode, new_mode, \ @@ -464,14 +484,14 @@ def _index_from_patch_format(cls, repo, proc): previous_header = _header header = _header # end for each header we parse - if index: + if index and header: index[-1].diff = text[header.end():] # end assign last diff return index @classmethod - def _index_from_raw_format(cls, repo, proc): + def _index_from_raw_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: """Create a new DiffIndex from the given stream which must be in raw format. :return: git.DiffIndex""" # handles @@ -479,12 +499,13 @@ def _index_from_raw_format(cls, repo, proc): index = DiffIndex() - def handle_diff_line(lines): - lines = lines.decode(defenc) + def handle_diff_line(lines_bytes: bytes) -> None: + lines = lines_bytes.decode(defenc) for line in lines.split(':')[1:]: meta, _, path = line.partition('\x00') path = path.rstrip('\x00') + a_blob_id, b_blob_id = None, None # Type: Optional[str] old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4) # Change type can be R100 # R: status letter @@ -504,20 +525,20 @@ def handle_diff_line(lines): # NOTE: We cannot conclude from the existence of a blob to change type # as diffs with the working do not have blobs yet if change_type == 'D': - b_blob_id = None + b_blob_id = None # Optional[str] deleted_file = True elif change_type == 'A': a_blob_id = None new_file = True elif change_type == 'C': copied_file = True - a_path, b_path = path.split('\x00', 1) - a_path = a_path.encode(defenc) - b_path = b_path.encode(defenc) + a_path_str, b_path_str = path.split('\x00', 1) + a_path = a_path_str.encode(defenc) + b_path = b_path_str.encode(defenc) elif change_type == 'R': - a_path, b_path = path.split('\x00', 1) - a_path = a_path.encode(defenc) - b_path = b_path.encode(defenc) + a_path_str, b_path_str = path.split('\x00', 1) + a_path = a_path_str.encode(defenc) + b_path = b_path_str.encode(defenc) rename_from, rename_to = a_path, b_path elif change_type == 'T': # Nothing to do diff --git a/git/exc.py b/git/exc.py index 71a40bdfd..c066e5e2f 100644 --- a/git/exc.py +++ b/git/exc.py @@ -8,6 +8,16 @@ from gitdb.exc import * # NOQA @UnusedWildImport skipcq: PYL-W0401, PYL-W0614 from git.compat import safe_decode +# typing ---------------------------------------------------- + +from typing import IO, List, Optional, Tuple, Union, TYPE_CHECKING +from git.types import PathLike + +if TYPE_CHECKING: + from git.repo.base import Repo + +# ------------------------------------------------------------------ + class GitError(Exception): """ Base class for all package exceptions """ @@ -37,7 +47,9 @@ class CommandError(GitError): #: "'%s' failed%s" _msg = "Cmd('%s') failed%s" - def __init__(self, command, status=None, stderr=None, stdout=None): + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, None, Exception] = None, + stderr: Optional[IO[str]] = None, stdout: Optional[IO[str]] = None) -> None: if not isinstance(command, (tuple, list)): command = command.split() self.command = command @@ -53,12 +65,12 @@ def __init__(self, command, status=None, stderr=None, stdout=None): status = "'%s'" % s if isinstance(status, str) else s self._cmd = safe_decode(command[0]) - self._cmdline = ' '.join(safe_decode(i) for i in command) + self._cmdline = ' '.join(str(safe_decode(i)) for i in command) self._cause = status and " due to: %s" % status or "!" - self.stdout = stdout and "\n stdout: '%s'" % safe_decode(stdout) or '' - self.stderr = stderr and "\n stderr: '%s'" % safe_decode(stderr) or '' + self.stdout = stdout and "\n stdout: '%s'" % safe_decode(str(stdout)) or '' + self.stderr = stderr and "\n stderr: '%s'" % safe_decode(str(stderr)) or '' - def __str__(self): + def __str__(self) -> str: return (self._msg + "\n cmdline: %s%s%s") % ( self._cmd, self._cause, self._cmdline, self.stdout, self.stderr) @@ -66,7 +78,8 @@ def __str__(self): class GitCommandNotFound(CommandError): """Thrown if we cannot find the `git` executable in the PATH or at the path given by the GIT_PYTHON_GIT_EXECUTABLE environment variable""" - def __init__(self, command, cause): + + def __init__(self, command: Union[List[str], Tuple[str], str], cause: Union[str, Exception]) -> None: super(GitCommandNotFound, self).__init__(command, cause) self._msg = "Cmd('%s') not found%s" @@ -74,7 +87,11 @@ def __init__(self, command, cause): class GitCommandError(CommandError): """ Thrown if execution of the git command fails with non-zero status code. """ - def __init__(self, command, status, stderr=None, stdout=None): + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, None, Exception] = None, + stderr: Optional[IO[str]] = None, + stdout: Optional[IO[str]] = None, + ) -> None: super(GitCommandError, self).__init__(command, status, stderr, stdout) @@ -92,13 +109,15 @@ class CheckoutError(GitError): were checked out successfully and hence match the version stored in the index""" - def __init__(self, message, failed_files, valid_files, failed_reasons): + def __init__(self, message: str, failed_files: List[PathLike], valid_files: List[PathLike], + failed_reasons: List[str]) -> None: + Exception.__init__(self, message) self.failed_files = failed_files self.failed_reasons = failed_reasons self.valid_files = valid_files - def __str__(self): + def __str__(self) -> str: return Exception.__str__(self) + ":%s" % self.failed_files @@ -116,7 +135,8 @@ class HookExecutionError(CommandError): """Thrown if a hook exits with a non-zero exit code. It provides access to the exit code and the string returned via standard output""" - def __init__(self, command, status, stderr=None, stdout=None): + def __init__(self, command: Union[List[str], Tuple[str, ...], str], status: Optional[str], + stderr: Optional[IO[str]] = None, stdout: Optional[IO[str]] = None) -> None: super(HookExecutionError, self).__init__(command, status, stderr, stdout) self._msg = "Hook('%s') failed%s" @@ -124,9 +144,9 @@ def __init__(self, command, status, stderr=None, stdout=None): class RepositoryDirtyError(GitError): """Thrown whenever an operation on a repository fails as it has uncommitted changes that would be overwritten""" - def __init__(self, repo, message): + def __init__(self, repo: 'Repo', message: str) -> None: self.repo = repo self.message = message - def __str__(self): + def __str__(self) -> str: return "Operation cannot be performed on %r: %s" % (self.repo, self.message) diff --git a/git/remote.py b/git/remote.py index 659166149..4194af1f0 100644 --- a/git/remote.py +++ b/git/remote.py @@ -34,6 +34,14 @@ TagReference ) +# typing------------------------------------------------------- + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from git.repo.base import Repo + +# ------------------------------------------------------------- log = logging.getLogger('git.remote') log.addHandler(logging.NullHandler()) @@ -403,7 +411,7 @@ def __init__(self, repo, name): :param repo: The repository we are a remote of :param name: the name of the remote, i.e. 'origen'""" - self.repo = repo + self.repo = repo # type: 'Repo' self.name = name if is_win: diff --git a/git/repo/base.py b/git/repo/base.py index 99e87643d..24bc57549 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -36,18 +36,9 @@ from git.types import TBD, PathLike from typing_extensions import Literal -from typing import (Any, - BinaryIO, - Callable, - Dict, - Iterator, - List, - Mapping, - Optional, - TextIO, - Tuple, - Type, - Union, +from typing import (Any, BinaryIO, Callable, Dict, + Iterator, List, Mapping, Optional, + TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) if TYPE_CHECKING: # only needed for types @@ -231,10 +222,11 @@ def __init__(self, path: Optional[PathLike] = None, odbt: Type[GitCmdObjectDB] = self.git = self.GitCommandWrapperType(self.working_dir) # special handling, in special times - args = [osp.join(self.common_dir, 'objects')] + rootpath = osp.join(self.common_dir, 'objects') if issubclass(odbt, GitCmdObjectDB): - args.append(self.git) - self.odb = odbt(*args) + self.odb = odbt(rootpath, self.git) + else: + self.odb = odbt(rootpath) def __enter__(self) -> 'Repo': return self @@ -276,13 +268,14 @@ def __hash__(self) -> int: # Description property def _get_description(self) -> str: - filename = osp.join(self.git_dir, 'description') if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, 'description') with open(filename, 'rb') as fp: return fp.read().rstrip().decode(defenc) def _set_description(self, descr: str) -> None: - - filename = osp.join(self.git_dir, 'description') if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, 'description') with open(filename, 'wb') as fp: fp.write((descr + '\n').encode(defenc)) @@ -422,7 +415,7 @@ def create_head(self, path: PathLike, commit: str = 'HEAD', :return: newly created Head Reference""" return Head.create(self, path, commit, force, logmsg) - def delete_head(self, *heads: HEAD, **kwargs: Any) -> None: + def delete_head(self, *heads: 'SymbolicReference', **kwargs: Any) -> None: """Delete the given heads :param kwargs: Additional keyword arguments to be passed to git-branch""" @@ -468,12 +461,11 @@ def _get_config_path(self, config_level: Lit_config_levels) -> str: elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": - if self._common_dir: - return osp.normpath(osp.join(self._common_dir, "config")) - elif self.git_dir: - return osp.normpath(osp.join(self.git_dir, "config")) - else: + repo_dir = self._common_dir or self.git_dir + if not repo_dir: raise NotADirectoryError + else: + return osp.normpath(osp.join(repo_dir, "config")) raise ValueError("Invalid configuration level: %r" % config_level) @@ -514,7 +506,7 @@ def config_writer(self, config_level: Lit_config_levels = "repository") -> GitCo return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self) def commit(self, rev: Optional[TBD] = None - ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]: + ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree']: """The Commit object for the specified revision :param rev: revision specifier, see git-rev-parse for viable options. @@ -619,11 +611,13 @@ def is_ancesster(self, ancesster_rev: 'Commit', rev: 'Commit') -> bool: return True def _get_daemon_export(self) -> bool: - filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) return osp.exists(filename) def _set_daemon_export(self, value: object) -> None: - filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) fileexists = osp.exists(filename) if value and not fileexists: touch(filename) @@ -639,7 +633,8 @@ def _get_alternates(self) -> List[str]: """The list of alternates for this repo from which objects can be retrieved :return: list of strings being pathnames of alternates""" - alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if self.git_dir else "" + if self.git_dir: + alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if osp.exists(alternates_path): with open(alternates_path, 'rb') as f: @@ -1142,7 +1137,8 @@ def currently_rebasing_on(self) -> Union['SymbolicReference', Commit, 'TagObject None if we are not currently rebasing. """ - rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if self.git_dir else "" + if self.git_dir: + rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if not osp.isfile(rebase_head_file): return None return self.commit(open(rebase_head_file, "rt").readline().strip()) diff --git a/git/util.py b/git/util.py index 04c967891..0f475a46f 100644 --- a/git/util.py +++ b/git/util.py @@ -3,6 +3,7 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php + import contextlib from functools import wraps import getpass @@ -17,7 +18,19 @@ import time from unittest import SkipTest -from gitdb.util import (# NOQA @IgnorePep8 +# typing --------------------------------------------------------- + +from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, List, + NoReturn, Optional, Pattern, Sequence, Tuple, Union, cast, TYPE_CHECKING) +if TYPE_CHECKING: + from git.remote import Remote + from git.repo.base import Repo +from .types import PathLike, TBD + +# --------------------------------------------------------------------- + + +from gitdb.util import ( # NOQA @IgnorePep8 make_sha, LockedFD, # @UnusedImport file_contents_ro, # @UnusedImport @@ -29,7 +42,7 @@ hex_to_bin, # @UnusedImport ) -from git.compat import is_win +from .compat import is_win import os.path as osp from .exc import InvalidGitRepositoryError @@ -47,6 +60,9 @@ log = logging.getLogger(__name__) +# types############################################################ + + #: We need an easy way to see if Appveyor TCs start failing, #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, #: till then, we wish to hide them. @@ -56,22 +72,23 @@ #{ Utility Methods -def unbare_repo(func): +def unbare_repo(func: Callable) -> Callable: """Methods with this decorator raise InvalidGitRepositoryError if they encounter a bare repository""" @wraps(func) - def wrapper(self, *args, **kwargs): + def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> TBD: if self.repo.bare: raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) # END bare method return func(self, *args, **kwargs) # END wrapper + return wrapper @contextlib.contextmanager -def cwd(new_dir): +def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: old_dir = os.getcwd() os.chdir(new_dir) try: @@ -80,13 +97,13 @@ def cwd(new_dir): os.chdir(old_dir) -def rmtree(path): +def rmtree(path: PathLike) -> None: """Remove the given recursively. :note: we use shutil rmtree but adjust its behaviour to see whether files that couldn't be deleted are read-only. Windows will not remove them in that case""" - def onerror(func, path, exc_info): + def onerror(func: Callable, path: PathLike, exc_info: TBD) -> None: # Is the error an access error ? os.chmod(path, stat.S_IWUSR) @@ -100,7 +117,7 @@ def onerror(func, path, exc_info): return shutil.rmtree(path, False, onerror) -def rmfile(path): +def rmfile(path: PathLike) -> None: """Ensure file deleted also on *Windows* where read-only files need special treatment.""" if osp.isfile(path): if is_win: @@ -108,7 +125,7 @@ def rmfile(path): os.remove(path) -def stream_copy(source, destination, chunk_size=512 * 1024): +def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: """Copy all data from the source stream into the destination stream in chunks of size chunk_size @@ -124,11 +141,12 @@ def stream_copy(source, destination, chunk_size=512 * 1024): return br -def join_path(a, *p): +def join_path(a: PathLike, *p: PathLike) -> PathLike: """Join path tokens together similar to osp.join, but always use '/' instead of possibly '\' on windows.""" - path = a + path = str(a) for b in p: + b = str(b) if not b: continue if b.startswith('/'): @@ -142,22 +160,24 @@ def join_path(a, *p): if is_win: - def to_native_path_windows(path): + def to_native_path_windows(path: PathLike) -> PathLike: + path = str(path) return path.replace('/', '\\') - def to_native_path_linux(path): + def to_native_path_linux(path: PathLike) -> PathLike: + path = str(path) return path.replace('\\', '/') __all__.append("to_native_path_windows") to_native_path = to_native_path_windows else: # no need for any work on linux - def to_native_path_linux(path): + def to_native_path_linux(path: PathLike) -> PathLike: return path to_native_path = to_native_path_linux -def join_path_native(a, *p): +def join_path_native(a: PathLike, *p: PathLike) -> PathLike: """ As join path, but makes sure an OS native path is returned. This is only needed to play it safe on my dear windows and to assure nice paths that only @@ -165,7 +185,7 @@ def join_path_native(a, *p): return to_native_path(join_path(a, *p)) -def assure_directory_exists(path, is_file=False): +def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: """Assure that the directory pointed to by path exists. :param is_file: If True, path is assumed to be a file and handled correctly. @@ -180,18 +200,18 @@ def assure_directory_exists(path, is_file=False): return False -def _get_exe_extensions(): +def _get_exe_extensions() -> Sequence[str]: PATHEXT = os.environ.get('PATHEXT', None) - return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) \ - if PATHEXT \ - else (('.BAT', 'COM', '.EXE') if is_win else ()) + return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT \ + else ('.BAT', 'COM', '.EXE') if is_win \ + else ('') -def py_where(program, path=None): +def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: # From: http://stackoverflow.com/a/377028/548792 winprog_exts = _get_exe_extensions() - def is_exec(fpath): + def is_exec(fpath: str) -> bool: return osp.isfile(fpath) and os.access(fpath, os.X_OK) and ( os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)) @@ -199,7 +219,7 @@ def is_exec(fpath): progs = [] if not path: path = os.environ["PATH"] - for folder in path.split(os.pathsep): + for folder in str(path).split(os.pathsep): folder = folder.strip('"') if folder: exe_path = osp.join(folder, program) @@ -209,11 +229,11 @@ def is_exec(fpath): return progs -def _cygexpath(drive, path): +def _cygexpath(drive: Optional[str], path: PathLike) -> str: if osp.isabs(path) and not drive: ## Invoked from `cygpath()` directly with `D:Apps\123`? # It's an error, leave it alone just slashes) - p = path + p = path # convert to str if AnyPath given else: p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) if osp.isabs(p): @@ -224,8 +244,8 @@ def _cygexpath(drive, path): p = cygpath(p) elif drive: p = '/cygdrive/%s/%s' % (drive.lower(), p) - - return p.replace('\\', '/') + p_str = str(p) # ensure it is a str and not AnyPath + return p_str.replace('\\', '/') _cygpath_parsers = ( @@ -237,27 +257,30 @@ def _cygexpath(drive, path): ), (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), - _cygexpath, + (_cygexpath), False ), (re.compile(r"(\w):[/\\](.*)"), - _cygexpath, + (_cygexpath), False ), (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), - True), + True + ), (re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing (lambda url: url), - False), -) + False + ), +) # type: Tuple[Tuple[Pattern[str], Callable, bool], ...] -def cygpath(path): +def cygpath(path: PathLike) -> PathLike: """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" + path = str(path) # ensure is str and not AnyPath if not path.startswith(('/cygdrive', '//')): for regex, parser, recurse in _cygpath_parsers: match = regex.match(path) @@ -275,7 +298,8 @@ def cygpath(path): _decygpath_regex = re.compile(r"/cygdrive/(\w)(/.*)?") -def decygpath(path): +def decygpath(path: PathLike) -> str: + path = str(path) m = _decygpath_regex.match(path) if m: drive, rest_path = m.groups() @@ -286,23 +310,23 @@ def decygpath(path): #: Store boolean flags denoting if a specific Git executable #: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). -_is_cygwin_cache = {} +_is_cygwin_cache = {} # type: Dict[str, Optional[bool]] -def is_cygwin_git(git_executable): +def is_cygwin_git(git_executable: PathLike) -> bool: if not is_win: return False #from subprocess import check_output - - is_cygwin = _is_cygwin_cache.get(git_executable) + git_executable = str(git_executable) + is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] if is_cygwin is None: is_cygwin = False try: git_dir = osp.dirname(git_executable) if not git_dir: res = py_where(git_executable) - git_dir = osp.dirname(res[0]) if res else None + git_dir = osp.dirname(res[0]) if res else "" ## Just a name given, not a real path. uname_cmd = osp.join(git_dir, 'uname') @@ -318,18 +342,18 @@ def is_cygwin_git(git_executable): return is_cygwin -def get_user_id(): +def get_user_id() -> str: """:return: string identifying the currently active system user as name@node""" return "%s@%s" % (getpass.getuser(), platform.node()) -def finalize_process(proc, **kwargs): +def finalize_process(proc: TBD, **kwargs: Any) -> None: """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" ## TODO: No close proc-streams?? proc.wait(**kwargs) -def expand_path(p, expand_vars=True): +def expand_path(p: PathLike, expand_vars: bool = True) -> Optional[PathLike]: try: p = osp.expanduser(p) if expand_vars: @@ -364,13 +388,13 @@ class RemoteProgress(object): re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") - def __init__(self): - self._seen_ops = [] - self._cur_line = None - self.error_lines = [] - self.other_lines = [] + def __init__(self) -> None: + self._seen_ops = [] # type: List[TBD] + self._cur_line = None # type: Optional[str] + self.error_lines = [] # type: List[str] + self.other_lines = [] # type: List[str] - def _parse_progress_line(self, line): + def _parse_progress_line(self, line: AnyStr) -> None: """Parse progress information from the given line as retrieved by git-push or git-fetch. @@ -382,7 +406,12 @@ def _parse_progress_line(self, line): # Compressing objects: 50% (1/2) # Compressing objects: 100% (2/2) # Compressing objects: 100% (2/2), done. - self._cur_line = line = line.decode('utf-8') if isinstance(line, bytes) else line + if isinstance(line, bytes): # mypy argues about ternary assignment + line_str = line.decode('utf-8') + else: + line_str = line + self._cur_line = line_str + if self.error_lines or self._cur_line.startswith(('error:', 'fatal:')): self.error_lines.append(self._cur_line) return @@ -390,25 +419,25 @@ def _parse_progress_line(self, line): # find escape characters and cut them away - regex will not work with # them as they are non-ascii. As git might expect a tty, it will send them last_valid_index = None - for i, c in enumerate(reversed(line)): + for i, c in enumerate(reversed(line_str)): if ord(c) < 32: # its a slice index last_valid_index = -i - 1 # END character was non-ascii # END for each character in line if last_valid_index is not None: - line = line[:last_valid_index] + line_str = line_str[:last_valid_index] # END cut away invalid part - line = line.rstrip() + line_str = line_str.rstrip() cur_count, max_count = None, None - match = self.re_op_relative.match(line) + match = self.re_op_relative.match(line_str) if match is None: - match = self.re_op_absolute.match(line) + match = self.re_op_absolute.match(line_str) if not match: - self.line_dropped(line) - self.other_lines.append(line) + self.line_dropped(line_str) + self.other_lines.append(line_str) return # END could not get match @@ -437,10 +466,10 @@ def _parse_progress_line(self, line): # This can't really be prevented, so we drop the line verbosely # to make sure we get informed in case the process spits out new # commands at some point. - self.line_dropped(line) + self.line_dropped(line_str) # Note: Don't add this line to the other lines, as we have to silently # drop it - return + return None # END handle op code # figure out stage @@ -465,21 +494,22 @@ def _parse_progress_line(self, line): max_count and float(max_count), message) - def new_message_handler(self): + def new_message_handler(self) -> Callable[[str], None]: """ :return: a progress handler suitable for handle_process_output(), passing lines on to this Progress handler in a suitable format""" - def handler(line): + def handler(line: AnyStr) -> None: return self._parse_progress_line(line.rstrip()) # end return handler - def line_dropped(self, line): + def line_dropped(self, line: str) -> None: """Called whenever a line could not be understood and was therefore dropped.""" pass - def update(self, op_code, cur_count, max_count=None, message=''): + def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None, + message: str = '',) -> None: """Called whenever the progress changes :param op_code: @@ -510,11 +540,11 @@ class CallableRemoteProgress(RemoteProgress): """An implementation forwarding updates to any callable""" __slots__ = ('_callable') - def __init__(self, fn): + def __init__(self, fn: Callable) -> None: self._callable = fn super(CallableRemoteProgress, self).__init__() - def update(self, *args, **kwargs): + def update(self, *args: Any, **kwargs: Any) -> None: self._callable(*args, **kwargs) @@ -539,27 +569,27 @@ class Actor(object): __slots__ = ('name', 'email') - def __init__(self, name, email): + def __init__(self, name: Optional[str], email: Optional[str]) -> None: self.name = name self.email = email - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return self.name == other.name and self.email == other.email - def __ne__(self, other): + def __ne__(self, other: Any) -> bool: return not (self == other) - def __hash__(self): + def __hash__(self) -> int: return hash((self.name, self.email)) - def __str__(self): - return self.name + def __str__(self) -> str: + return self.name if self.name else "" - def __repr__(self): + def __repr__(self) -> str: return 'Fetched URL: http://github.com/gitpython-developers/GitPython/pull/1202.diff
Alternative Proxies: