"""Utilities for comparing files and directories. Classes: dircmp Functions: cmp(f1, f2, shallow=True) -> int cmpfiles(a, b, common) -> ([], [], []) clear_cache() """ import os import stat from itertools import filterfalse __all__ = ['clear_cache', 'cmp', 'dircmp', 'cmpfiles', 'DEFAULT_IGNORES'] _cache = {} BUFSIZE = 8*1024 DEFAULT_IGNORES = [ 'RCS', 'CVS', 'tags', '.git', '.hg', '.bzr', '_darcs', '__pycache__'] def clear_cache(): """Clear the filecmp cache.""" _cache.clear() def cmp(f1, f2, shallow=True): """Compare two files. Arguments: f1 -- First file name f2 -- Second file name shallow -- Just check stat signature (do not read the files). defaults to True. Return value: True if the files are the same, False otherwise. This function uses a cache for past comparisons and the results, with cache entries invalidated if their stat information changes. The cache may be cleared by calling clear_cache(). """ s1 = _sig(os.stat(f1)) s2 = _sig(os.stat(f2)) if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: return False if shallow and s1 == s2: return True if s1[1] != s2[1]: return False outcome = _cache.get((f1, f2, s1, s2)) if outcome is None: outcome = _do_cmp(f1, f2) if len(_cache) > 100: clear_cache() _cache[f1, f2, s1, s2] = outcome return outcome def _sig(st): return (stat.S_IFMT(st.st_mode), st.st_size, st.st_mtime) def _do_cmp(f1, f2): bufsize = BUFSIZE with open(f1, 'rb') as fp1, open(f2, 'rb') as fp2: while True: b1 = fp1.read(bufsize) b2 = fp2.read(bufsize) if b1 != b2: return False if not b1: return True class dircmp: """A class that manages the comparison of 2 directories. dircmp(a, b, ignore=None, hide=None) A and B are directories. IGNORE is a list of names to ignore, defaults to DEFAULT_IGNORES. HIDE is a list of names to hide, defaults to [os.curdir, os.pardir]. High level usage: x = dircmp(dir1, dir2) x.report() -> prints a report on the differences between dir1 and dir2 or x.report_partial_closure() -> prints report on differences between dir1 and dir2, and reports on common immediate subdirectories. x.report_full_closure() -> like report_partial_closure, but fully recursive. Attributes: left_list, right_list: The files in dir1 and dir2, filtered by hide and ignore. common: a list of names in both dir1 and dir2. left_only, right_only: names only in dir1, dir2. common_dirs: subdirectories in both dir1 and dir2. common_files: files in both dir1 and dir2. common_funny: names in both dir1 and dir2 where the type differs between dir1 and dir2, or the name is not stat-able. same_files: list of identical files. diff_files: list of filenames which differ. funny_files: list of files which could not be compared. subdirs: a dictionary of dircmp objects, keyed by names in common_dirs. """ def __init__(self, a, b, ignore=None, hide=None): self.left = a self.right = b if hide is None: self.hide = [os.curdir, os.pardir] else: self.hide = hide if ignore is None: self.ignore = DEFAULT_IGNORES else: self.ignore = ignore def phase0(self): self.left_list = _filter(os.listdir(self.left), self.hide+self.ignore) self.right_list = _filter(os.listdir(self.right), self.hide+self.ignore) self.left_list.sort() self.right_list.sort() def phase1(self): a = dict(zip(map(os.path.normcase, self.left_list), self.left_list)) b = dict(zip(map(os.path.normcase, self.right_list), self.right_list)) self.common = list(map(a.__getitem__, filter(b.__contains__, a))) self.left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a))) self.right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b))) def phase2(self): self.common_dirs = [] self.common_files = [] self.common_funny = [] for x in self.common: a_path = os.path.join(self.left, x) b_path = os.path.join(self.right, x) ok = 1 try: a_stat = os.stat(a_path) except OSError as why: ok = 0 try: b_stat = os.stat(b_path) except OSError as why: ok = 0 if ok: a_type = stat.S_IFMT(a_stat.st_mode) b_type = stat.S_IFMT(b_stat.st_mode) if a_type != b_type: self.common_funny.append(x) elif stat.S_ISDIR(a_type): self.common_dirs.append(x) elif stat.S_ISREG(a_type): self.common_files.append(x) else: self.common_funny.append(x) else: self.common_funny.append(x) def phase3(self): xx = cmpfiles(self.left, self.right, self.common_files) self.same_files, self.diff_files, self.funny_files = xx def phase4(self): self.subdirs = {} for x in self.common_dirs: a_x = os.path.join(self.left, x) b_x = os.path.join(self.right, x) self.subdirs[x] = dircmp(a_x, b_x, self.ignore, self.hide) def phase4_closure(self): self.phase4() for sd in self.subdirs.values(): sd.phase4_closure() def report(self): print('diff', self.left, self.right) if self.left_only: self.left_only.sort() print('Only in', self.left, ':', self.left_only) if self.right_only: self.right_only.sort() print('Only in', self.right, ':', self.right_only) if self.same_files: self.same_files.sort() print('Identical files :', self.same_files) if self.diff_files: self.diff_files.sort() print('Differing files :', self.diff_files) if self.funny_files: self.funny_files.sort() print('Trouble with common files :', self.funny_files) if self.common_dirs: self.common_dirs.sort() print('Common subdirectories :', self.common_dirs) if self.common_funny: self.common_funny.sort() print('Common funny cases :', self.common_funny) def report_partial_closure(self): self.report() for sd in self.subdirs.values(): print() sd.report() def report_full_closure(self): self.report() for sd in self.subdirs.values(): print() sd.report_full_closure() methodmap = dict(subdirs=phase4, same_files=phase3, diff_files=phase3, funny_files=phase3, common_dirs = phase2, common_files=phase2, common_funny=phase2, common=phase1, left_only=phase1, right_only=phase1, left_list=phase0, right_list=phase0) def __getattr__(self, attr): if attr not in self.methodmap: raise AttributeError(attr) self.methodmap[attr](self) return getattr(self, attr) def cmpfiles(a, b, common, shallow=True): """Compare common files in two directories. a, b -- directory names common -- list of file names found in both directories shallow -- if true, do comparison based solely on stat() information Returns a tuple of three lists: files that compare equal files that are different filenames that aren't regular files. """ res = ([], [], []) for x in common: ax = os.path.join(a, x) bx = os.path.join(b, x) res[_cmp(ax, bx, shallow)].append(x) return res def _cmp(a, b, sh, abs=abs, cmp=cmp): try: return not abs(cmp(a, b, sh)) except OSError: return 2 def _filter(flist, skip): return list(filterfalse(skip.__contains__, flist)) def demo(): import sys import getopt options, args = getopt.getopt(sys.argv[1:], 'r') if len(args) != 2: raise getopt.GetoptError('need exactly two args', None) dd = dircmp(args[0], args[1]) if ('-r', '') in options: dd.report_full_closure() else: dd.report() if __name__ == '__main__': demo() """Helper class to quickly write a loop over all standard input files. Typical use is: import fileinput for line in fileinput.input(): process(line) This iterates over the lines of all files listed in sys.argv[1:], defaulting to sys.stdin if the list is empty. If a filename is '-' it is also replaced by sys.stdin and the optional arguments mode and openhook are ignored. To specify an alternative list of filenames, pass it as the argument to input(). A single file name is also allowed. Functions filename(), lineno() return the filename and cumulative line number of the line that has just been read; filelineno() returns its line number in the current file; isfirstline() returns true iff the line just read is the first line of its file; isstdin() returns true iff the line was read from sys.stdin. Function nextfile() closes the current file so that the next iteration will read the first line from the next file (if any); lines not read from the file will not count towards the cumulative line count; the filename is not changed until after the first line of the next file has been read. Function close() closes the sequence. Before any lines have been read, filename() returns None and both line numbers are zero; nextfile() has no effect. After all lines have been read, filename() and the line number functions return the values pertaining to the last line read; nextfile() has no effect. All files are opened in text mode by default, you can override this by setting the mode parameter to input() or FileInput.__init__(). If an I/O error occurs during opening or reading a file, the OSError exception is raised. If sys.stdin is used more than once, the second and further use will return no lines, except perhaps for interactive use, or if it has been explicitly reset (e.g. using sys.stdin.seek(0)). Empty files are opened and immediately closed; the only time their presence in the list of filenames is noticeable at all is when the last file opened is empty. It is possible that the last line of a file doesn't end in a newline character; otherwise lines are returned including the trailing newline. Class FileInput is the implementation; its methods filename(), lineno(), fileline(), isfirstline(), isstdin(), nextfile() and close() correspond to the functions in the module. In addition it has a readline() method which returns the next input line, and a __getitem__() method which implements the sequence behavior. The sequence must be accessed in strictly sequential order; sequence access and readline() cannot be mixed. Optional in-place filtering: if the keyword argument inplace=1 is passed to input() or to the FileInput constructor, the file is moved to a backup file and standard output is directed to the input file. This makes it possible to write a filter that rewrites its input file in place. If the keyword argument backup=".<some extension>" is also given, it specifies the extension for the backup file, and the backup file remains around; by default, the extension is ".bak" and it is deleted when the output file is closed. In-place filtering is disabled when standard input is read. XXX The current implementation does not work for MS-DOS 8+3 filesystems. XXX Possible additions: - optional getopt argument processing - isatty() - read(), read(size), even readlines() """ import sys, os __all__ = ["input", "close", "nextfile", "filename", "lineno", "filelineno", "fileno", "isfirstline", "isstdin", "FileInput", "hook_compressed", "hook_encoded"] _state = None def input(files=None, inplace=False, backup="", *, mode="r", openhook=None): """Return an instance of the FileInput class, which can be iterated. The parameters are passed to the constructor of the FileInput class. The returned instance, in addition to being an iterator, keeps global state for the functions of this module,. """ global _state if _state and _state._file: raise RuntimeError("input() already active") _state = FileInput(files, inplace, backup, mode=mode, openhook=openhook) return _state def close(): """Close the sequence.""" global _state state = _state _state = None if state: state.close() def nextfile(): """ Close the current file so that the next iteration will read the first line from the next file (if any); lines not read from the file will not count towards the cumulative line count. The filename is not changed until after the first line of the next file has been read. Before the first line has been read, this function has no effect; it cannot be used to skip the first file. After the last line of the last file has been read, this function has no effect. """ if not _state: raise RuntimeError("no active input()") return _state.nextfile() def filename(): """ Return the name of the file currently being read. Before the first line has been read, returns None. """ if not _state: raise RuntimeError("no active input()") return _state.filename() def lineno(): """ Return the cumulative line number of the line that has just been read. Before the first line has been read, returns 0. After the last line of the last file has been read, returns the line number of that line. """ if not _state: raise RuntimeError("no active input()") return _state.lineno() def filelineno(): """ Return the line number in the current file. Before the first line has been read, returns 0. After the last line of the last file has been read, returns the line number of that line within the file. """ if not _state: raise RuntimeError("no active input()") return _state.filelineno() def fileno(): """ Return the file number of the current file. When no file is currently opened, returns -1. """ if not _state: raise RuntimeError("no active input()") return _state.fileno() def isfirstline(): """ Returns true the line just read is the first line of its file, otherwise returns false. """ if not _state: raise RuntimeError("no active input()") return _state.isfirstline() def isstdin(): """ Returns true if the last line was read from sys.stdin, otherwise returns false. """ if not _state: raise RuntimeError("no active input()") return _state.isstdin() class FileInput: """FileInput([files[, inplace[, backup]]], *, mode=None, openhook=None) Class FileInput is the implementation of the module; its methods filename(), lineno(), fileline(), isfirstline(), isstdin(), fileno(), nextfile() and close() correspond to the functions of the same name in the module. In addition it has a readline() method which returns the next input line, and a __getitem__() method which implements the sequence behavior. The sequence must be accessed in strictly sequential order; random access and readline() cannot be mixed. """ def __init__(self, files=None, inplace=False, backup="", *, mode="r", openhook=None): if isinstance(files, str): files = (files,) elif isinstance(files, os.PathLike): files = (os.fspath(files), ) else: if files is None: files = sys.argv[1:] if not files: files = ('-',) else: files = tuple(files) self._files = files self._inplace = inplace self._backup = backup self._savestdout = None self._output = None self._filename = None self._startlineno = 0 self._filelineno = 0 self._file = None self._isstdin = False self._backupfilename = None if mode not in ('r', 'rU', 'U', 'rb'): raise ValueError("FileInput opening mode must be one of " "'r', 'rU', 'U' and 'rb'") if 'U' in mode: import warnings warnings.warn("'U' mode is deprecated", DeprecationWarning, 2) self._mode = mode self._write_mode = mode.replace('r', 'w') if 'U' not in mode else 'w' if openhook: if inplace: raise ValueError("FileInput cannot use an opening hook in inplace mode") if not callable(openhook): raise ValueError("FileInput openhook must be callable") self._openhook = openhook def __del__(self): self.close() def close(self): try: self.nextfile() finally: self._files = () def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def __iter__(self): return self def __next__(self): while True: line = self._readline() if line: self._filelineno += 1 return line if not self._file: raise StopIteration self.nextfile() def __getitem__(self, i): import warnings warnings.warn( "Support for indexing FileInput objects is deprecated. " "Use iterator protocol instead.", DeprecationWarning, stacklevel=2 ) if i != self.lineno(): raise RuntimeError("accessing lines out of order") try: return self.__next__() except StopIteration: raise IndexError("end of input reached") def nextfile(self): savestdout = self._savestdout self._savestdout = None if savestdout: sys.stdout = savestdout output = self._output self._output = None try: if output: output.close() finally: file = self._file self._file = None try: del self._readline except AttributeError: pass try: if file and not self._isstdin: file.close() finally: backupfilename = self._backupfilename self._backupfilename = None if backupfilename and not self._backup: try: os.unlink(backupfilename) except OSError: pass self._isstdin = False def readline(self): while True: line = self._readline() if line: self._filelineno += 1 return line if not self._file: return line self.nextfile() def _readline(self): if not self._files: if 'b' in self._mode: return b'' else: return '' self._filename = self._files[0] self._files = self._files[1:] self._startlineno = self.lineno() self._filelineno = 0 self._file = None self._isstdin = False self._backupfilename = 0 if self._filename == '-': self._filename = '<stdin>' if 'b' in self._mode: self._file = getattr(sys.stdin, 'buffer', sys.stdin) else: self._file = sys.stdin self._isstdin = True else: if self._inplace: self._backupfilename = ( os.fspath(self._filename) + (self._backup or ".bak")) try: os.unlink(self._backupfilename) except OSError: pass os.rename(self._filename, self._backupfilename) self._file = open(self._backupfilename, self._mode) try: perm = os.fstat(self._file.fileno()).st_mode except OSError: self._output = open(self._filename, self._write_mode) else: mode = os.O_CREAT | os.O_WRONLY | os.O_TRUNC if hasattr(os, 'O_BINARY'): mode |= os.O_BINARY fd = os.open(self._filename, mode, perm) self._output = os.fdopen(fd, self._write_mode) try: os.chmod(self._filename, perm) except OSError: pass self._savestdout = sys.stdout sys.stdout = self._output else: if self._openhook: self._file = self._openhook(self._filename, self._mode) else: self._file = open(self._filename, self._mode) self._readline = self._file.readline return self._readline() def filename(self): return self._filename def lineno(self): return self._startlineno + self._filelineno def filelineno(self): return self._filelineno def fileno(self): if self._file: try: return self._file.fileno() except ValueError: return -1 else: return -1 def isfirstline(self): return self._filelineno == 1 def isstdin(self): return self._isstdin def hook_compressed(filename, mode): ext = os.path.splitext(filename)[1] if ext == '.gz': import gzip return gzip.open(filename, mode) elif ext == '.bz2': import bz2 return bz2.BZ2File(filename, mode) else: return open(filename, mode) def hook_encoded(encoding, errors=None): def openhook(filename, mode): return open(filename, mode, encoding=encoding, errors=errors) return openhook def _test(): import getopt inplace = False backup = False opts, args = getopt.getopt(sys.argv[1:], "ib:") for o, a in opts: if o == '-i': inplace = True if o == '-b': backup = a for line in input(args, inplace=inplace, backup=backup): if line[-1:] == '\n': line = line[:-1] if line[-1:] == '\r': line = line[:-1] print("%d: %s[%d]%s %s" % (lineno(), filename(), filelineno(), isfirstline() and "*" or "", line)) print("%d: %s[%d]" % (lineno(), filename(), filelineno())) if __name__ == '__main__': _test() """Filename matching with shell patterns. fnmatch(FILENAME, PATTERN) matches according to the local convention. fnmatchcase(FILENAME, PATTERN) always takes case in account. The functions operate by translating the pattern into a regular expression. They cache the compiled regular expressions for speed. The function translate(PATTERN) returns a regular expression corresponding to PATTERN. (It does not compile it.) """ import os import posixpath import re import functools __all__ = ["filter", "fnmatch", "fnmatchcase", "translate"] def fnmatch(name, pat): """Test whether FILENAME matches PATTERN. Patterns are Unix shell style: * matches everything ? matches any single character [seq] matches any character in seq [!seq] matches any char not in seq An initial period in FILENAME is not special. Both FILENAME and PATTERN are first case-normalized if the operating system requires it. If you don't want this, use fnmatchcase(FILENAME, PATTERN). """ name = os.path.normcase(name) pat = os.path.normcase(pat) return fnmatchcase(name, pat) @functools.lru_cache(maxsize=256, typed=True) def _compile_pattern(pat): if isinstance(pat, bytes): pat_str = str(pat, 'ISO-8859-1') res_str = translate(pat_str) res = bytes(res_str, 'ISO-8859-1') else: res = translate(pat) return re.compile(res).match def filter(names, pat): """Construct a list from those elements of the iterable NAMES that match PAT.""" result = [] pat = os.path.normcase(pat) match = _compile_pattern(pat) if os.path is posixpath: for name in names: if match(name): result.append(name) else: for name in names: if match(os.path.normcase(name)): result.append(name) return result def fnmatchcase(name, pat): """Test whether FILENAME matches PATTERN, including case. This is a version of fnmatch() which doesn't case-normalize its arguments. """ match = _compile_pattern(pat) return match(name) is not None def translate(pat): """Translate a shell PATTERN to a regular expression. There is no way to quote meta-characters. """ i, n = 0, len(pat) res = '' while i < n: c = pat[i] i = i+1 if c == '*': res = res + '.*' elif c == '?': res = res + '.' elif c == '[': j = i if j < n and pat[j] == '!': j = j+1 if j < n and pat[j] == ']': j = j+1 while j < n and pat[j] != ']': j = j+1 if j >= n: res = res + '\\[' else: stuff = pat[i:j] if '--' not in stuff: stuff = stuff.replace('\\', r'\\') else: chunks = [] k = i+2 if pat[i] == '!' else i+1 while True: k = pat.find('-', k, j) if k < 0: break chunks.append(pat[i:k]) i = k+1 k = k+3 chunks.append(pat[i:j]) stuff = '-'.join(s.replace('\\', r'\\').replace('-', r'\-') for s in chunks) stuff = re.sub(r'([&~|])', r'\\\1', stuff) i = j+1 if stuff[0] == '!': stuff = '^' + stuff[1:] elif stuff[0] in ('^', '['): stuff = '\\' + stuff res = '%s[%s]' % (res, stuff) else: res = res + re.escape(c) return r'(?s:%s)\Z' % res """Generic output formatting. Formatter objects transform an abstract flow of formatting events into specific output events on writer objects. Formatters manage several stack structures to allow various properties of a writer object to be changed and restored; writers need not be able to handle relative changes nor any sort of ``change back'' operation. Specific writer properties which may be controlled via formatter objects are horizontal alignment, font, and left margin indentations. A mechanism is provided which supports providing arbitrary, non-exclusive style settings to a writer as well. Additional interfaces facilitate formatting events which are not reversible, such as paragraph separation. Writer objects encapsulate device interfaces. Abstract devices, such as file formats, are supported as well as physical devices. The provided implementations all work with abstract devices. The interface makes available mechanisms for setting the properties which formatter objects manage and inserting data into the output. """ import sys import warnings warnings.warn('the formatter module is deprecated', DeprecationWarning, stacklevel=2) AS_IS = None class NullFormatter: """A formatter which does nothing. If the writer parameter is omitted, a NullWriter instance is created. No methods of the writer are called by NullFormatter instances. Implementations should inherit from this class if implementing a writer interface but don't need to inherit any implementation. """ def __init__(self, writer=None): if writer is None: writer = NullWriter() self.writer = writer def end_paragraph(self, blankline): pass def add_line_break(self): pass def add_hor_rule(self, *args, **kw): pass def add_label_data(self, format, counter, blankline=None): pass def add_flowing_data(self, data): pass def add_literal_data(self, data): pass def flush_softspace(self): pass def push_alignment(self, align): pass def pop_alignment(self): pass def push_font(self, x): pass def pop_font(self): pass def push_margin(self, margin): pass def pop_margin(self): pass def set_spacing(self, spacing): pass def push_style(self, *styles): pass def pop_style(self, n=1): pass def assert_line_data(self, flag=1): pass class AbstractFormatter: """The standard formatter. This implementation has demonstrated wide applicability to many writers, and may be used directly in most circumstances. It has been used to implement a full-featured World Wide Web browser. """ def __init__(self, writer): self.writer = writer self.align = None self.align_stack = [] self.font_stack = [] self.margin_stack = [] self.spacing = None self.style_stack = [] self.nospace = 1 self.softspace = 0 self.para_end = 1 self.parskip = 0 self.hard_break = 1 self.have_label = 0 def end_paragraph(self, blankline): if not self.hard_break: self.writer.send_line_break() self.have_label = 0 if self.parskip < blankline and not self.have_label: self.writer.send_paragraph(blankline - self.parskip) self.parskip = blankline self.have_label = 0 self.hard_break = self.nospace = self.para_end = 1 self.softspace = 0 def add_line_break(self): if not (self.hard_break or self.para_end): self.writer.send_line_break() self.have_label = self.parskip = 0 self.hard_break = self.nospace = 1 self.softspace = 0 def add_hor_rule(self, *args, **kw): if not self.hard_break: self.writer.send_line_break() self.writer.send_hor_rule(*args, **kw) self.hard_break = self.nospace = 1 self.have_label = self.para_end = self.softspace = self.parskip = 0 def add_label_data(self, format, counter, blankline = None): if self.have_label or not self.hard_break: self.writer.send_line_break() if not self.para_end: self.writer.send_paragraph((blankline and 1) or 0) if isinstance(format, str): self.writer.send_label_data(self.format_counter(format, counter)) else: self.writer.send_label_data(format) self.nospace = self.have_label = self.hard_break = self.para_end = 1 self.softspace = self.parskip = 0 def format_counter(self, format, counter): label = '' for c in format: if c == '1': label = label + ('%d' % counter) elif c in 'aA': if counter > 0: label = label + self.format_letter(c, counter) elif c in 'iI': if counter > 0: label = label + self.format_roman(c, counter) else: label = label + c return label def format_letter(self, case, counter): label = '' while counter > 0: counter, x = divmod(counter-1, 26) s = chr(ord(case) + x) label = s + label return label def format_roman(self, case, counter): ones = ['i', 'x', 'c', 'm'] fives = ['v', 'l', 'd'] label, index = '', 0 while counter > 0: counter, x = divmod(counter, 10) if x == 9: label = ones[index] + ones[index+1] + label elif x == 4: label = ones[index] + fives[index] + label else: if x >= 5: s = fives[index] x = x-5 else: s = '' s = s + ones[index]*x label = s + label index = index + 1 if case == 'I': return label.upper() return label def add_flowing_data(self, data): if not data: return prespace = data[:1].isspace() postspace = data[-1:].isspace() data = " ".join(data.split()) if self.nospace and not data: return elif prespace or self.softspace: if not data: if not self.nospace: self.softspace = 1 self.parskip = 0 return if not self.nospace: data = ' ' + data self.hard_break = self.nospace = self.para_end = \ self.parskip = self.have_label = 0 self.softspace = postspace self.writer.send_flowing_data(data) def add_literal_data(self, data): if not data: return if self.softspace: self.writer.send_flowing_data(" ") self.hard_break = data[-1:] == '\n' self.nospace = self.para_end = self.softspace = \ self.parskip = self.have_label = 0 self.writer.send_literal_data(data) def flush_softspace(self): if self.softspace: self.hard_break = self.para_end = self.parskip = \ self.have_label = self.softspace = 0 self.nospace = 1 self.writer.send_flowing_data(' ') def push_alignment(self, align): if align and align != self.align: self.writer.new_alignment(align) self.align = align self.align_stack.append(align) else: self.align_stack.append(self.align) def pop_alignment(self): if self.align_stack: del self.align_stack[-1] if self.align_stack: self.align = align = self.align_stack[-1] self.writer.new_alignment(align) else: self.align = None self.writer.new_alignment(None) def push_font(self, font): size, i, b, tt = font if self.softspace: self.hard_break = self.para_end = self.softspace = 0 self.nospace = 1 self.writer.send_flowing_data(' ') if self.font_stack: csize, ci, cb, ctt = self.font_stack[-1] if size is AS_IS: size = csize if i is AS_IS: i = ci if b is AS_IS: b = cb if tt is AS_IS: tt = ctt font = (size, i, b, tt) self.font_stack.append(font) self.writer.new_font(font) def pop_font(self): if self.font_stack: del self.font_stack[-1] if self.font_stack: font = self.font_stack[-1] else: font = None self.writer.new_font(font) def push_margin(self, margin): self.margin_stack.append(margin) fstack = [m for m in self.margin_stack if m] if not margin and fstack: margin = fstack[-1] self.writer.new_margin(margin, len(fstack)) def pop_margin(self): if self.margin_stack: del self.margin_stack[-1] fstack = [m for m in self.margin_stack if m] if fstack: margin = fstack[-1] else: margin = None self.writer.new_margin(margin, len(fstack)) def set_spacing(self, spacing): self.spacing = spacing self.writer.new_spacing(spacing) def push_style(self, *styles): if self.softspace: self.hard_break = self.para_end = self.softspace = 0 self.nospace = 1 self.writer.send_flowing_data(' ') for style in styles: self.style_stack.append(style) self.writer.new_styles(tuple(self.style_stack)) def pop_style(self, n=1): del self.style_stack[-n:] self.writer.new_styles(tuple(self.style_stack)) def assert_line_data(self, flag=1): self.nospace = self.hard_break = not flag self.para_end = self.parskip = self.have_label = 0 class NullWriter: """Minimal writer interface to use in testing & inheritance. A writer which only provides the interface definition; no actions are taken on any methods. This should be the base class for all writers which do not need to inherit any implementation methods. """ def __init__(self): pass def flush(self): pass def new_alignment(self, align): pass def new_font(self, font): pass def new_margin(self, margin, level): pass def new_spacing(self, spacing): pass def new_styles(self, styles): pass def send_paragraph(self, blankline): pass def send_line_break(self): pass def send_hor_rule(self, *args, **kw): pass def send_label_data(self, data): pass def send_flowing_data(self, data): pass def send_literal_data(self, data): pass class AbstractWriter(NullWriter): """A writer which can be used in debugging formatters, but not much else. Each method simply announces itself by printing its name and arguments on standard output. """ def new_alignment(self, align): print("new_alignment(%r)" % (align,)) def new_font(self, font): print("new_font(%r)" % (font,)) def new_margin(self, margin, level): print("new_margin(%r, %d)" % (margin, level)) def new_spacing(self, spacing): print("new_spacing(%r)" % (spacing,)) def new_styles(self, styles): print("new_styles(%r)" % (styles,)) def send_paragraph(self, blankline): print("send_paragraph(%r)" % (blankline,)) def send_line_break(self): print("send_line_break()") def send_hor_rule(self, *args, **kw): print("send_hor_rule()") def send_label_data(self, data): print("send_label_data(%r)" % (data,)) def send_flowing_data(self, data): print("send_flowing_data(%r)" % (data,)) def send_literal_data(self, data): print("send_literal_data(%r)" % (data,)) class DumbWriter(NullWriter): """Simple writer class which writes output on the file object passed in as the file parameter or, if file is omitted, on standard output. The output is simply word-wrapped to the number of columns specified by the maxcol parameter. This class is suitable for reflowing a sequence of paragraphs. """ def __init__(self, file=None, maxcol=72): self.file = file or sys.stdout self.maxcol = maxcol NullWriter.__init__(self) self.reset() def reset(self): self.col = 0 self.atbreak = 0 def send_paragraph(self, blankline): self.file.write('\n'*blankline) self.col = 0 self.atbreak = 0 def send_line_break(self): self.file.write('\n') self.col = 0 self.atbreak = 0 def send_hor_rule(self, *args, **kw): self.file.write('\n') self.file.write('-'*self.maxcol) self.file.write('\n') self.col = 0 self.atbreak = 0 def send_literal_data(self, data): self.file.write(data) i = data.rfind('\n') if i >= 0: self.col = 0 data = data[i+1:] data = data.expandtabs() self.col = self.col + len(data) self.atbreak = 0 def send_flowing_data(self, data): if not data: return atbreak = self.atbreak or data[0].isspace() col = self.col maxcol = self.maxcol write = self.file.write for word in data.split(): if atbreak: if col + len(word) >= maxcol: write('\n') col = 0 else: write(' ') col = col + 1 write(word) col = col + len(word) atbreak = 1 self.col = col self.atbreak = data[-1].isspace() def test(file = None): w = DumbWriter() f = AbstractFormatter(w) if file is not None: fp = open(file) elif sys.argv[1:]: fp = open(sys.argv[1]) else: fp = sys.stdin try: for line in fp: if line == '\n': f.end_paragraph(1) else: f.add_flowing_data(line) finally: if fp is not sys.stdin: fp.close() f.end_paragraph(0) if __name__ == '__main__': test() """Fraction, infinite-precision, real numbers.""" from decimal import Decimal import math import numbers import operator import re import sys __all__ = ['Fraction', 'gcd'] def gcd(a, b): """Calculate the Greatest Common Divisor of a and b. Unless b==0, the result will have the same sign as b (so that when b is divided by it, the result comes out positive). """ import warnings warnings.warn('fractions.gcd() is deprecated. Use math.gcd() instead.', DeprecationWarning, 2) if type(a) is int is type(b): if (b or a) < 0: return -math.gcd(a, b) return math.gcd(a, b) return _gcd(a, b) def _gcd(a, b): while b: a, b = b, a%b return a _PyHASH_MODULUS = sys.hash_info.modulus _PyHASH_INF = sys.hash_info.inf _RATIONAL_FORMAT = re.compile(r""" \A\s* (?P<sign>[-+]?) (?=\d|\.\d) (?P<num>\d*) (?: (?:/(?P<denom>\d+))? | (?:\.(?P<decimal>\d*))? (?:E(?P<exp>[-+]?\d+))? ) \s*\Z """, re.VERBOSE | re.IGNORECASE) class Fraction(numbers.Rational): """This class implements rational numbers. In the two-argument form of the constructor, Fraction(8, 6) will produce a rational number equivalent to 4/3. Both arguments must be Rational. The numerator defaults to 0 and the denominator defaults to 1 so that Fraction(3) == 3 and Fraction() == 0. Fractions can also be constructed from: - numeric strings similar to those accepted by the float constructor (for example, '-2.3' or '1e10') - strings of the form '123/456' - float and Decimal instances - other Rational instances (including integers) """ __slots__ = ('_numerator', '_denominator') def __new__(cls, numerator=0, denominator=None, *, _normalize=True): """Constructs a Rational. Takes a string like '3/2' or '1.5', another Rational instance, a numerator/denominator pair, or a float. Examples -------- >>> Fraction(10, -8) Fraction(-5, 4) >>> Fraction(Fraction(1, 7), 5) Fraction(1, 35) >>> Fraction(Fraction(1, 7), Fraction(2, 3)) Fraction(3, 14) >>> Fraction('314') Fraction(314, 1) >>> Fraction('-35/4') Fraction(-35, 4) >>> Fraction('3.1415') Fraction(6283, 2000) >>> Fraction('-47e-2') Fraction(-47, 100) >>> Fraction(1.47) Fraction(6620291452234629, 4503599627370496) >>> Fraction(2.25) Fraction(9, 4) >>> Fraction(Decimal('1.47')) Fraction(147, 100) """ self = super(Fraction, cls).__new__(cls) if denominator is None: if type(numerator) is int: self._numerator = numerator self._denominator = 1 return self elif isinstance(numerator, numbers.Rational): self._numerator = numerator.numerator self._denominator = numerator.denominator return self elif isinstance(numerator, (float, Decimal)): self._numerator, self._denominator = numerator.as_integer_ratio() return self elif isinstance(numerator, str): m = _RATIONAL_FORMAT.match(numerator) if m is None: raise ValueError('Invalid literal for Fraction: %r' % numerator) numerator = int(m.group('num') or '0') denom = m.group('denom') if denom: denominator = int(denom) else: denominator = 1 decimal = m.group('decimal') if decimal: scale = 10**len(decimal) numerator = numerator * scale + int(decimal) denominator *= scale exp = m.group('exp') if exp: exp = int(exp) if exp >= 0: numerator *= 10**exp else: denominator *= 10**-exp if m.group('sign') == '-': numerator = -numerator else: raise TypeError("argument should be a string " "or a Rational instance") elif type(numerator) is int is type(denominator): pass elif (isinstance(numerator, numbers.Rational) and isinstance(denominator, numbers.Rational)): numerator, denominator = ( numerator.numerator * denominator.denominator, denominator.numerator * numerator.denominator ) else: raise TypeError("both arguments should be " "Rational instances") if denominator == 0: raise ZeroDivisionError('Fraction(%s, 0)' % numerator) if _normalize: if type(numerator) is int is type(denominator): g = math.gcd(numerator, denominator) if denominator < 0: g = -g else: g = _gcd(numerator, denominator) numerator //= g denominator //= g self._numerator = numerator self._denominator = denominator return self @classmethod def from_float(cls, f): """Converts a finite float to a rational number, exactly. Beware that Fraction.from_float(0.3) != Fraction(3, 10). """ if isinstance(f, numbers.Integral): return cls(f) elif not isinstance(f, float): raise TypeError("%s.from_float() only takes floats, not %r (%s)" % (cls.__name__, f, type(f).__name__)) return cls(*f.as_integer_ratio()) @classmethod def from_decimal(cls, dec): """Converts a finite Decimal instance to a rational number, exactly.""" from decimal import Decimal if isinstance(dec, numbers.Integral): dec = Decimal(int(dec)) elif not isinstance(dec, Decimal): raise TypeError( "%s.from_decimal() only takes Decimals, not %r (%s)" % (cls.__name__, dec, type(dec).__name__)) return cls(*dec.as_integer_ratio()) def as_integer_ratio(self): """Return the integer ratio as a tuple. Return a tuple of two integers, whose ratio is equal to the Fraction and with a positive denominator. """ return (self._numerator, self._denominator) def limit_denominator(self, max_denominator=1000000): """Closest Fraction to self with denominator at most max_denominator. >>> Fraction('3.141592653589793').limit_denominator(10) Fraction(22, 7) >>> Fraction('3.141592653589793').limit_denominator(100) Fraction(311, 99) >>> Fraction(4321, 8765).limit_denominator(10000) Fraction(4321, 8765) """ if max_denominator < 1: raise ValueError("max_denominator should be at least 1") if self._denominator <= max_denominator: return Fraction(self) p0, q0, p1, q1 = 0, 1, 1, 0 n, d = self._numerator, self._denominator while True: a = n//d q2 = q0+a*q1 if q2 > max_denominator: break p0, q0, p1, q1 = p1, q1, p0+a*p1, q2 n, d = d, n-a*d k = (max_denominator-q0)//q1 bound1 = Fraction(p0+k*p1, q0+k*q1) bound2 = Fraction(p1, q1) if abs(bound2 - self) <= abs(bound1-self): return bound2 else: return bound1 @property def numerator(a): return a._numerator @property def denominator(a): return a._denominator def __repr__(self): """repr(self)""" return '%s(%s, %s)' % (self.__class__.__name__, self._numerator, self._denominator) def __str__(self): """str(self)""" if self._denominator == 1: return str(self._numerator) else: return '%s/%s' % (self._numerator, self._denominator) def _operator_fallbacks(monomorphic_operator, fallback_operator): """Generates forward and reverse operators given a purely-rational operator and a function from the operator module. Use this like: __op__, __rop__ = _operator_fallbacks(just_rational_op, operator.op) In general, we want to implement the arithmetic operations so that mixed-mode operations either call an implementation whose author knew about the types of both arguments, or convert both to the nearest built in type and do the operation there. In Fraction, that means that we define __add__ and __radd__ as: def __add__(self, other): if isinstance(other, (int, Fraction)): return Fraction(self.numerator * other.denominator + other.numerator * self.denominator, self.denominator * other.denominator) elif isinstance(other, float): return float(self) + other elif isinstance(other, complex): return complex(self) + other return NotImplemented def __radd__(self, other): if isinstance(other, numbers.Rational): return Fraction(self.numerator * other.denominator + other.numerator * self.denominator, self.denominator * other.denominator) elif isinstance(other, Real): return float(other) + float(self) elif isinstance(other, Complex): return complex(other) + complex(self) return NotImplemented There are 5 different cases for a mixed-type addition on Fraction. I'll refer to all of the above code that doesn't refer to Fraction, float, or complex as "boilerplate". 'r' will be an instance of Fraction, which is a subtype of Rational (r : Fraction <: Rational), and b : B <: Complex. The first three involve 'r + b': 1. If B <: Fraction, int, float, or complex, we handle that specially, and all is well. 2. If Fraction falls back to the boilerplate code, and it were to return a value from __add__, we'd miss the possibility that B defines a more intelligent __radd__, so the boilerplate should return NotImplemented from __add__. In particular, we don't handle Rational here, even though we could get an exact answer, in case the other type wants to do something special. 3. If B <: Fraction, Python tries B.__radd__ before Fraction.__add__. This is ok, because it was implemented with knowledge of Fraction, so it can handle those instances before delegating to Real or Complex. The next two situations describe 'b + r'. We assume that b didn't know about Fraction in its implementation, and that it uses similar boilerplate code: 4. If B <: Rational, then __radd_ converts both to the builtin rational type (hey look, that's us) and proceeds. 5. Otherwise, __radd__ tries to find the nearest common base ABC, and fall back to its builtin type. Since this class doesn't subclass a concrete type, there's no implementation to fall back to, so we need to try as hard as possible to return an actual value, or the user will get a TypeError. """ def forward(a, b): if isinstance(b, (int, Fraction)): return monomorphic_operator(a, b) elif isinstance(b, float): return fallback_operator(float(a), b) elif isinstance(b, complex): return fallback_operator(complex(a), b) else: return NotImplemented forward.__name__ = '__' + fallback_operator.__name__ + '__' forward.__doc__ = monomorphic_operator.__doc__ def reverse(b, a): if isinstance(a, numbers.Rational): return monomorphic_operator(a, b) elif isinstance(a, numbers.Real): return fallback_operator(float(a), float(b)) elif isinstance(a, numbers.Complex): return fallback_operator(complex(a), complex(b)) else: return NotImplemented reverse.__name__ = '__r' + fallback_operator.__name__ + '__' reverse.__doc__ = monomorphic_operator.__doc__ return forward, reverse def _add(a, b): """a + b""" da, db = a.denominator, b.denominator return Fraction(a.numerator * db + b.numerator * da, da * db) __add__, __radd__ = _operator_fallbacks(_add, operator.add) def _sub(a, b): """a - b""" da, db = a.denominator, b.denominator return Fraction(a.numerator * db - b.numerator * da, da * db) __sub__, __rsub__ = _operator_fallbacks(_sub, operator.sub) def _mul(a, b): """a * b""" return Fraction(a.numerator * b.numerator, a.denominator * b.denominator) __mul__, __rmul__ = _operator_fallbacks(_mul, operator.mul) def _div(a, b): """a / b""" return Fraction(a.numerator * b.denominator, a.denominator * b.numerator) __truediv__, __rtruediv__ = _operator_fallbacks(_div, operator.truediv) def _floordiv(a, b): """a // b""" return (a.numerator * b.denominator) // (a.denominator * b.numerator) __floordiv__, __rfloordiv__ = _operator_fallbacks(_floordiv, operator.floordiv) def _divmod(a, b): """(a // b, a % b)""" da, db = a.denominator, b.denominator div, n_mod = divmod(a.numerator * db, da * b.numerator) return div, Fraction(n_mod, da * db) __divmod__, __rdivmod__ = _operator_fallbacks(_divmod, divmod) def _mod(a, b): """a % b""" da, db = a.denominator, b.denominator return Fraction((a.numerator * db) % (b.numerator * da), da * db) __mod__, __rmod__ = _operator_fallbacks(_mod, operator.mod) def __pow__(a, b): """a ** b If b is not an integer, the result will be a float or complex since roots are generally irrational. If b is an integer, the result will be rational. """ if isinstance(b, numbers.Rational): if b.denominator == 1: power = b.numerator if power >= 0: return Fraction(a._numerator ** power, a._denominator ** power, _normalize=False) elif a._numerator >= 0: return Fraction(a._denominator ** -power, a._numerator ** -power, _normalize=False) else: return Fraction((-a._denominator) ** -power, (-a._numerator) ** -power, _normalize=False) else: return float(a) ** float(b) else: return float(a) ** b def __rpow__(b, a): """a ** b""" if b._denominator == 1 and b._numerator >= 0: return a ** b._numerator if isinstance(a, numbers.Rational): return Fraction(a.numerator, a.denominator) ** b if b._denominator == 1: return a ** b._numerator return a ** float(b) def __pos__(a): """+a: Coerces a subclass instance to Fraction""" return Fraction(a._numerator, a._denominator, _normalize=False) def __neg__(a): """-a""" return Fraction(-a._numerator, a._denominator, _normalize=False) def __abs__(a): """abs(a)""" return Fraction(abs(a._numerator), a._denominator, _normalize=False) def __trunc__(a): """trunc(a)""" if a._numerator < 0: return -(-a._numerator // a._denominator) else: return a._numerator // a._denominator def __floor__(a): """math.floor(a)""" return a.numerator // a.denominator def __ceil__(a): """math.ceil(a)""" return -(-a.numerator // a.denominator) def __round__(self, ndigits=None): """round(self, ndigits) Rounds half toward even. """ if ndigits is None: floor, remainder = divmod(self.numerator, self.denominator) if remainder * 2 < self.denominator: return floor elif remainder * 2 > self.denominator: return floor + 1 elif floor % 2 == 0: return floor else: return floor + 1 shift = 10**abs(ndigits) if ndigits > 0: return Fraction(round(self * shift), shift) else: return Fraction(round(self / shift) * shift) def __hash__(self): """hash(self)""" dinv = pow(self._denominator, _PyHASH_MODULUS - 2, _PyHASH_MODULUS) if not dinv: hash_ = _PyHASH_INF else: hash_ = abs(self._numerator) * dinv % _PyHASH_MODULUS result = hash_ if self >= 0 else -hash_ return -2 if result == -1 else result def __eq__(a, b): """a == b""" if type(b) is int: return a._numerator == b and a._denominator == 1 if isinstance(b, numbers.Rational): return (a._numerator == b.numerator and a._denominator == b.denominator) if isinstance(b, numbers.Complex) and b.imag == 0: b = b.real if isinstance(b, float): if math.isnan(b) or math.isinf(b): return 0.0 == b else: return a == a.from_float(b) else: return NotImplemented def _richcmp(self, other, op): """Helper for comparison operators, for internal use only. Implement comparison between a Rational instance `self`, and either another Rational instance or a float `other`. If `other` is not a Rational instance or a float, return NotImplemented. `op` should be one of the six standard comparison operators. """ if isinstance(other, numbers.Rational): return op(self._numerator * other.denominator, self._denominator * other.numerator) if isinstance(other, float): if math.isnan(other) or math.isinf(other): return op(0.0, other) else: return op(self, self.from_float(other)) else: return NotImplemented def __lt__(a, b): """a < b""" return a._richcmp(b, operator.lt) def __gt__(a, b): """a > b""" return a._richcmp(b, operator.gt) def __le__(a, b): """a <= b""" return a._richcmp(b, operator.le) def __ge__(a, b): """a >= b""" return a._richcmp(b, operator.ge) def __bool__(a): """a != 0""" return bool(a._numerator) def __reduce__(self): return (self.__class__, (str(self),)) def __copy__(self): if type(self) == Fraction: return self return self.__class__(self._numerator, self._denominator) def __deepcopy__(self, memo): if type(self) == Fraction: return self return self.__class__(self._numerator, self._denominator) """An FTP client class and some helper functions. Based on RFC 959: File Transfer Protocol (FTP), by J. Postel and J. Reynolds Example: >>> from ftplib import FTP >>> ftp = FTP('ftp.python.org') >>> ftp.login() '230 Guest login ok, access restrictions apply.' >>> ftp.retrlines('LIST') total 9 drwxr-xr-x 8 root wheel 1024 Jan 3 1994 . drwxr-xr-x 8 root wheel 1024 Jan 3 1994 .. drwxr-xr-x 2 root wheel 1024 Jan 3 1994 bin drwxr-xr-x 2 root wheel 1024 Jan 3 1994 etc d-wxrwxr-x 2 ftp wheel 1024 Sep 5 13:43 incoming drwxr-xr-x 2 root wheel 1024 Nov 17 1993 lib drwxr-xr-x 6 1094 wheel 1024 Sep 13 19:07 pub drwxr-xr-x 3 root wheel 1024 Jan 3 1994 usr -rw-r--r-- 1 root root 312 Aug 1 1994 welcome.msg '226 Transfer complete.' >>> ftp.quit() '221 Goodbye.' >>> A nice test that reveals some of the network dialogue would be: python ftplib.py -d localhost -l -p -l """ import sys import socket from socket import _GLOBAL_DEFAULT_TIMEOUT __all__ = ["FTP", "error_reply", "error_temp", "error_perm", "error_proto", "all_errors"] MSG_OOB = 0x1 FTP_PORT = 21 MAXLINE = 8192 class Error(Exception): pass class error_reply(Error): pass class error_temp(Error): pass class error_perm(Error): pass class error_proto(Error): pass all_errors = (Error, OSError, EOFError) CRLF = '\r\n' B_CRLF = b'\r\n' class FTP: '''An FTP client class. To create a connection, call the class using these arguments: host, user, passwd, acct, timeout The first four arguments are all strings, and have default value ''. timeout must be numeric and defaults to None if not passed, meaning that no timeout will be set on any ftp socket(s) If a timeout is passed, then this is now the default timeout for all ftp socket operations for this instance. Then use self.connect() with optional host and port argument. To download a file, use ftp.retrlines('RETR ' + filename), or ftp.retrbinary() with slightly different arguments. To upload a file, use ftp.storlines() or ftp.storbinary(), which have an open file as argument (see their definitions below for details). The download/upload functions first issue appropriate TYPE and PORT or PASV commands. ''' debugging = 0 host = '' port = FTP_PORT maxline = MAXLINE sock = None file = None welcome = None passiveserver = 1 encoding = "latin-1" trust_server_pasv_ipv4_address = False def __init__(self, host='', user='', passwd='', acct='', timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None): self.source_address = source_address self.timeout = timeout if host: self.connect(host) if user: self.login(user, passwd, acct) def __enter__(self): return self def __exit__(self, *args): if self.sock is not None: try: self.quit() except (OSError, EOFError): pass finally: if self.sock is not None: self.close() def connect(self, host='', port=0, timeout=-999, source_address=None): '''Connect to host. Arguments are: - host: hostname to connect to (string, default previous host) - port: port to connect to (integer, default previous port) - timeout: the timeout to set against the ftp socket(s) - source_address: a 2-tuple (host, port) for the socket to bind to as its source address before connecting. ''' if host != '': self.host = host if port > 0: self.port = port if timeout != -999: self.timeout = timeout if source_address is not None: self.source_address = source_address sys.audit("ftplib.connect", self, self.host, self.port) self.sock = socket.create_connection((self.host, self.port), self.timeout, source_address=self.source_address) self.af = self.sock.family self.file = self.sock.makefile('r', encoding=self.encoding) self.welcome = self.getresp() return self.welcome def getwelcome(self): '''Get the welcome message from the server. (this is read and squirreled away by connect())''' if self.debugging: print('*welcome*', self.sanitize(self.welcome)) return self.welcome def set_debuglevel(self, level): '''Set the debugging level. The required argument level means: 0: no debugging output (default) 1: print commands and responses but not body text etc. 2: also print raw lines read and sent before stripping CR/LF''' self.debugging = level debug = set_debuglevel def set_pasv(self, val): '''Use passive or active mode for data transfers. With a false argument, use the normal PORT mode, With a true argument, use the PASV command.''' self.passiveserver = val def sanitize(self, s): if s[:5] in {'pass ', 'PASS '}: i = len(s.rstrip('\r\n')) s = s[:5] + '*'*(i-5) + s[i:] return repr(s) def putline(self, line): if '\r' in line or '\n' in line: raise ValueError('an illegal newline character should not be contained') sys.audit("ftplib.sendcmd", self, line) line = line + CRLF if self.debugging > 1: print('*put*', self.sanitize(line)) self.sock.sendall(line.encode(self.encoding)) def putcmd(self, line): if self.debugging: print('*cmd*', self.sanitize(line)) self.putline(line) def getline(self): line = self.file.readline(self.maxline + 1) if len(line) > self.maxline: raise Error("got more than %d bytes" % self.maxline) if self.debugging > 1: print('*get*', self.sanitize(line)) if not line: raise EOFError if line[-2:] == CRLF: line = line[:-2] elif line[-1:] in CRLF: line = line[:-1] return line def getmultiline(self): line = self.getline() if line[3:4] == '-': code = line[:3] while 1: nextline = self.getline() line = line + ('\n' + nextline) if nextline[:3] == code and \ nextline[3:4] != '-': break return line def getresp(self): resp = self.getmultiline() if self.debugging: print('*resp*', self.sanitize(resp)) self.lastresp = resp[:3] c = resp[:1] if c in {'1', '2', '3'}: return resp if c == '4': raise error_temp(resp) if c == '5': raise error_perm(resp) raise error_proto(resp) def voidresp(self): """Expect a response beginning with '2'.""" resp = self.getresp() if resp[:1] != '2': raise error_reply(resp) return resp def abort(self): '''Abort a file transfer. Uses out-of-band data. This does not follow the procedure from the RFC to send Telnet IP and Synch; that doesn't seem to work with the servers I've tried. Instead, just send the ABOR command as OOB data.''' line = b'ABOR' + B_CRLF if self.debugging > 1: print('*put urgent*', self.sanitize(line)) self.sock.sendall(line, MSG_OOB) resp = self.getmultiline() if resp[:3] not in {'426', '225', '226'}: raise error_proto(resp) return resp def sendcmd(self, cmd): '''Send a command and return the response.''' self.putcmd(cmd) return self.getresp() def voidcmd(self, cmd): """Send a command and expect a response beginning with '2'.""" self.putcmd(cmd) return self.voidresp() def sendport(self, host, port): '''Send a PORT command with the current host and the given port number. ''' hbytes = host.split('.') pbytes = [repr(port//256), repr(port%256)] bytes = hbytes + pbytes cmd = 'PORT ' + ','.join(bytes) return self.voidcmd(cmd) def sendeprt(self, host, port): '''Send an EPRT command with the current host and the given port number.''' af = 0 if self.af == socket.AF_INET: af = 1 if self.af == socket.AF_INET6: af = 2 if af == 0: raise error_proto('unsupported address family') fields = ['', repr(af), host, repr(port), ''] cmd = 'EPRT ' + '|'.join(fields) return self.voidcmd(cmd) def makeport(self): '''Create a new socket and send a PORT command for it.''' sock = socket.create_server(("", 0), family=self.af, backlog=1) port = sock.getsockname()[1] host = self.sock.getsockname()[0] if self.af == socket.AF_INET: resp = self.sendport(host, port) else: resp = self.sendeprt(host, port) if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT: sock.settimeout(self.timeout) return sock def makepasv(self): """Internal: Does the PASV or EPSV handshake -> (address, port)""" if self.af == socket.AF_INET: untrusted_host, port = parse227(self.sendcmd('PASV')) if self.trust_server_pasv_ipv4_address: host = untrusted_host else: host = self.sock.getpeername()[0] else: host, port = parse229(self.sendcmd('EPSV'), self.sock.getpeername()) return host, port def ntransfercmd(self, cmd, rest=None): """Initiate a transfer over the data connection. If the transfer is active, send a port command and the transfer command, and accept the connection. If the server is passive, send a pasv command, connect to it, and start the transfer command. Either way, return the socket for the connection and the expected size of the transfer. The expected size may be None if it could not be determined. Optional `rest' argument can be a string that is sent as the argument to a REST command. This is essentially a server marker used to tell the server to skip over any data up to the given marker. """ size = None if self.passiveserver: host, port = self.makepasv() conn = socket.create_connection((host, port), self.timeout, source_address=self.source_address) try: if rest is not None: self.sendcmd("REST %s" % rest) resp = self.sendcmd(cmd) if resp[0] == '2': resp = self.getresp() if resp[0] != '1': raise error_reply(resp) except: conn.close() raise else: with self.makeport() as sock: if rest is not None: self.sendcmd("REST %s" % rest) resp = self.sendcmd(cmd) if resp[0] == '2': resp = self.getresp() if resp[0] != '1': raise error_reply(resp) conn, sockaddr = sock.accept() if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT: conn.settimeout(self.timeout) if resp[:3] == '150': size = parse150(resp) return conn, size def transfercmd(self, cmd, rest=None): """Like ntransfercmd() but returns only the socket.""" return self.ntransfercmd(cmd, rest)[0] def login(self, user = '', passwd = '', acct = ''): '''Login, default anonymous.''' if not user: user = 'anonymous' if not passwd: passwd = '' if not acct: acct = '' if user == 'anonymous' and passwd in {'', '-'}: passwd = passwd + 'anonymous@' resp = self.sendcmd('USER ' + user) if resp[0] == '3': resp = self.sendcmd('PASS ' + passwd) if resp[0] == '3': resp = self.sendcmd('ACCT ' + acct) if resp[0] != '2': raise error_reply(resp) return resp def retrbinary(self, cmd, callback, blocksize=8192, rest=None): """Retrieve data in binary mode. A new port is created for you. Args: cmd: A RETR command. callback: A single parameter callable to be called on each block of data read. blocksize: The maximum number of bytes to read from the socket at one time. [default: 8192] rest: Passed to transfercmd(). [default: None] Returns: The response code. """ self.voidcmd('TYPE I') with self.transfercmd(cmd, rest) as conn: while 1: data = conn.recv(blocksize) if not data: break callback(data) if _SSLSocket is not None and isinstance(conn, _SSLSocket): conn.unwrap() return self.voidresp() def retrlines(self, cmd, callback = None): """Retrieve data in line mode. A new port is created for you. Args: cmd: A RETR, LIST, or NLST command. callback: An optional single parameter callable that is called for each line with the trailing CRLF stripped. [default: print_line()] Returns: The response code. """ if callback is None: callback = print_line resp = self.sendcmd('TYPE A') with self.transfercmd(cmd) as conn, \ conn.makefile('r', encoding=self.encoding) as fp: while 1: line = fp.readline(self.maxline + 1) if len(line) > self.maxline: raise Error("got more than %d bytes" % self.maxline) if self.debugging > 2: print('*retr*', repr(line)) if not line: break if line[-2:] == CRLF: line = line[:-2] elif line[-1:] == '\n': line = line[:-1] callback(line) if _SSLSocket is not None and isinstance(conn, _SSLSocket): conn.unwrap() return self.voidresp() def storbinary(self, cmd, fp, blocksize=8192, callback=None, rest=None): """Store a file in binary mode. A new port is created for you. Args: cmd: A STOR command. fp: A file-like object with a read(num_bytes) method. blocksize: The maximum data size to read from fp and send over the connection at once. [default: 8192] callback: An optional single parameter callable that is called on each block of data after it is sent. [default: None] rest: Passed to transfercmd(). [default: None] Returns: The response code. """ self.voidcmd('TYPE I') with self.transfercmd(cmd, rest) as conn: while 1: buf = fp.read(blocksize) if not buf: break conn.sendall(buf) if callback: callback(buf) if _SSLSocket is not None and isinstance(conn, _SSLSocket): conn.unwrap() return self.voidresp() def storlines(self, cmd, fp, callback=None): """Store a file in line mode. A new port is created for you. Args: cmd: A STOR command. fp: A file-like object with a readline() method. callback: An optional single parameter callable that is called on each line after it is sent. [default: None] Returns: The response code. """ self.voidcmd('TYPE A') with self.transfercmd(cmd) as conn: while 1: buf = fp.readline(self.maxline + 1) if len(buf) > self.maxline: raise Error("got more than %d bytes" % self.maxline) if not buf: break if buf[-2:] != B_CRLF: if buf[-1] in B_CRLF: buf = buf[:-1] buf = buf + B_CRLF conn.sendall(buf) if callback: callback(buf) if _SSLSocket is not None and isinstance(conn, _SSLSocket): conn.unwrap() return self.voidresp() def acct(self, password): '''Send new account name.''' cmd = 'ACCT ' + password return self.voidcmd(cmd) def nlst(self, *args): '''Return a list of files in a given directory (default the current).''' cmd = 'NLST' for arg in args: cmd = cmd + (' ' + arg) files = [] self.retrlines(cmd, files.append) return files def dir(self, *args): '''List a directory in long form. By default list current directory to stdout. Optional last argument is callback function; all non-empty arguments before it are concatenated to the LIST command. (This *should* only be used for a pathname.)''' cmd = 'LIST' func = None if args[-1:] and type(args[-1]) != type(''): args, func = args[:-1], args[-1] for arg in args: if arg: cmd = cmd + (' ' + arg) self.retrlines(cmd, func) def mlsd(self, path="", facts=[]): '''List a directory in a standardized format by using MLSD command (RFC-3659). If path is omitted the current directory is assumed. "facts" is a list of strings representing the type of information desired (e.g. ["type", "size", "perm"]). Return a generator object yielding a tuple of two elements for every file found in path. First element is the file name, the second one is a dictionary including a variable number of "facts" depending on the server and whether "facts" argument has been provided. ''' if facts: self.sendcmd("OPTS MLST " + ";".join(facts) + ";") if path: cmd = "MLSD %s" % path else: cmd = "MLSD" lines = [] self.retrlines(cmd, lines.append) for line in lines: facts_found, _, name = line.rstrip(CRLF).partition(' ') entry = {} for fact in facts_found[:-1].split(";"): key, _, value = fact.partition("=") entry[key.lower()] = value yield (name, entry) def rename(self, fromname, toname): '''Rename a file.''' resp = self.sendcmd('RNFR ' + fromname) if resp[0] != '3': raise error_reply(resp) return self.voidcmd('RNTO ' + toname) def delete(self, filename): '''Delete a file.''' resp = self.sendcmd('DELE ' + filename) if resp[:3] in {'250', '200'}: return resp else: raise error_reply(resp) def cwd(self, dirname): '''Change to a directory.''' if dirname == '..': try: return self.voidcmd('CDUP') except error_perm as msg: if msg.args[0][:3] != '500': raise elif dirname == '': dirname = '.' cmd = 'CWD ' + dirname return self.voidcmd(cmd) def size(self, filename): '''Retrieve the size of a file.''' resp = self.sendcmd('SIZE ' + filename) if resp[:3] == '213': s = resp[3:].strip() return int(s) def mkd(self, dirname): '''Make a directory, return its full pathname.''' resp = self.voidcmd('MKD ' + dirname) if not resp.startswith('257'): return '' return parse257(resp) def rmd(self, dirname): '''Remove a directory.''' return self.voidcmd('RMD ' + dirname) def pwd(self): '''Return current working directory.''' resp = self.voidcmd('PWD') if not resp.startswith('257'): return '' return parse257(resp) def quit(self): '''Quit, and close the connection.''' resp = self.voidcmd('QUIT') self.close() return resp def close(self): '''Close the connection without assuming anything about it.''' try: file = self.file self.file = None if file is not None: file.close() finally: sock = self.sock self.sock = None if sock is not None: sock.close() try: import ssl except ImportError: _SSLSocket = None else: _SSLSocket = ssl.SSLSocket class FTP_TLS(FTP): '''A FTP subclass which adds TLS support to FTP as described in RFC-4217. Connect as usual to port 21 implicitly securing the FTP control connection before authenticating. Securing the data connection requires user to explicitly ask for it by calling prot_p() method. Usage example: >>> from ftplib import FTP_TLS >>> ftps = FTP_TLS('ftp.python.org') >>> ftps.login() '230 Guest login ok, access restrictions apply.' >>> ftps.prot_p() '200 Protection level set to P' >>> ftps.retrlines('LIST') total 9 drwxr-xr-x 8 root wheel 1024 Jan 3 1994 . drwxr-xr-x 8 root wheel 1024 Jan 3 1994 .. drwxr-xr-x 2 root wheel 1024 Jan 3 1994 bin drwxr-xr-x 2 root wheel 1024 Jan 3 1994 etc d-wxrwxr-x 2 ftp wheel 1024 Sep 5 13:43 incoming drwxr-xr-x 2 root wheel 1024 Nov 17 1993 lib drwxr-xr-x 6 1094 wheel 1024 Sep 13 19:07 pub drwxr-xr-x 3 root wheel 1024 Jan 3 1994 usr -rw-r--r-- 1 root root 312 Aug 1 1994 welcome.msg '226 Transfer complete.' >>> ftps.quit() '221 Goodbye.' >>> ''' ssl_version = ssl.PROTOCOL_TLS_CLIENT def __init__(self, host='', user='', passwd='', acct='', keyfile=None, certfile=None, context=None, timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None): if context is not None and keyfile is not None: raise ValueError("context and keyfile arguments are mutually " "exclusive") if context is not None and certfile is not None: raise ValueError("context and certfile arguments are mutually " "exclusive") if keyfile is not None or certfile is not None: import warnings warnings.warn("keyfile and certfile are deprecated, use a " "custom context instead", DeprecationWarning, 2) self.keyfile = keyfile self.certfile = certfile if context is None: context = ssl._create_stdlib_context(self.ssl_version, certfile=certfile, keyfile=keyfile) self.context = context self._prot_p = False FTP.__init__(self, host, user, passwd, acct, timeout, source_address) def login(self, user='', passwd='', acct='', secure=True): if secure and not isinstance(self.sock, ssl.SSLSocket): self.auth() return FTP.login(self, user, passwd, acct) def auth(self): '''Set up secure control connection by using TLS/SSL.''' if isinstance(self.sock, ssl.SSLSocket): raise ValueError("Already using TLS") if self.ssl_version >= ssl.PROTOCOL_TLS: resp = self.voidcmd('AUTH TLS') else: resp = self.voidcmd('AUTH SSL') self.sock = self.context.wrap_socket(self.sock, server_hostname=self.host) self.file = self.sock.makefile(mode='r', encoding=self.encoding) return resp def ccc(self): '''Switch back to a clear-text control connection.''' if not isinstance(self.sock, ssl.SSLSocket): raise ValueError("not using TLS") resp = self.voidcmd('CCC') self.sock = self.sock.unwrap() return resp def prot_p(self): '''Set up secure data connection.''' self.voidcmd('PBSZ 0') resp = self.voidcmd('PROT P') self._prot_p = True return resp def prot_c(self): '''Set up clear text data connection.''' resp = self.voidcmd('PROT C') self._prot_p = False return resp def ntransfercmd(self, cmd, rest=None): conn, size = FTP.ntransfercmd(self, cmd, rest) if self._prot_p: conn = self.context.wrap_socket(conn, server_hostname=self.host) return conn, size def abort(self): line = b'ABOR' + B_CRLF self.sock.sendall(line) resp = self.getmultiline() if resp[:3] not in {'426', '225', '226'}: raise error_proto(resp) return resp __all__.append('FTP_TLS') all_errors = (Error, OSError, EOFError, ssl.SSLError) _150_re = None def parse150(resp): '''Parse the '150' response for a RETR request. Returns the expected transfer size or None; size is not guaranteed to be present in the 150 message. ''' if resp[:3] != '150': raise error_reply(resp) global _150_re if _150_re is None: import re _150_re = re.compile( r"150 .* \((\d+) bytes\)", re.IGNORECASE | re.ASCII) m = _150_re.match(resp) if not m: return None return int(m.group(1)) _227_re = None def parse227(resp): '''Parse the '227' response for a PASV request. Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)' Return ('host.addr.as.numbers', port if resp[:3] != '227': raise error_reply(resp) global _227_re if _227_re is None: import re _227_re = re.compile(r'(\d+),(\d+),(\d+),(\d+),(\d+),(\d+)', re.ASCII) m = _227_re.search(resp) if not m: raise error_proto(resp) numbers = m.groups() host = '.'.join(numbers[:4]) port = (int(numbers[4]) << 8) + int(numbers[5]) return host, port def parse229(resp, peer): '''Parse the '229' response for an EPSV request. Raises error_proto if it does not contain '(|||port|)' Return ('host.addr.as.numbers', port if resp[:3] != '229': raise error_reply(resp) left = resp.find('(') if left < 0: raise error_proto(resp) right = resp.find(')', left + 1) if right < 0: raise error_proto(resp) if resp[left + 1] != resp[right - 1]: raise error_proto(resp) parts = resp[left + 1:right].split(resp[left+1]) if len(parts) != 5: raise error_proto(resp) host = peer[0] port = int(parts[3]) return host, port def parse257(resp): '''Parse the '257' response for a MKD or PWD request. This is a response to a MKD or PWD request: a directory name. Returns the directoryname in the 257 reply.''' if resp[:3] != '257': raise error_reply(resp) if resp[3:5] != ' "': return '' dirname = '' i = 5 n = len(resp) while i < n: c = resp[i] i = i+1 if c == '"': if i >= n or resp[i] != '"': break i = i+1 dirname = dirname + c return dirname def print_line(line): '''Default retrlines callback to print a line.''' print(line) def ftpcp(source, sourcename, target, targetname = '', type = 'I'): '''Copy file from one FTP-instance to another.''' if not targetname: targetname = sourcename type = 'TYPE ' + type source.voidcmd(type) target.voidcmd(type) sourcehost, sourceport = parse227(source.sendcmd('PASV')) target.sendport(sourcehost, sourceport) treply = target.sendcmd('STOR ' + targetname) if treply[:3] not in {'125', '150'}: raise error_proto sreply = source.sendcmd('RETR ' + sourcename) if sreply[:3] not in {'125', '150'}: raise error_proto source.voidresp() target.voidresp() def test(): '''Test program. Usage: ftp [-d] [-r[file]] host [-l[dir]] [-d[dir]] [-p] [file] ... -d dir -l list -p password ''' if len(sys.argv) < 2: print(test.__doc__) sys.exit(0) import netrc debugging = 0 rcfile = None while sys.argv[1] == '-d': debugging = debugging+1 del sys.argv[1] if sys.argv[1][:2] == '-r': rcfile = sys.argv[1][2:] del sys.argv[1] host = sys.argv[1] ftp = FTP(host) ftp.set_debuglevel(debugging) userid = passwd = acct = '' try: netrcobj = netrc.netrc(rcfile) except OSError: if rcfile is not None: sys.stderr.write("Could not open account file" " -- using anonymous login.") else: try: userid, acct, passwd = netrcobj.authenticators(host) except KeyError: sys.stderr.write( "No account -- using anonymous login.") ftp.login(userid, passwd, acct) for file in sys.argv[2:]: if file[:2] == '-l': ftp.dir(file[2:]) elif file[:2] == '-d': cmd = 'CWD' if file[2:]: cmd = cmd + ' ' + file[2:] resp = ftp.sendcmd(cmd) elif file == '-p': ftp.set_pasv(not ftp.passiveserver) else: ftp.retrbinary('RETR ' + file, \ sys.stdout.write, 1024) ftp.quit() if __name__ == '__main__': test() """functools.py - Tools for working with functions and callable objects """ __all__ = ['update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES', 'total_ordering', 'cmp_to_key', 'lru_cache', 'reduce', 'partial', 'partialmethod', 'singledispatch', 'singledispatchmethod', "cached_property"] from abc import get_cache_token from collections import namedtuple from reprlib import recursive_repr from _thread import RLock WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__', '__annotations__') WRAPPER_UPDATES = ('__dict__',) def update_wrapper(wrapper, wrapped, assigned = WRAPPER_ASSIGNMENTS, updated = WRAPPER_UPDATES): """Update a wrapper function to look like the wrapped function wrapper is the function to be updated wrapped is the original function assigned is a tuple naming the attributes assigned directly from the wrapped function to the wrapper function (defaults to functools.WRAPPER_ASSIGNMENTS) updated is a tuple naming the attributes of the wrapper that are updated with the corresponding attribute from the wrapped function (defaults to functools.WRAPPER_UPDATES) """ for attr in assigned: try: value = getattr(wrapped, attr) except AttributeError: pass else: setattr(wrapper, attr, value) for attr in updated: getattr(wrapper, attr).update(getattr(wrapped, attr, {})) wrapper.__wrapped__ = wrapped return wrapper def wraps(wrapped, assigned = WRAPPER_ASSIGNMENTS, updated = WRAPPER_UPDATES): """Decorator factory to apply update_wrapper() to a wrapper function Returns a decorator that invokes update_wrapper() with the decorated function as the wrapper argument and the arguments to wraps() as the remaining arguments. Default arguments are as for update_wrapper(). This is a convenience function to simplify applying partial() to update_wrapper(). """ return partial(update_wrapper, wrapped=wrapped, assigned=assigned, updated=updated) def _gt_from_lt(self, other, NotImplemented=NotImplemented): 'Return a > b. Computed by @total_ordering from (not a < b) and (a != b).' op_result = self.__lt__(other) if op_result is NotImplemented: return op_result return not op_result and self != other def _le_from_lt(self, other, NotImplemented=NotImplemented): 'Return a <= b. Computed by @total_ordering from (a < b) or (a == b).' op_result = self.__lt__(other) return op_result or self == other def _ge_from_lt(self, other, NotImplemented=NotImplemented): 'Return a >= b. Computed by @total_ordering from (not a < b).' op_result = self.__lt__(other) if op_result is NotImplemented: return op_result return not op_result def _ge_from_le(self, other, NotImplemented=NotImplemented): 'Return a >= b. Computed by @total_ordering from (not a <= b) or (a == b).' op_result = self.__le__(other) if op_result is NotImplemented: return op_result return not op_result or self == other def _lt_from_le(self, other, NotImplemented=NotImplemented): 'Return a < b. Computed by @total_ordering from (a <= b) and (a != b).' op_result = self.__le__(other) if op_result is NotImplemented: return op_result return op_result and self != other def _gt_from_le(self, other, NotImplemented=NotImplemented): 'Return a > b. Computed by @total_ordering from (not a <= b).' op_result = self.__le__(other) if op_result is NotImplemented: return op_result return not op_result def _lt_from_gt(self, other, NotImplemented=NotImplemented): 'Return a < b. Computed by @total_ordering from (not a > b) and (a != b).' op_result = self.__gt__(other) if op_result is NotImplemented: return op_result return not op_result and self != other def _ge_from_gt(self, other, NotImplemented=NotImplemented): 'Return a >= b. Computed by @total_ordering from (a > b) or (a == b).' op_result = self.__gt__(other) return op_result or self == other def _le_from_gt(self, other, NotImplemented=NotImplemented): 'Return a <= b. Computed by @total_ordering from (not a > b).' op_result = self.__gt__(other) if op_result is NotImplemented: return op_result return not op_result def _le_from_ge(self, other, NotImplemented=NotImplemented): 'Return a <= b. Computed by @total_ordering from (not a >= b) or (a == b).' op_result = self.__ge__(other) if op_result is NotImplemented: return op_result return not op_result or self == other def _gt_from_ge(self, other, NotImplemented=NotImplemented): 'Return a > b. Computed by @total_ordering from (a >= b) and (a != b).' op_result = self.__ge__(other) if op_result is NotImplemented: return op_result return op_result and self != other def _lt_from_ge(self, other, NotImplemented=NotImplemented): 'Return a < b. Computed by @total_ordering from (not a >= b).' op_result = self.__ge__(other) if op_result is NotImplemented: return op_result return not op_result _convert = { '__lt__': [('__gt__', _gt_from_lt), ('__le__', _le_from_lt), ('__ge__', _ge_from_lt)], '__le__': [('__ge__', _ge_from_le), ('__lt__', _lt_from_le), ('__gt__', _gt_from_le)], '__gt__': [('__lt__', _lt_from_gt), ('__ge__', _ge_from_gt), ('__le__', _le_from_gt)], '__ge__': [('__le__', _le_from_ge), ('__gt__', _gt_from_ge), ('__lt__', _lt_from_ge)] } def total_ordering(cls): """Class decorator that fills in missing ordering methods""" roots = {op for op in _convert if getattr(cls, op, None) is not getattr(object, op, None)} if not roots: raise ValueError('must define at least one ordering operation: < > <= >=') root = max(roots) for opname, opfunc in _convert[root]: if opname not in roots: opfunc.__name__ = opname setattr(cls, opname, opfunc) return cls def cmp_to_key(mycmp): """Convert a cmp= function into a key= function""" class K(object): __slots__ = ['obj'] def __init__(self, obj): self.obj = obj def __lt__(self, other): return mycmp(self.obj, other.obj) < 0 def __gt__(self, other): return mycmp(self.obj, other.obj) > 0 def __eq__(self, other): return mycmp(self.obj, other.obj) == 0 def __le__(self, other): return mycmp(self.obj, other.obj) <= 0 def __ge__(self, other): return mycmp(self.obj, other.obj) >= 0 __hash__ = None return K try: from _functools import cmp_to_key except ImportError: pass _initial_missing = object() def reduce(function, sequence, initial=_initial_missing): """ reduce(function, sequence[, initial]) -> value Apply a function of two arguments cumulatively to the items of a sequence, from left to right, so as to reduce the sequence to a single value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5). If initial is present, it is placed before the items of the sequence in the calculation, and serves as a default when the sequence is empty. """ it = iter(sequence) if initial is _initial_missing: try: value = next(it) except StopIteration: raise TypeError("reduce() of empty sequence with no initial value") from None else: value = initial for element in it: value = function(value, element) return value try: from _functools import reduce except ImportError: pass class partial: """New function with partial application of the given arguments and keywords. """ __slots__ = "func", "args", "keywords", "__dict__", "__weakref__" def __new__(cls, func, /, *args, **keywords): if not callable(func): raise TypeError("the first argument must be callable") if hasattr(func, "func"): args = func.args + args keywords = {**func.keywords, **keywords} func = func.func self = super(partial, cls).__new__(cls) self.func = func self.args = args self.keywords = keywords return self def __call__(self, /, *args, **keywords): keywords = {**self.keywords, **keywords} return self.func(*self.args, *args, **keywords) @recursive_repr() def __repr__(self): qualname = type(self).__qualname__ args = [repr(self.func)] args.extend(repr(x) for x in self.args) args.extend(f"{k}={v!r}" for (k, v) in self.keywords.items()) if type(self).__module__ == "functools": return f"functools.{qualname}({', '.join(args)})" return f"{qualname}({', '.join(args)})" def __reduce__(self): return type(self), (self.func,), (self.func, self.args, self.keywords or None, self.__dict__ or None) def __setstate__(self, state): if not isinstance(state, tuple): raise TypeError("argument to __setstate__ must be a tuple") if len(state) != 4: raise TypeError(f"expected 4 items in state, got {len(state)}") func, args, kwds, namespace = state if (not callable(func) or not isinstance(args, tuple) or (kwds is not None and not isinstance(kwds, dict)) or (namespace is not None and not isinstance(namespace, dict))): raise TypeError("invalid partial state") args = tuple(args) if kwds is None: kwds = {} elif type(kwds) is not dict: kwds = dict(kwds) if namespace is None: namespace = {} self.__dict__ = namespace self.func = func self.args = args self.keywords = kwds try: from _functools import partial except ImportError: pass class partialmethod(object): """Method descriptor with partial application of the given arguments and keywords. Supports wrapping existing descriptors and handles non-descriptor callables as instance methods. """ def __init__(*args, **keywords): if len(args) >= 2: self, func, *args = args elif not args: raise TypeError("descriptor '__init__' of partialmethod " "needs an argument") elif 'func' in keywords: func = keywords.pop('func') self, *args = args import warnings warnings.warn("Passing 'func' as keyword argument is deprecated", DeprecationWarning, stacklevel=2) else: raise TypeError("type 'partialmethod' takes at least one argument, " "got %d" % (len(args)-1)) args = tuple(args) if not callable(func) and not hasattr(func, "__get__"): raise TypeError("{!r} is not callable or a descriptor" .format(func)) if isinstance(func, partialmethod): self.func = func.func self.args = func.args + args self.keywords = {**func.keywords, **keywords} else: self.func = func self.args = args self.keywords = keywords __init__.__text_signature__ = '($self, func, /, *args, **keywords)' def __repr__(self): args = ", ".join(map(repr, self.args)) keywords = ", ".join("{}={!r}".format(k, v) for k, v in self.keywords.items()) format_string = "{module}.{cls}({func}, {args}, {keywords})" return format_string.format(module=self.__class__.__module__, cls=self.__class__.__qualname__, func=self.func, args=args, keywords=keywords) def _make_unbound_method(self): def _method(cls_or_self, /, *args, **keywords): keywords = {**self.keywords, **keywords} return self.func(cls_or_self, *self.args, *args, **keywords) _method.__isabstractmethod__ = self.__isabstractmethod__ _method._partialmethod = self return _method def __get__(self, obj, cls=None): get = getattr(self.func, "__get__", None) result = None if get is not None: new_func = get(obj, cls) if new_func is not self.func: result = partial(new_func, *self.args, **self.keywords) try: result.__self__ = new_func.__self__ except AttributeError: pass if result is None: result = self._make_unbound_method().__get__(obj, cls) return result @property def __isabstractmethod__(self): return getattr(self.func, "__isabstractmethod__", False) def _unwrap_partial(func): while isinstance(func, partial): func = func.func return func _CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"]) class _HashedSeq(list): """ This class guarantees that hash() will be called no more than once per element. This is important because the lru_cache() will hash the key multiple times on a cache miss. """ __slots__ = 'hashvalue' def __init__(self, tup, hash=hash): self[:] = tup self.hashvalue = hash(tup) def __hash__(self): return self.hashvalue def _make_key(args, kwds, typed, kwd_mark = (object(),), fasttypes = {int, str}, tuple=tuple, type=type, len=len): """Make a cache key from optionally typed positional and keyword arguments The key is constructed in a way that is flat as possible rather than as a nested structure that would take more memory. If there is only a single argument and its data type is known to cache its hash value, then that argument is returned without a wrapper. This saves space and improves lookup speed. """ key = args if kwds: key += kwd_mark for item in kwds.items(): key += item if typed: key += tuple(type(v) for v in args) if kwds: key += tuple(type(v) for v in kwds.values()) elif len(key) == 1 and type(key[0]) in fasttypes: return key[0] return _HashedSeq(key) def lru_cache(maxsize=128, typed=False): """Least-recently-used cache decorator. If *maxsize* is set to None, the LRU features are disabled and the cache can grow without bound. If *typed* is True, arguments of different types will be cached separately. For example, f(3.0) and f(3) will be treated as distinct calls with distinct results. Arguments to the cached function must be hashable. View the cache statistics named tuple (hits, misses, maxsize, currsize) with f.cache_info(). Clear the cache and statistics with f.cache_clear(). Access the underlying function with f.__wrapped__. See: http://en.wikipedia.org/wiki/Cache_replacement_policies """ if isinstance(maxsize, int): if maxsize < 0: maxsize = 0 elif callable(maxsize) and isinstance(typed, bool): user_function, maxsize = maxsize, 128 wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo) return update_wrapper(wrapper, user_function) elif maxsize is not None: raise TypeError( 'Expected first argument to be an integer, a callable, or None') def decorating_function(user_function): wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo) return update_wrapper(wrapper, user_function) return decorating_function def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo): sentinel = object() make_key = _make_key PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 cache = {} hits = misses = 0 full = False cache_get = cache.get cache_len = cache.__len__ lock = RLock() root = [] root[:] = [root, root, None, None] if maxsize == 0: def wrapper(*args, **kwds): nonlocal misses misses += 1 result = user_function(*args, **kwds) return result elif maxsize is None: def wrapper(*args, **kwds): nonlocal hits, misses key = make_key(args, kwds, typed) result = cache_get(key, sentinel) if result is not sentinel: hits += 1 return result misses += 1 result = user_function(*args, **kwds) cache[key] = result return result else: def wrapper(*args, **kwds): nonlocal root, hits, misses, full key = make_key(args, kwds, typed) with lock: link = cache_get(key) if link is not None: link_prev, link_next, _key, result = link link_prev[NEXT] = link_next link_next[PREV] = link_prev last = root[PREV] last[NEXT] = root[PREV] = link link[PREV] = last link[NEXT] = root hits += 1 return result misses += 1 result = user_function(*args, **kwds) with lock: if key in cache: pass elif full: oldroot = root oldroot[KEY] = key oldroot[RESULT] = result root = oldroot[NEXT] oldkey = root[KEY] oldresult = root[RESULT] root[KEY] = root[RESULT] = None del cache[oldkey] cache[key] = oldroot else: last = root[PREV] link = [last, root, key, result] last[NEXT] = root[PREV] = cache[key] = link full = (cache_len() >= maxsize) return result def cache_info(): """Report cache statistics""" with lock: return _CacheInfo(hits, misses, maxsize, cache_len()) def cache_clear(): """Clear the cache and cache statistics""" nonlocal hits, misses, full with lock: cache.clear() root[:] = [root, root, None, None] hits = misses = 0 full = False wrapper.cache_info = cache_info wrapper.cache_clear = cache_clear return wrapper try: from _functools import _lru_cache_wrapper except ImportError: pass def _c3_merge(sequences): """Merges MROs in *sequences* to a single MRO using the C3 algorithm. Adapted from http://www.python.org/download/releases/2.3/mro/. """ result = [] while True: sequences = [s for s in sequences if s] if not sequences: return result for s1 in sequences: candidate = s1[0] for s2 in sequences: if candidate in s2[1:]: candidate = None break else: break if candidate is None: raise RuntimeError("Inconsistent hierarchy") result.append(candidate) for seq in sequences: if seq[0] == candidate: del seq[0] def _c3_mro(cls, abcs=None): """Computes the method resolution order using extended C3 linearization. If no *abcs* are given, the algorithm works exactly like the built-in C3 linearization used for method resolution. If given, *abcs* is a list of abstract base classes that should be inserted into the resulting MRO. Unrelated ABCs are ignored and don't end up in the result. The algorithm inserts ABCs where their functionality is introduced, i.e. issubclass(cls, abc) returns True for the class itself but returns False for all its direct base classes. Implicit ABCs for a given class (either registered or inferred from the presence of a special method like __len__) are inserted directly after the last ABC explicitly listed in the MRO of said class. If two implicit ABCs end up next to each other in the resulting MRO, their ordering depends on the order of types in *abcs*. """ for i, base in enumerate(reversed(cls.__bases__)): if hasattr(base, '__abstractmethods__'): boundary = len(cls.__bases__) - i break else: boundary = 0 abcs = list(abcs) if abcs else [] explicit_bases = list(cls.__bases__[:boundary]) abstract_bases = [] other_bases = list(cls.__bases__[boundary:]) for base in abcs: if issubclass(cls, base) and not any( issubclass(b, base) for b in cls.__bases__ ): abstract_bases.append(base) for base in abstract_bases: abcs.remove(base) explicit_c3_mros = [_c3_mro(base, abcs=abcs) for base in explicit_bases] abstract_c3_mros = [_c3_mro(base, abcs=abcs) for base in abstract_bases] other_c3_mros = [_c3_mro(base, abcs=abcs) for base in other_bases] return _c3_merge( [[cls]] + explicit_c3_mros + abstract_c3_mros + other_c3_mros + [explicit_bases] + [abstract_bases] + [other_bases] ) def _compose_mro(cls, types): """Calculates the method resolution order for a given class *cls*. Includes relevant abstract base classes (with their respective bases) from the *types* iterable. Uses a modified C3 linearization algorithm. """ bases = set(cls.__mro__) def is_related(typ): return (typ not in bases and hasattr(typ, '__mro__') and issubclass(cls, typ)) types = [n for n in types if is_related(n)] def is_strict_base(typ): for other in types: if typ != other and typ in other.__mro__: return True return False types = [n for n in types if not is_strict_base(n)] type_set = set(types) mro = [] for typ in types: found = [] for sub in typ.__subclasses__(): if sub not in bases and issubclass(cls, sub): found.append([s for s in sub.__mro__ if s in type_set]) if not found: mro.append(typ) continue found.sort(key=len, reverse=True) for sub in found: for subcls in sub: if subcls not in mro: mro.append(subcls) return _c3_mro(cls, abcs=mro) def _find_impl(cls, registry): """Returns the best matching implementation from *registry* for type *cls*. Where there is no registered implementation for a specific type, its method resolution order is used to find a more generic implementation. Note: if *registry* does not contain an implementation for the base *object* type, this function may return None. """ mro = _compose_mro(cls, registry.keys()) match = None for t in mro: if match is not None: if (t in registry and t not in cls.__mro__ and match not in cls.__mro__ and not issubclass(match, t)): raise RuntimeError("Ambiguous dispatch: {} or {}".format( match, t)) break if t in registry: match = t return registry.get(match) def singledispatch(func): """Single-dispatch generic function decorator. Transforms a function into a generic function, which can have different behaviours depending upon the type of its first argument. The decorated function acts as the default implementation, and additional implementations can be registered using the register() attribute of the generic function. """ import types, weakref registry = {} dispatch_cache = weakref.WeakKeyDictionary() cache_token = None def dispatch(cls): """generic_func.dispatch(cls) -> <function implementation> Runs the dispatch algorithm to return the best available implementation for the given *cls* registered on *generic_func*. """ nonlocal cache_token if cache_token is not None: current_token = get_cache_token() if cache_token != current_token: dispatch_cache.clear() cache_token = current_token try: impl = dispatch_cache[cls] except KeyError: try: impl = registry[cls] except KeyError: impl = _find_impl(cls, registry) dispatch_cache[cls] = impl return impl def register(cls, func=None): """generic_func.register(cls, func) -> func Registers a new implementation for the given *cls* on a *generic_func*. """ nonlocal cache_token if func is None: if isinstance(cls, type): return lambda f: register(cls, f) ann = getattr(cls, '__annotations__', {}) if not ann: raise TypeError( f"Invalid first argument to `register()`: {cls!r}. " f"Use either `@register(some_class)` or plain `@register` " f"on an annotated function." ) func = cls from typing import get_type_hints argname, cls = next(iter(get_type_hints(func).items())) if not isinstance(cls, type): raise TypeError( f"Invalid annotation for {argname!r}. " f"{cls!r} is not a class." ) registry[cls] = func if cache_token is None and hasattr(cls, '__abstractmethods__'): cache_token = get_cache_token() dispatch_cache.clear() return func def wrapper(*args, **kw): if not args: raise TypeError(f'{funcname} requires at least ' '1 positional argument') return dispatch(args[0].__class__)(*args, **kw) funcname = getattr(func, '__name__', 'singledispatch function') registry[object] = func wrapper.register = register wrapper.dispatch = dispatch wrapper.registry = types.MappingProxyType(registry) wrapper._clear_cache = dispatch_cache.clear update_wrapper(wrapper, func) return wrapper class singledispatchmethod: """Single-dispatch generic method descriptor. Supports wrapping existing descriptors and handles non-descriptor callables as instance methods. """ def __init__(self, func): if not callable(func) and not hasattr(func, "__get__"): raise TypeError(f"{func!r} is not callable or a descriptor") self.dispatcher = singledispatch(func) self.func = func def register(self, cls, method=None): """generic_method.register(cls, func) -> func Registers a new implementation for the given *cls* on a *generic_method*. """ return self.dispatcher.register(cls, func=method) def __get__(self, obj, cls=None): def _method(*args, **kwargs): method = self.dispatcher.dispatch(args[0].__class__) return method.__get__(obj, cls)(*args, **kwargs) _method.__isabstractmethod__ = self.__isabstractmethod__ _method.register = self.register update_wrapper(_method, self.func) return _method @property def __isabstractmethod__(self): return getattr(self.func, '__isabstractmethod__', False) _NOT_FOUND = object() class cached_property: def __init__(self, func): self.func = func self.attrname = None self.__doc__ = func.__doc__ self.lock = RLock() def __set_name__(self, owner, name): if self.attrname is None: self.attrname = name elif name != self.attrname: raise TypeError( "Cannot assign the same cached_property to two different names " f"({self.attrname!r} and {name!r})." ) def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val