diff -r 5ff1fc726848 -r c6bca38c1cbf eggs/py-1.4.0-py2.6.egg/py/_path/svnurl.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/eggs/py-1.4.0-py2.6.egg/py/_path/svnurl.py Sat Jan 08 11:20:57 2011 +0530 @@ -0,0 +1,378 @@ +""" +module defining a subversion path object based on the external +command 'svn'. This modules aims to work with svn 1.3 and higher +but might also interact well with earlier versions. +""" + +import os, sys, time, re +import py +from py import path, process +from py._path import common +from py._path import svnwc as svncommon +from py._path.cacheutil import BuildcostAccessCache, AgingCache + +DEBUG=False + +class SvnCommandPath(svncommon.SvnPathBase): + """ path implementation that offers access to (possibly remote) subversion + repositories. """ + + _lsrevcache = BuildcostAccessCache(maxentries=128) + _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0) + + def __new__(cls, path, rev=None, auth=None): + self = object.__new__(cls) + if isinstance(path, cls): + rev = path.rev + auth = path.auth + path = path.strpath + svncommon.checkbadchars(path) + path = path.rstrip('/') + self.strpath = path + self.rev = rev + self.auth = auth + return self + + def __repr__(self): + if self.rev == -1: + return 'svnurl(%r)' % self.strpath + else: + return 'svnurl(%r, %r)' % (self.strpath, self.rev) + + def _svnwithrev(self, cmd, *args): + """ execute an svn command, append our own url and revision """ + if self.rev is None: + return self._svnwrite(cmd, *args) + else: + args = ['-r', self.rev] + list(args) + return self._svnwrite(cmd, *args) + + def _svnwrite(self, cmd, *args): + """ execute an svn command, append our own url """ + l = ['svn %s' % cmd] + args = ['"%s"' % self._escape(item) for item in args] + l.extend(args) + l.append('"%s"' % self._encodedurl()) + # fixing the locale because we can't otherwise parse + string = " ".join(l) + if DEBUG: + print("execing %s" % string) + out = self._svncmdexecauth(string) + return out + + def _svncmdexecauth(self, cmd): + """ execute an svn command 'as is' """ + cmd = svncommon.fixlocale() + cmd + if self.auth is not None: + cmd += ' ' + self.auth.makecmdoptions() + return self._cmdexec(cmd) + + def _cmdexec(self, cmd): + try: + out = process.cmdexec(cmd) + except py.process.cmdexec.Error: + e = sys.exc_info()[1] + if (e.err.find('File Exists') != -1 or + e.err.find('File already exists') != -1): + raise py.error.EEXIST(self) + raise + return out + + def _svnpopenauth(self, cmd): + """ execute an svn command, return a pipe for reading stdin """ + cmd = svncommon.fixlocale() + cmd + if self.auth is not None: + cmd += ' ' + self.auth.makecmdoptions() + return self._popen(cmd) + + def _popen(self, cmd): + return os.popen(cmd) + + def _encodedurl(self): + return self._escape(self.strpath) + + def _norev_delentry(self, path): + auth = self.auth and self.auth.makecmdoptions() or None + self._lsnorevcache.delentry((str(path), auth)) + + def open(self, mode='r'): + """ return an opened file with the given mode. """ + if mode not in ("r", "rU",): + raise ValueError("mode %r not supported" % (mode,)) + assert self.check(file=1) # svn cat returns an empty file otherwise + if self.rev is None: + return self._svnpopenauth('svn cat "%s"' % ( + self._escape(self.strpath), )) + else: + return self._svnpopenauth('svn cat -r %s "%s"' % ( + self.rev, self._escape(self.strpath))) + + def dirpath(self, *args, **kwargs): + """ return the directory path of the current path joined + with any given path arguments. + """ + l = self.strpath.split(self.sep) + if len(l) < 4: + raise py.error.EINVAL(self, "base is not valid") + elif len(l) == 4: + return self.join(*args, **kwargs) + else: + return self.new(basename='').join(*args, **kwargs) + + # modifying methods (cache must be invalidated) + def mkdir(self, *args, **kwargs): + """ create & return the directory joined with args. + pass a 'msg' keyword argument to set the commit message. + """ + commit_msg = kwargs.get('msg', "mkdir by py lib invocation") + createpath = self.join(*args) + createpath._svnwrite('mkdir', '-m', commit_msg) + self._norev_delentry(createpath.dirpath()) + return createpath + + def copy(self, target, msg='copied by py lib invocation'): + """ copy path to target with checkin message msg.""" + if getattr(target, 'rev', None) is not None: + raise py.error.EINVAL(target, "revisions are immutable") + self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg, + self._escape(self), self._escape(target))) + self._norev_delentry(target.dirpath()) + + def rename(self, target, msg="renamed by py lib invocation"): + """ rename this path to target with checkin message msg. """ + if getattr(self, 'rev', None) is not None: + raise py.error.EINVAL(self, "revisions are immutable") + self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %( + msg, self._escape(self), self._escape(target))) + self._norev_delentry(self.dirpath()) + self._norev_delentry(self) + + def remove(self, rec=1, msg='removed by py lib invocation'): + """ remove a file or directory (or a directory tree if rec=1) with +checkin message msg.""" + if self.rev is not None: + raise py.error.EINVAL(self, "revisions are immutable") + self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self))) + self._norev_delentry(self.dirpath()) + + def export(self, topath): + """ export to a local path + + topath should not exist prior to calling this, returns a + py.path.local instance + """ + topath = py.path.local(topath) + args = ['"%s"' % (self._escape(self),), + '"%s"' % (self._escape(topath),)] + if self.rev is not None: + args = ['-r', str(self.rev)] + args + self._svncmdexecauth('svn export %s' % (' '.join(args),)) + return topath + + def ensure(self, *args, **kwargs): + """ ensure that an args-joined path exists (by default as + a file). If you specify a keyword argument 'dir=True' + then the path is forced to be a directory path. + """ + if getattr(self, 'rev', None) is not None: + raise py.error.EINVAL(self, "revisions are immutable") + target = self.join(*args) + dir = kwargs.get('dir', 0) + for x in target.parts(reverse=True): + if x.check(): + break + else: + raise py.error.ENOENT(target, "has not any valid base!") + if x == target: + if not x.check(dir=dir): + raise dir and py.error.ENOTDIR(x) or py.error.EISDIR(x) + return x + tocreate = target.relto(x) + basename = tocreate.split(self.sep, 1)[0] + tempdir = py.path.local.mkdtemp() + try: + tempdir.ensure(tocreate, dir=dir) + cmd = 'svn import -m "%s" "%s" "%s"' % ( + "ensure %s" % self._escape(tocreate), + self._escape(tempdir.join(basename)), + x.join(basename)._encodedurl()) + self._svncmdexecauth(cmd) + self._norev_delentry(x) + finally: + tempdir.remove() + return target + + # end of modifying methods + def _propget(self, name): + res = self._svnwithrev('propget', name) + return res[:-1] # strip trailing newline + + def _proplist(self): + res = self._svnwithrev('proplist') + lines = res.split('\n') + lines = [x.strip() for x in lines[1:]] + return svncommon.PropListDict(self, lines) + + def info(self): + """ return an Info structure with svn-provided information. """ + parent = self.dirpath() + nameinfo_seq = parent._listdir_nameinfo() + bn = self.basename + for name, info in nameinfo_seq: + if name == bn: + return info + raise py.error.ENOENT(self) + + + def _listdir_nameinfo(self): + """ return sequence of name-info directory entries of self """ + def builder(): + try: + res = self._svnwithrev('ls', '-v') + except process.cmdexec.Error: + e = sys.exc_info()[1] + if e.err.find('non-existent in that revision') != -1: + raise py.error.ENOENT(self, e.err) + elif e.err.find('File not found') != -1: + raise py.error.ENOENT(self, e.err) + elif e.err.find('not part of a repository')!=-1: + raise py.error.ENOENT(self, e.err) + elif e.err.find('Unable to open')!=-1: + raise py.error.ENOENT(self, e.err) + elif e.err.lower().find('method not allowed')!=-1: + raise py.error.EACCES(self, e.err) + raise py.error.Error(e.err) + lines = res.split('\n') + nameinfo_seq = [] + for lsline in lines: + if lsline: + info = InfoSvnCommand(lsline) + if info._name != '.': # svn 1.5 produces '.' dirs, + nameinfo_seq.append((info._name, info)) + nameinfo_seq.sort() + return nameinfo_seq + auth = self.auth and self.auth.makecmdoptions() or None + if self.rev is not None: + return self._lsrevcache.getorbuild((self.strpath, self.rev, auth), + builder) + else: + return self._lsnorevcache.getorbuild((self.strpath, auth), + builder) + + def listdir(self, fil=None, sort=None): + """ list directory contents, possibly filter by the given fil func + and possibly sorted. + """ + if isinstance(fil, str): + fil = common.FNMatcher(fil) + nameinfo_seq = self._listdir_nameinfo() + if len(nameinfo_seq) == 1: + name, info = nameinfo_seq[0] + if name == self.basename and info.kind == 'file': + #if not self.check(dir=1): + raise py.error.ENOTDIR(self) + paths = [self.join(name) for (name, info) in nameinfo_seq] + if fil: + paths = [x for x in paths if fil(x)] + self._sortlist(paths, sort) + return paths + + + def log(self, rev_start=None, rev_end=1, verbose=False): + """ return a list of LogEntry instances for this path. +rev_start is the starting revision (defaulting to the first one). +rev_end is the last revision (defaulting to HEAD). +if verbose is True, then the LogEntry instances also know which files changed. +""" + assert self.check() #make it simpler for the pipe + rev_start = rev_start is None and "HEAD" or rev_start + rev_end = rev_end is None and "HEAD" or rev_end + + if rev_start == "HEAD" and rev_end == 1: + rev_opt = "" + else: + rev_opt = "-r %s:%s" % (rev_start, rev_end) + verbose_opt = verbose and "-v" or "" + xmlpipe = self._svnpopenauth('svn log --xml %s %s "%s"' % + (rev_opt, verbose_opt, self.strpath)) + from xml.dom import minidom + tree = minidom.parse(xmlpipe) + result = [] + for logentry in filter(None, tree.firstChild.childNodes): + if logentry.nodeType == logentry.ELEMENT_NODE: + result.append(svncommon.LogEntry(logentry)) + return result + +#01234567890123456789012345678901234567890123467 +# 2256 hpk 165 Nov 24 17:55 __init__.py +# XXX spotted by Guido, SVN 1.3.0 has different aligning, breaks the code!!! +# 1312 johnny 1627 May 05 14:32 test_decorators.py +# +class InfoSvnCommand: + # the '0?' part in the middle is an indication of whether the resource is + # locked, see 'svn help ls' + lspattern = re.compile( + r'^ *(?P\d+) +(?P.+?) +(0? *(?P\d+))? ' + '*(?P\w+ +\d{2} +[\d:]+) +(?P.*)$') + def __init__(self, line): + # this is a typical line from 'svn ls http://...' + #_ 1127 jum 0 Jul 13 15:28 branch/ + match = self.lspattern.match(line) + data = match.groupdict() + self._name = data['file'] + if self._name[-1] == '/': + self._name = self._name[:-1] + self.kind = 'dir' + else: + self.kind = 'file' + #self.has_props = l.pop(0) == 'P' + self.created_rev = int(data['rev']) + self.last_author = data['author'] + self.size = data['size'] and int(data['size']) or 0 + self.mtime = parse_time_with_missing_year(data['date']) + self.time = self.mtime * 1000000 + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + +#____________________________________________________ +# +# helper functions +#____________________________________________________ +def parse_time_with_missing_year(timestr): + """ analyze the time part from a single line of "svn ls -v" + the svn output doesn't show the year makes the 'timestr' + ambigous. + """ + import calendar + t_now = time.gmtime() + + tparts = timestr.split() + month = time.strptime(tparts.pop(0), '%b')[1] + day = time.strptime(tparts.pop(0), '%d')[2] + last = tparts.pop(0) # year or hour:minute + try: + if ":" in last: + raise ValueError() + year = time.strptime(last, '%Y')[0] + hour = minute = 0 + except ValueError: + hour, minute = time.strptime(last, '%H:%M')[3:5] + year = t_now[0] + + t_result = (year, month, day, hour, minute, 0,0,0,0) + if t_result > t_now: + year -= 1 + t_result = (year, month, day, hour, minute, 0,0,0,0) + return calendar.timegm(t_result) + +class PathEntry: + def __init__(self, ppart): + self.strpath = ppart.firstChild.nodeValue.encode('UTF-8') + self.action = ppart.getAttribute('action').encode('UTF-8') + if self.action == 'A': + self.copyfrom_path = ppart.getAttribute('copyfrom-path').encode('UTF-8') + if self.copyfrom_path: + self.copyfrom_rev = int(ppart.getAttribute('copyfrom-rev')) +