import os
import re
import sys
import codecs
from distutils import log
from xml.sax.saxutils import unescape
import xml.dom.pulldom
#requires python >= 2.4
from subprocess import Popen as _Popen, PIPE as _PIPE
#NOTE: Use of the command line options
# require SVN 1.3 or newer (December 2005)
# and SVN 1.3 hsan't been supported by the
# developers since mid 2008.
#subprocess is called several times with shell=(sys.platform=='win32')
#see the follow for more information:
# http://bugs.python.org/issue8557
# http://stackoverflow.com/questions/5658622/
# python-subprocess-popen-environment-path
def _run_command(args, stdout=_PIPE, stderr=_PIPE):
#regarding the shell argument, see: http://bugs.python.org/issue8557
proc = _Popen(args, stdout=stdout, stderr=stderr,
shell=(sys.platform=='win32'))
data = proc.communicate()[0]
#TODO: this is probably NOT always utf-8
try:
data = unicode(data, encoding='utf-8')
except NameError:
data = str(data, encoding='utf-8')
return proc.returncode, data
#svnversion return values (previous implementations return max revision)
# 4123:4168 mixed revision working copy
# 4168M modified working copy
# 4123S switched working copy
# 4123:4168MS mixed revision, modified, switched working copy
_SVN_VER_RE = re.compile(r'(?:(\d+):)?(\d+)([a-z]*)\s*$', re.I)
def parse_revision(path):
code, data = _run_command(['svnversion', path])
if code:
log.warn("svnversion failed")
return []
parsed = _SVN_VER_RE.match(data)
if parsed:
try:
#No max needed this command summarizes working copy since 1.0
return int(parsed.group(2))
except ValueError:
#This should only happen if the revision is WAY too big.
pass
return 0
def parse_dir_entries(path):
code, data = _run_command(['svn', 'info',
'--depth', 'immediates', '--xml', path])
if code:
log.warn("svn info failed")
return []
doc = xml.dom.pulldom.parseString(data)
entries = list()
for event, node in doc:
if event=='START_ELEMENT' and node.nodeName=='entry':
doc.expandNode(node)
entries.append(node)
if entries:
return [
_get_entry_name(element)
for element in entries[1:]
if _get_entry_schedule(element).lower() != 'deleted'
]
else:
return []
def _get_entry_name(entry):
return entry.getAttribute('path')
def _get_entry_schedule(entry):
schedule = entry.getElementsByTagName('schedule')[0]
return "".join([t.nodeValue for t in schedule.childNodes
if t.nodeType == t.TEXT_NODE])
#--xml wasn't supported until 1.5.x
#-R without --xml parses a bit funny
def parse_externals(path):
try:
_, lines = _run_command(['svn',
'propget', 'svn:externals', path])
if code:
log.warn("svn propget failed")
return []
lines = [line for line in lines.splitlines() if line]
except ValueError:
lines = []
externals = []
for line in lines:
line = line.split()
if not line:
continue
#TODO: urlparse?
if "://" in line[-1] or ":\\\\" in line[-1]:
externals.append(line[0])
else:
externals.append(line[-1])
return externals
#TODO add the text entry back, and make its use dependent on the
# non existence of svn?
class SVNEntries(object):
svn_tool_version = ''
def __init__(self, path, data):
self.path = path
self.data = data
if not self.svn_tool_version:
self.svn_tool_version = self.get_svn_tool_version()
@staticmethod
def get_svn_tool_version():
_, data = _run_command(['svn', '--version', '--quiet'])
if data:
return data.strip()
else:
return ''
@classmethod
def load_dir(class_, base):
filename = os.path.join(base, '.svn', 'entries')
f = open(filename)
result = SVNEntries.read(f, base)
f.close()
return result
@classmethod
def read(class_, file, path=None):
data = file.read()
if data.startswith('revision_line_number
and section[revision_line_number]
]
return rev_numbers
def get_undeleted_records(self):
#Note for self this skip and only returns the children
undeleted = lambda s: s and s[0] and (len(s) < 6 or s[5] != 'delete')
result = [
section[0]
for section in self.get_sections()
if undeleted(section)
]
return result
class SVNEntriesXML(SVNEntries):
def is_valid(self):
return True
def parse_revision_numbers(self):
revre = re.compile(r'committed-rev="(\d+)"')
return [
int(m.group(1))
for m in revre.finditer(self.data)
if m.group(1)
]
def get_undeleted_records(self):
entries_pattern = re.compile(r'name="([^"]+)"(?![^>]+deleted="true")',
re.I)
results = [
unescape(match.group(1))
for match in entries_pattern.finditer(self.data)
if match.group(1)
]
return results
import xml.dom.pulldom
class SVNEntriesCMD(SVNEntries):
#
#
#
#
# ...
# ...
#
#
# ...
#
#
# ...
#
#
# ...
#
entrypathre = re.compile(r']*path="(\.+)">', re.I)
entryre = re.compile(r'', re.DOTALL or re.I)
namere = re.compile(r'(.*?)', re.I)
def __get_cached_entries(self):
return self.entries
def is_valid(self):
return bool(self.get_entries())
def get_dir_data(self):
#This returns the info entry for the directory ONLY
return self.get_entries()[0]
def get_entries(self):
#regarding the shell argument, see: http://bugs.python.org/issue8557
_, data = _run_command(['svn', 'info',
'--depth', 'immediates', '--xml', self.path])
doc = xml.dom.pulldom.parseString(data)
self.entries = list()
for event, node in doc:
if event=='START_ELEMENT' and node.nodeName=='entry':
doc.expandNode(node)
self.entries.append(node)
self.get_entries = self.__get_cached_entries
return self.entries
def __get_cached_revision(self):
return self.revision
def parse_revision(self):
self.revision = parse_revision(self.path)
self.parse_revision = self.__get_cached_revision
return self.revision
def get_undeleted_records(self):
#NOTE: Need to parse entities?
if not self.is_valid():
return list()
else:
return [
_get_entry_name(element)
for element in self.get_entries()[1:]
if _get_entry_schedule(element).lower() != 'deleted'
]
def _get_externals_data(self, filename):
#othewise will be called twice.
if os.path.basename(filename).lower() != 'dir-props':
return ''
try:
_, lines = _run_command(['svn',
'propget', 'svn:externals', self.path])
lines = [line for line in lines.splitlines() if line]
return lines
except ValueError:
return ''