1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
|
import os
import re
import sys
from distutils import log
import xml.dom.pulldom
import shlex
import locale
import unicodedata
from setuptools.compat import unicode, bytes
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
from subprocess import Popen as _Popen, PIPE as _PIPE
#NOTE: Use of the command line options require SVN 1.3 or newer (December 2005)
# and SVN 1.3 hasn't been supported by the developers since mid 2008.
#subprocess is called several times with shell=(sys.platform=='win32')
#see the follow for more information:
# http://bugs.python.org/issue8557
# http://stackoverflow.com/questions/5658622/
# python-subprocess-popen-environment-path
def _run_command(args, stdout=_PIPE, stderr=_PIPE):
#regarding the shell argument, see: http://bugs.python.org/issue8557
try:
args = [fsdecode(x) for x in args]
proc = _Popen(args, stdout=stdout, stderr=stderr,
shell=(sys.platform == 'win32'))
data = proc.communicate()[0]
except OSError:
return 1, ''
data = consoledecode(data)
#communciate calls wait()
return proc.returncode, data
def _get_entry_schedule(entry):
schedule = entry.getElementsByTagName('schedule')[0]
return "".join([t.nodeValue
for t in schedule.childNodes
if t.nodeType == t.TEXT_NODE])
def _get_target_property(target):
property_text = target.getElementsByTagName('property')[0]
return "".join([t.nodeValue
for t in property_text.childNodes
if t.nodeType == t.TEXT_NODE])
def _get_xml_data(decoded_str):
if sys.version_info < (3, 0):
#old versions want an encoded string
data = decoded_str.encode('utf-8')
else:
data = decoded_str
return data
def joinpath(prefix, suffix):
if not prefix or prefix == '.':
return suffix
return os.path.join(prefix, suffix)
def fsencode(path):
"Path must be unicode or in file system encoding already"
encoding = sys.getfilesystemencoding()
if isinstance(path, unicode):
path = path.encode()
elif not isinstance(path, bytes):
raise TypeError('%s is not a string or byte type'
% type(path).__name__)
#getfilessystemencoding doesn't have the mac-roman issue
if encoding == 'utf-8' and sys.platform == 'darwin':
path = path.decode('utf-8')
path = unicodedata.normalize('NFD', path)
path = path.encode('utf-8')
return path
def fsdecode(path):
"Path must be unicode or in file system encoding already"
encoding = sys.getfilesystemencoding()
if isinstance(path, bytes):
path = path.decode(encoding)
elif not isinstance(path, unicode):
raise TypeError('%s is not a byte type'
% type(path).__name__)
return unicodedata.normalize('NFC', path)
def consoledecode(text):
encoding = locale.getpreferredencoding()
return text.decode(encoding)
def parse_dir_entries(decoded_str):
'''Parse the entries from a recursive info xml'''
doc = xml.dom.pulldom.parseString(_get_xml_data(decoded_str))
entries = list()
for event, node in doc:
if event == 'START_ELEMENT' and node.nodeName == 'entry':
doc.expandNode(node)
if not _get_entry_schedule(node).startswith('delete'):
entries.append((node.getAttribute('path'),
node.getAttribute('kind')))
return entries[1:] # do not want the root directory
def parse_externals_xml(decoded_str, prefix=''):
'''Parse a propget svn:externals xml'''
prefix = os.path.normpath(prefix)
doc = xml.dom.pulldom.parseString(_get_xml_data(decoded_str))
externals = list()
for event, node in doc:
if event == 'START_ELEMENT' and node.nodeName == 'target':
doc.expandNode(node)
path = os.path.normpath(node.getAttribute('path'))
if os.path.normcase(path).startswith(prefix):
path = path[len(prefix)+1:]
data = _get_target_property(node)
for external in parse_external_prop(data):
externals.append(joinpath(path, external))
return externals # do not want the root directory
def parse_external_prop(lines):
"""
Parse the value of a retrieved svn:externals entry.
possible token setups (with quotng and backscaping in laters versions)
URL[@#] EXT_FOLDERNAME
[-r#] URL EXT_FOLDERNAME
EXT_FOLDERNAME [-r#] URL
"""
externals = []
for line in lines.splitlines():
line = line.lstrip() #there might be a "\ "
if not line:
continue
if sys.version_info < (3, 0):
#shlex handles NULLs just fine and shlex in 2.7 tries to encode
#as ascii automatiically
line = line.encode('utf-8')
line = shlex.split(line)
if sys.version_info < (3, 0):
line = [x.decode('utf-8') for x in line]
#EXT_FOLDERNAME is either the first or last depending on where
#the URL falls
if urlparse.urlsplit(line[-1])[0]:
external = line[0]
else:
external = line[-1]
externals.append(os.path.normpath(external))
return externals
class SvnInfo(object):
'''
Generic svn_info object. No has little knowledge of how to extract
information. Use cls.load to instatiate according svn version.
Paths are not filesystem encoded.
'''
@staticmethod
def get_svn_version():
code, data = _run_command(['svn', '--version', '--quiet'])
if code == 0 and data:
return unicode(data).strip()
else:
return unicode('')
#svnversion return values (previous implementations return max revision)
# 4123:4168 mixed revision working copy
# 4168M modified working copy
# 4123S switched working copy
# 4123:4168MS mixed revision, modified, switched working copy
revision_re = re.compile(r'(?:([\-0-9]+):)?(\d+)([a-z]*)\s*$', re.I)
@classmethod
def load(cls, dirname=''):
code, data = _run_command(['svn', 'info', os.path.normpath(dirname)])
svn_version = tuple(cls.get_svn_version().split('.'))
base_svn_version = tuple(int(x) for x in svn_version[:2])
if code and base_svn_version:
#Not an SVN repository or compatible one
return SvnInfo(dirname)
elif base_svn_version < (1, 3):
log.warn('Insufficent version of SVN found')
return SvnInfo(dirname)
elif base_svn_version < (1, 5):
return Svn13Info(dirname)
else:
return Svn15Info(dirname)
def __init__(self, path=''):
self.path = path
self._entries = None
self._externals = None
def get_revision(self):
'Retrieve the directory revision informatino using svnversion'
code, data = _run_command(['svnversion', '-c', self.path])
if code:
log.warn("svnversion failed")
return 0
parsed = self.revision_re.match(data)
if parsed:
return int(parsed.group(2))
else:
return 0
@property
def entries(self):
if self._entries is None:
self._entries = self.get_entries()
return self._entries
@property
def externals(self):
if self._externals is None:
self._externals = self.get_externals()
return self._externals
def iter_externals(self):
'''
Iterate over the svn:external references in the repository path.
'''
for item in self.externals:
yield item
def iter_files(self):
'''
Iterate over the non-deleted file entries in the repository path
'''
for item, kind in self.entries:
if kind.lower()=='file':
yield item
def iter_dirs(self, include_root=True):
'''
Iterate over the non-deleted file entries in the repository path
'''
if include_root:
yield self.path
for item, kind in self.entries:
if kind.lower()=='dir':
yield item
def get_entries(self):
return []
def get_externals(self):
return []
class Svn13Info(SvnInfo):
def get_entries(self):
code, data = _run_command(['svn', 'info', '-R', '--xml', self.path])
if code:
log.debug("svn info failed")
return []
return parse_dir_entries(data)
def get_externals(self):
#Previous to 1.5 --xml was not supported for svn propget and the -R
#output format breaks the shlex compatible semantics.
cmd = ['svn', 'propget', 'svn:externals']
result = []
for folder in self.iter_dirs():
code, lines = _run_command(cmd + [folder])
if code != 0:
log.warn("svn propget failed")
return []
for external in parse_external_prop(lines):
if folder:
external = os.path.join(folder, external)
result.append(os.path.normpath(external))
return result
class Svn15Info(Svn13Info):
def get_externals(self):
cmd = ['svn', 'propget', 'svn:externals', self.path, '-R', '--xml']
code, lines = _run_command(cmd)
if code:
log.debug("svn propget failed")
return []
return parse_externals_xml(lines, prefix=os.path.abspath(self.path))
def svn_finder(dirname=''):
#combined externals due to common interface
#combined externals and entries due to lack of dir_props in 1.7
info = SvnInfo.load(dirname)
for path in info.iter_files():
yield fsencode(path)
for path in info.iter_externals():
sub_info = SvnInfo.load(path)
for sub_path in sub_info.iter_files():
yield fsencode(sub_path)
if __name__ == '__main__':
for name in svn_finder(sys.argv[1]):
print(name)
|