eggs/mercurial-1.7.3-py2.6-linux-x86_64.egg/mercurial/wireproto.py
changeset 69 c6bca38c1cbf
equal deleted inserted replaced
68:5ff1fc726848 69:c6bca38c1cbf
       
     1 # wireproto.py - generic wire protocol support functions
       
     2 #
       
     3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
       
     4 #
       
     5 # This software may be used and distributed according to the terms of the
       
     6 # GNU General Public License version 2 or any later version.
       
     7 
       
     8 import urllib, tempfile, os, sys
       
     9 from i18n import _
       
    10 from node import bin, hex
       
    11 import changegroup as changegroupmod
       
    12 import repo, error, encoding, util, store
       
    13 import pushkey as pushkeymod
       
    14 
       
    15 # list of nodes encoding / decoding
       
    16 
       
    17 def decodelist(l, sep=' '):
       
    18     return map(bin, l.split(sep))
       
    19 
       
    20 def encodelist(l, sep=' '):
       
    21     return sep.join(map(hex, l))
       
    22 
       
    23 # client side
       
    24 
       
    25 class wirerepository(repo.repository):
       
    26     def lookup(self, key):
       
    27         self.requirecap('lookup', _('look up remote revision'))
       
    28         d = self._call("lookup", key=key)
       
    29         success, data = d[:-1].split(" ", 1)
       
    30         if int(success):
       
    31             return bin(data)
       
    32         self._abort(error.RepoError(data))
       
    33 
       
    34     def heads(self):
       
    35         d = self._call("heads")
       
    36         try:
       
    37             return decodelist(d[:-1])
       
    38         except:
       
    39             self._abort(error.ResponseError(_("unexpected response:"), d))
       
    40 
       
    41     def branchmap(self):
       
    42         d = self._call("branchmap")
       
    43         try:
       
    44             branchmap = {}
       
    45             for branchpart in d.splitlines():
       
    46                 branchname, branchheads = branchpart.split(' ', 1)
       
    47                 branchname = urllib.unquote(branchname)
       
    48                 # Earlier servers (1.3.x) send branch names in (their) local
       
    49                 # charset. The best we can do is assume it's identical to our
       
    50                 # own local charset, in case it's not utf-8.
       
    51                 try:
       
    52                     branchname.decode('utf-8')
       
    53                 except UnicodeDecodeError:
       
    54                     branchname = encoding.fromlocal(branchname)
       
    55                 branchheads = decodelist(branchheads)
       
    56                 branchmap[branchname] = branchheads
       
    57             return branchmap
       
    58         except TypeError:
       
    59             self._abort(error.ResponseError(_("unexpected response:"), d))
       
    60 
       
    61     def branches(self, nodes):
       
    62         n = encodelist(nodes)
       
    63         d = self._call("branches", nodes=n)
       
    64         try:
       
    65             br = [tuple(decodelist(b)) for b in d.splitlines()]
       
    66             return br
       
    67         except:
       
    68             self._abort(error.ResponseError(_("unexpected response:"), d))
       
    69 
       
    70     def between(self, pairs):
       
    71         batch = 8 # avoid giant requests
       
    72         r = []
       
    73         for i in xrange(0, len(pairs), batch):
       
    74             n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
       
    75             d = self._call("between", pairs=n)
       
    76             try:
       
    77                 r.extend(l and decodelist(l) or [] for l in d.splitlines())
       
    78             except:
       
    79                 self._abort(error.ResponseError(_("unexpected response:"), d))
       
    80         return r
       
    81 
       
    82     def pushkey(self, namespace, key, old, new):
       
    83         if not self.capable('pushkey'):
       
    84             return False
       
    85         d = self._call("pushkey",
       
    86                       namespace=namespace, key=key, old=old, new=new)
       
    87         return bool(int(d))
       
    88 
       
    89     def listkeys(self, namespace):
       
    90         if not self.capable('pushkey'):
       
    91             return {}
       
    92         d = self._call("listkeys", namespace=namespace)
       
    93         r = {}
       
    94         for l in d.splitlines():
       
    95             k, v = l.split('\t')
       
    96             r[k.decode('string-escape')] = v.decode('string-escape')
       
    97         return r
       
    98 
       
    99     def stream_out(self):
       
   100         return self._callstream('stream_out')
       
   101 
       
   102     def changegroup(self, nodes, kind):
       
   103         n = encodelist(nodes)
       
   104         f = self._callstream("changegroup", roots=n)
       
   105         return changegroupmod.unbundle10(self._decompress(f), 'UN')
       
   106 
       
   107     def changegroupsubset(self, bases, heads, kind):
       
   108         self.requirecap('changegroupsubset', _('look up remote changes'))
       
   109         bases = encodelist(bases)
       
   110         heads = encodelist(heads)
       
   111         f = self._callstream("changegroupsubset",
       
   112                              bases=bases, heads=heads)
       
   113         return changegroupmod.unbundle10(self._decompress(f), 'UN')
       
   114 
       
   115     def unbundle(self, cg, heads, source):
       
   116         '''Send cg (a readable file-like object representing the
       
   117         changegroup to push, typically a chunkbuffer object) to the
       
   118         remote server as a bundle. Return an integer indicating the
       
   119         result of the push (see localrepository.addchangegroup()).'''
       
   120 
       
   121         ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
       
   122         if ret == "":
       
   123             raise error.ResponseError(
       
   124                 _('push failed:'), output)
       
   125         try:
       
   126             ret = int(ret)
       
   127         except ValueError:
       
   128             raise error.ResponseError(
       
   129                 _('push failed (unexpected response):'), ret)
       
   130 
       
   131         for l in output.splitlines(True):
       
   132             self.ui.status(_('remote: '), l)
       
   133         return ret
       
   134 
       
   135 # server side
       
   136 
       
   137 class streamres(object):
       
   138     def __init__(self, gen):
       
   139         self.gen = gen
       
   140 
       
   141 class pushres(object):
       
   142     def __init__(self, res):
       
   143         self.res = res
       
   144 
       
   145 class pusherr(object):
       
   146     def __init__(self, res):
       
   147         self.res = res
       
   148 
       
   149 def dispatch(repo, proto, command):
       
   150     func, spec = commands[command]
       
   151     args = proto.getargs(spec)
       
   152     return func(repo, proto, *args)
       
   153 
       
   154 def between(repo, proto, pairs):
       
   155     pairs = [decodelist(p, '-') for p in pairs.split(" ")]
       
   156     r = []
       
   157     for b in repo.between(pairs):
       
   158         r.append(encodelist(b) + "\n")
       
   159     return "".join(r)
       
   160 
       
   161 def branchmap(repo, proto):
       
   162     branchmap = repo.branchmap()
       
   163     heads = []
       
   164     for branch, nodes in branchmap.iteritems():
       
   165         branchname = urllib.quote(branch)
       
   166         branchnodes = encodelist(nodes)
       
   167         heads.append('%s %s' % (branchname, branchnodes))
       
   168     return '\n'.join(heads)
       
   169 
       
   170 def branches(repo, proto, nodes):
       
   171     nodes = decodelist(nodes)
       
   172     r = []
       
   173     for b in repo.branches(nodes):
       
   174         r.append(encodelist(b) + "\n")
       
   175     return "".join(r)
       
   176 
       
   177 def capabilities(repo, proto):
       
   178     caps = 'lookup changegroupsubset branchmap pushkey'.split()
       
   179     if _allowstream(repo.ui):
       
   180         requiredformats = repo.requirements & repo.supportedformats
       
   181         # if our local revlogs are just revlogv1, add 'stream' cap
       
   182         if not requiredformats - set(('revlogv1',)):
       
   183             caps.append('stream')
       
   184         # otherwise, add 'streamreqs' detailing our local revlog format
       
   185         else:
       
   186             caps.append('streamreqs=%s' % ','.join(requiredformats))
       
   187     caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
       
   188     return ' '.join(caps)
       
   189 
       
   190 def changegroup(repo, proto, roots):
       
   191     nodes = decodelist(roots)
       
   192     cg = repo.changegroup(nodes, 'serve')
       
   193     return streamres(proto.groupchunks(cg))
       
   194 
       
   195 def changegroupsubset(repo, proto, bases, heads):
       
   196     bases = decodelist(bases)
       
   197     heads = decodelist(heads)
       
   198     cg = repo.changegroupsubset(bases, heads, 'serve')
       
   199     return streamres(proto.groupchunks(cg))
       
   200 
       
   201 def heads(repo, proto):
       
   202     h = repo.heads()
       
   203     return encodelist(h) + "\n"
       
   204 
       
   205 def hello(repo, proto):
       
   206     '''the hello command returns a set of lines describing various
       
   207     interesting things about the server, in an RFC822-like format.
       
   208     Currently the only one defined is "capabilities", which
       
   209     consists of a line in the form:
       
   210 
       
   211     capabilities: space separated list of tokens
       
   212     '''
       
   213     return "capabilities: %s\n" % (capabilities(repo, proto))
       
   214 
       
   215 def listkeys(repo, proto, namespace):
       
   216     d = pushkeymod.list(repo, namespace).items()
       
   217     t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
       
   218                                v.encode('string-escape')) for k, v in d])
       
   219     return t
       
   220 
       
   221 def lookup(repo, proto, key):
       
   222     try:
       
   223         r = hex(repo.lookup(key))
       
   224         success = 1
       
   225     except Exception, inst:
       
   226         r = str(inst)
       
   227         success = 0
       
   228     return "%s %s\n" % (success, r)
       
   229 
       
   230 def pushkey(repo, proto, namespace, key, old, new):
       
   231     r = pushkeymod.push(repo, namespace, key, old, new)
       
   232     return '%s\n' % int(r)
       
   233 
       
   234 def _allowstream(ui):
       
   235     return ui.configbool('server', 'uncompressed', True, untrusted=True)
       
   236 
       
   237 def stream(repo, proto):
       
   238     '''If the server supports streaming clone, it advertises the "stream"
       
   239     capability with a value representing the version and flags of the repo
       
   240     it is serving. Client checks to see if it understands the format.
       
   241 
       
   242     The format is simple: the server writes out a line with the amount
       
   243     of files, then the total amount of bytes to be transfered (separated
       
   244     by a space). Then, for each file, the server first writes the filename
       
   245     and filesize (separated by the null character), then the file contents.
       
   246     '''
       
   247 
       
   248     if not _allowstream(repo.ui):
       
   249         return '1\n'
       
   250 
       
   251     entries = []
       
   252     total_bytes = 0
       
   253     try:
       
   254         # get consistent snapshot of repo, lock during scan
       
   255         lock = repo.lock()
       
   256         try:
       
   257             repo.ui.debug('scanning\n')
       
   258             for name, ename, size in repo.store.walk():
       
   259                 entries.append((name, size))
       
   260                 total_bytes += size
       
   261         finally:
       
   262             lock.release()
       
   263     except error.LockError:
       
   264         return '2\n' # error: 2
       
   265 
       
   266     def streamer(repo, entries, total):
       
   267         '''stream out all metadata files in repository.'''
       
   268         yield '0\n' # success
       
   269         repo.ui.debug('%d files, %d bytes to transfer\n' %
       
   270                       (len(entries), total_bytes))
       
   271         yield '%d %d\n' % (len(entries), total_bytes)
       
   272         for name, size in entries:
       
   273             repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
       
   274             # partially encode name over the wire for backwards compat
       
   275             yield '%s\0%d\n' % (store.encodedir(name), size)
       
   276             for chunk in util.filechunkiter(repo.sopener(name), limit=size):
       
   277                 yield chunk
       
   278 
       
   279     return streamres(streamer(repo, entries, total_bytes))
       
   280 
       
   281 def unbundle(repo, proto, heads):
       
   282     their_heads = decodelist(heads)
       
   283 
       
   284     def check_heads():
       
   285         heads = repo.heads()
       
   286         return their_heads == ['force'] or their_heads == heads
       
   287 
       
   288     proto.redirect()
       
   289 
       
   290     # fail early if possible
       
   291     if not check_heads():
       
   292         return pusherr('unsynced changes')
       
   293 
       
   294     # write bundle data to temporary file because it can be big
       
   295     fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
       
   296     fp = os.fdopen(fd, 'wb+')
       
   297     r = 0
       
   298     try:
       
   299         proto.getfile(fp)
       
   300         lock = repo.lock()
       
   301         try:
       
   302             if not check_heads():
       
   303                 # someone else committed/pushed/unbundled while we
       
   304                 # were transferring data
       
   305                 return pusherr('unsynced changes')
       
   306 
       
   307             # push can proceed
       
   308             fp.seek(0)
       
   309             gen = changegroupmod.readbundle(fp, None)
       
   310 
       
   311             try:
       
   312                 r = repo.addchangegroup(gen, 'serve', proto._client(),
       
   313                                         lock=lock)
       
   314             except util.Abort, inst:
       
   315                 sys.stderr.write("abort: %s\n" % inst)
       
   316         finally:
       
   317             lock.release()
       
   318         return pushres(r)
       
   319 
       
   320     finally:
       
   321         fp.close()
       
   322         os.unlink(tempname)
       
   323 
       
# Table of server-side wire commands, keyed by command name. Each value
# is a (handler, argspec) pair: dispatch() passes argspec to
# proto.getargs() and calls handler(repo, proto, *args) with the result.
# argspec is a space-separated list of argument names, empty for
# commands that take no arguments.
commands = {
    'between': (between, 'pairs'),
    'branchmap': (branchmap, ''),
    'branches': (branches, 'nodes'),
    'capabilities': (capabilities, ''),
    'changegroup': (changegroup, 'roots'),
    'changegroupsubset': (changegroupsubset, 'bases heads'),
    'heads': (heads, ''),
    'hello': (hello, ''),
    'listkeys': (listkeys, 'namespace'),
    'lookup': (lookup, 'key'),
    'pushkey': (pushkey, 'namespace key old new'),
    # the wire name is 'stream_out' although the handler is stream()
    'stream_out': (stream, ''),
    'unbundle': (unbundle, 'heads'),
}