import os import re import sys from distutils import log import xml.dom.pulldom import shlex import locale import unicodedata import warnings from setuptools.compat import unicode, bytes from xml.sax.saxutils import unescape try: import urlparse except ImportError: import urllib.parse as urlparse from subprocess import Popen as _Popen, PIPE as _PIPE #NOTE: Use of the command line options require SVN 1.3 or newer (December 2005) # and SVN 1.3 hasn't been supported by the developers since mid 2008. #subprocess is called several times with shell=(sys.platform=='win32') #see the follow for more information: # http://bugs.python.org/issue8557 # http://stackoverflow.com/questions/5658622/ # python-subprocess-popen-environment-path def _run_command(args, stdout=_PIPE, stderr=_PIPE, encoding=None, stream=0): #regarding the shell argument, see: http://bugs.python.org/issue8557 try: proc = _Popen(args, stdout=stdout, stderr=stderr, shell=(sys.platform == 'win32')) data = proc.communicate()[stream] except OSError: return 1, '' #doubled checked and data = decode_as_string(data, encoding) #communciate calls wait() return proc.returncode, data def _get_entry_schedule(entry): schedule = entry.getElementsByTagName('schedule')[0] return "".join([t.nodeValue for t in schedule.childNodes if t.nodeType == t.TEXT_NODE]) def _get_target_property(target): property_text = target.getElementsByTagName('property')[0] return "".join([t.nodeValue for t in property_text.childNodes if t.nodeType == t.TEXT_NODE]) def _get_xml_data(decoded_str): if sys.version_info < (3, 0): #old versions want an encoded string data = decoded_str.encode('utf-8') else: data = decoded_str return data def joinpath(prefix, *suffix): if not prefix or prefix == '.': return os.path.join(*suffix) return os.path.join(prefix, *suffix) def decode_as_string(text, encoding=None): """ Decode the console or file output explicitly using getpreferredencoding. The text paraemeter should be a encoded string, if not no decode occurs If no encoding is given, getpreferredencoding is used. If encoding is specified, that is used instead. This would be needed for SVN --xml output. Unicode is explicitly put in composed NFC form. --xml should be UTF-8 (SVN Issue 2938) the discussion on the Subversion DEV List from 2007 seems to indicate the same. """ #text should be a byte string if encoding is None: encoding = locale.getpreferredencoding() if not isinstance(text, unicode): text = text.decode(encoding) text = unicodedata.normalize('NFC', text) return text def parse_dir_entries(decoded_str): '''Parse the entries from a recursive info xml''' doc = xml.dom.pulldom.parseString(_get_xml_data(decoded_str)) entries = list() for event, node in doc: if event == 'START_ELEMENT' and node.nodeName == 'entry': doc.expandNode(node) if not _get_entry_schedule(node).startswith('delete'): entries.append((node.getAttribute('path'), node.getAttribute('kind'))) return entries[1:] # do not want the root directory def parse_externals_xml(decoded_str, prefix=''): '''Parse a propget svn:externals xml''' prefix = os.path.normpath(prefix) prefix = os.path.normcase(prefix) doc = xml.dom.pulldom.parseString(_get_xml_data(decoded_str)) externals = list() for event, node in doc: if event == 'START_ELEMENT' and node.nodeName == 'target': doc.expandNode(node) path = os.path.normpath(node.getAttribute('path')) if os.path.normcase(path).startswith(prefix): path = path[len(prefix)+1:] data = _get_target_property(node) #data should be decoded already for external in parse_external_prop(data): externals.append(joinpath(path, external)) return externals # do not want the root directory def parse_external_prop(lines): """ Parse the value of a retrieved svn:externals entry. possible token setups (with quotng and backscaping in laters versions) URL[@#] EXT_FOLDERNAME [-r#] URL EXT_FOLDERNAME EXT_FOLDERNAME [-r#] URL """ externals = [] for line in lines.splitlines(): line = line.lstrip() # there might be a "\ " if not line: continue if sys.version_info < (3, 0): #shlex handles NULLs just fine and shlex in 2.7 tries to encode #as ascii automatiically line = line.encode('utf-8') line = shlex.split(line) if sys.version_info < (3, 0): line = [x.decode('utf-8') for x in line] #EXT_FOLDERNAME is either the first or last depending on where #the URL falls if urlparse.urlsplit(line[-1])[0]: external = line[0] else: external = line[-1] external = decode_as_string(external, encoding="utf-8") externals.append(os.path.normpath(external)) return externals def parse_prop_file(filename, key): found = False f = open(filename, 'rt') data = '' try: for line in iter(f.readline, ''): # can't use direct iter! parts = line.split() if len(parts) == 2: kind, length = parts data = f.read(int(length)) if kind == 'K' and data == key: found = True elif kind == 'V' and found: break finally: f.close() return data class SvnInfo(object): ''' Generic svn_info object. No has little knowledge of how to extract information. Use cls.load to instatiate according svn version. Paths are not filesystem encoded. ''' @staticmethod def get_svn_version(): code, data = _run_command(['svn', '--version', '--quiet']) if code == 0 and data: return data.strip() else: return '' #svnversion return values (previous implementations return max revision) # 4123:4168 mixed revision working copy # 4168M modified working copy # 4123S switched working copy # 4123:4168MS mixed revision, modified, switched working copy revision_re = re.compile(r'(?:([\-0-9]+):)?(\d+)([a-z]*)\s*$', re.I) @classmethod def load(cls, dirname=''): normdir = os.path.normpath(dirname) code, data = _run_command(['svn', 'info', normdir]) has_svn = os.path.isdir(os.path.join(normdir, '.svn')) svn_version = tuple(cls.get_svn_version().split('.')) try: base_svn_version = tuple(int(x) for x in svn_version[:2]) except ValueError: base_svn_version = tuple() if has_svn and (code or not base_svn_version or base_svn_version < (1, 3)): log.warn('Fallback onto .svn parsing') warnings.warn(("No SVN 1.3+ command found: falling back " "on pre 1.7 .svn parsing"), DeprecationWarning) return SvnFileInfo(dirname) elif not has_svn: return SvnInfo(dirname) elif base_svn_version < (1, 5): return Svn13Info(dirname) else: return Svn15Info(dirname) def __init__(self, path=''): self.path = path self._entries = None self._externals = None def get_revision(self): 'Retrieve the directory revision informatino using svnversion' code, data = _run_command(['svnversion', '-c', self.path]) if code: log.warn("svnversion failed") return 0 parsed = self.revision_re.match(data) if parsed: return int(parsed.group(2)) else: return 0 @property def entries(self): if self._entries is None: self._entries = self.get_entries() return self._entries @property def externals(self): if self._externals is None: self._externals = self.get_externals() return self._externals def iter_externals(self): ''' Iterate over the svn:external references in the repository path. ''' for item in self.externals: yield item def iter_files(self): ''' Iterate over the non-deleted file entries in the repository path ''' for item, kind in self.entries: if kind.lower() == 'file': yield item def iter_dirs(self, include_root=True): ''' Iterate over the non-deleted file entries in the repository path ''' if include_root: yield self.path for item, kind in self.entries: if kind.lower() == 'dir': yield item def get_entries(self): return [] def get_externals(self): return [] class Svn13Info(SvnInfo): def get_entries(self): code, data = _run_command(['svn', 'info', '-R', '--xml', self.path], encoding="utf-8") if code: log.debug("svn info failed") return [] return parse_dir_entries(data) def get_externals(self): #Previous to 1.5 --xml was not supported for svn propget and the -R #output format breaks the shlex compatible semantics. cmd = ['svn', 'propget', 'svn:externals'] result = [] for folder in self.iter_dirs(): code, lines = _run_command(cmd + [folder], encoding="utf-8") if code != 0: log.warn("svn propget failed") return [] #lines should a str for external in parse_external_prop(lines): if folder: external = os.path.join(folder, external) result.append(os.path.normpath(external)) return result class Svn15Info(Svn13Info): def get_externals(self): cmd = ['svn', 'propget', 'svn:externals', self.path, '-R', '--xml'] code, lines = _run_command(cmd, encoding="utf-8") if code: log.debug("svn propget failed") return [] return parse_externals_xml(lines, prefix=os.path.abspath(self.path)) class SvnFileInfo(SvnInfo): def __init__(self, path=''): super(SvnFileInfo, self).__init__(path) self._directories = None self._revision = None def _walk_svn(self, base): entry_file = joinpath(base, '.svn', 'entries') if os.path.isfile(entry_file): entries = SVNEntriesFile.load(base) yield (base, False, entries.parse_revision()) for path in entries.get_undeleted_records(): path = decode_as_string(path) path = joinpath(base, path) if os.path.isfile(path): yield (path, True, None) elif os.path.isdir(path): for item in self._walk_svn(path): yield item def _build_entries(self): entries = list() rev = 0 for path, isfile, dir_rev in self._walk_svn(self.path): if isfile: entries.append((path, 'file')) else: entries.append((path, 'dir')) rev = max(rev, dir_rev) self._entries = entries self._revision = rev def get_entries(self): if self._entries is None: self._build_entries() return self._entries def get_revision(self): if self._revision is None: self._build_entries() return self._revision def get_externals(self): prop_files = [['.svn', 'dir-prop-base'], ['.svn', 'dir-props']] externals = [] for dirname in self.iter_dirs(): prop_file = None for rel_parts in prop_files: filename = joinpath(dirname, *rel_parts) if os.path.isfile(filename): prop_file = filename if prop_file is not None: ext_prop = parse_prop_file(prop_file, 'svn:externals') #ext_prop should be utf-8 coming from svn:externals ext_prop = decode_as_string(ext_prop, encoding="utf-8") externals.extend(parse_external_prop(ext_prop)) return externals def svn_finder(dirname=''): #combined externals due to common interface #combined externals and entries due to lack of dir_props in 1.7 info = SvnInfo.load(dirname) for path in info.iter_files(): yield path for path in info.iter_externals(): sub_info = SvnInfo.load(path) for sub_path in sub_info.iter_files(): yield sub_path class SVNEntriesFile(object): def __init__(self, data): self.data = data @classmethod def load(class_, base): filename = os.path.join(base, '.svn', 'entries') f = open(filename) try: result = SVNEntriesFile.read(f) finally: f.close() return result @classmethod def read(class_, fileobj): data = fileobj.read() is_xml = data.startswith(' revision_line_number and section[revision_line_number]) ] return rev_numbers def get_undeleted_records(self): undeleted = lambda s: s and s[0] and (len(s) < 6 or s[5] != 'delete') result = [ section[0] for section in self.get_sections() if undeleted(section) ] return result class SVNEntriesFileXML(SVNEntriesFile): def is_valid(self): return True def get_url(self): "Get repository URL" urlre = re.compile('url="([^"]+)"') return urlre.search(self.data).group(1) def parse_revision_numbers(self): revre = re.compile(r'committed-rev="(\d+)"') return [ int(m.group(1)) for m in revre.finditer(self.data) ] def get_undeleted_records(self): entries_pattern = \ re.compile(r'name="([^"]+)"(?![^>]+deleted="true")', re.I) results = [ unescape(match.group(1)) for match in entries_pattern.finditer(self.data) ] return results if __name__ == '__main__': for name in svn_finder(sys.argv[1]): print(name)