thirdparty/google_appengine/google/appengine/ext/remote_api/remote_api_stub.py
changeset 1278 a7766286a7be
child 2273 e4cb9c53db3e
equal deleted inserted replaced
1277:5c931bd3dc1e 1278:a7766286a7be
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2007 Google Inc.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #     http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 #
       
    17 
       
    18 """An apiproxy stub that calls a remote handler via HTTP.
       
    19 
       
    20 This allows easy remote access to the App Engine datastore, and potentially any
       
    21 of the other App Engine APIs, using the same interface you use when accessing
       
    22 the service locally.
       
    23 
       
    24 An example Python script:
       
    25 ---
       
    26 from google.appengine.ext import db
       
    27 from google.appengine.ext.remote_api import remote_api_stub
       
    28 from myapp import models
       
    29 import getpass
       
    30 
       
    31 def auth_func():
       
    32   return (raw_input('Username:'), getpass.getpass('Password:'))
       
    33 
       
    34 remote_api_stub.ConfigureRemoteDatastore('my-app', '/remote_api', auth_func)
       
    35 
       
    36 # Now you can access the remote datastore just as if your code was running on
       
    37 # App Engine!
       
    38 
       
    39 houses = models.House.all().fetch(100)
       
     40 for a_house in houses:
       
    41   a_house.doors += 1
       
    42 db.put(houses)
       
    43 ---
       
    44 
       
    45 A few caveats:
       
    46 - Where possible, avoid iterating over queries directly. Fetching as many
       
    47   results as you will need is faster and more efficient.
       
    48 - If you need to iterate, consider instead fetching items in batches with a sort
       
    49   order and constructing a new query starting from where the previous one left
       
    50   off. The __key__ pseudo-property can be used as a sort key for this purpose,
       
    51   and does not even require a custom index if you are iterating over all
       
    52   entities of a given type.
       
    53 - Likewise, it's a good idea to put entities in batches. Instead of calling put
       
    54   for each individual entity, accumulate them and put them in batches using
       
    55   db.put(), if you can.
       
    56 - Requests and responses are still limited to 1MB each, so if you have large
       
     57   entities or try to fetch or put many of them at once, your requests may fail.
       
    58 """
       
    59 
       
    60 
       
    61 
       
    62 
       
    63 
       
    64 import os
       
    65 import pickle
       
    66 import sha
       
    67 import sys
       
    68 import thread
       
    69 import threading
       
    70 from google.appengine.api import apiproxy_stub_map
       
    71 from google.appengine.datastore import datastore_pb
       
    72 from google.appengine.ext.remote_api import remote_api_pb
       
    73 from google.appengine.runtime import apiproxy_errors
       
    74 from google.appengine.tools import appengine_rpc
       
    75 
       
    76 
       
    77 def GetUserAgent():
       
    78   """Determines the value of the 'User-agent' header to use for HTTP requests.
       
    79 
       
    80   Returns:
       
    81     String containing the 'user-agent' header value, which includes the SDK
       
    82     version, the platform information, and the version of Python;
       
    83     e.g., "remote_api/1.0.1 Darwin/9.2.0 Python/2.5.2".
       
    84   """
       
    85   product_tokens = []
       
    86 
       
    87   product_tokens.append("Google-remote_api/1.0")
       
    88 
       
    89   product_tokens.append(appengine_rpc.GetPlatformToken())
       
    90 
       
    91   python_version = ".".join(str(i) for i in sys.version_info)
       
    92   product_tokens.append("Python/%s" % python_version)
       
    93 
       
    94   return " ".join(product_tokens)
       
    95 
       
    96 
       
    97 def GetSourceName():
       
    98   return "Google-remote_api-1.0"
       
    99 
       
   100 
       
   101 class TransactionData(object):
       
   102   """Encapsulates data about an individual transaction."""
       
   103 
       
   104   def __init__(self, thread_id):
       
   105     self.thread_id = thread_id
       
   106     self.preconditions = {}
       
   107     self.entities = {}
       
   108 
       
   109 
       
   110 class RemoteStub(object):
       
   111   """A stub for calling services on a remote server over HTTP.
       
   112 
       
   113   You can use this to stub out any service that the remote server supports.
       
   114   """
       
   115 
       
   116   def __init__(self, server, path):
       
   117     """Constructs a new RemoteStub that communicates with the specified server.
       
   118 
       
   119     Args:
       
   120       server: An instance of a subclass of
       
   121         google.appengine.tools.appengine_rpc.AbstractRpcServer.
       
   122       path: The path to the handler this stub should send requests to.
       
   123     """
       
   124     self._server = server
       
   125     self._path = path
       
   126 
       
   127   def MakeSyncCall(self, service, call, request, response):
       
   128     request_pb = remote_api_pb.Request()
       
   129     request_pb.set_service_name(service)
       
   130     request_pb.set_method(call)
       
   131     request_pb.mutable_request().set_contents(request.Encode())
       
   132 
       
   133     response_pb = remote_api_pb.Response()
       
   134     response_pb.ParseFromString(self._server.Send(self._path,
       
   135                                                   request_pb.Encode()))
       
   136 
       
   137     if response_pb.has_exception():
       
   138       raise pickle.loads(response_pb.exception().contents())
       
   139     else:
       
   140       response.ParseFromString(response_pb.response().contents())
       
   141 
       
   142 
       
   143 class RemoteDatastoreStub(RemoteStub):
       
   144   """A specialised stub for accessing the App Engine datastore remotely.
       
   145 
       
   146   A specialised stub is required because there are some datastore operations
       
   147   that preserve state between calls. This stub makes queries possible.
       
   148   Transactions on the remote datastore are unfortunately still impossible.
       
   149   """
       
   150 
       
   151   def __init__(self, server, path):
       
   152     super(RemoteDatastoreStub, self).__init__(server, path)
       
   153     self.__queries = {}
       
   154     self.__transactions = {}
       
   155 
       
   156     self.__next_local_cursor = 1
       
   157     self.__local_cursor_lock = threading.Lock()
       
   158     self.__next_local_tx = 1
       
   159     self.__local_tx_lock = threading.Lock()
       
   160 
       
   161   def MakeSyncCall(self, service, call, request, response):
       
   162     assert service == 'datastore_v3'
       
   163 
       
   164     explanation = []
       
   165     assert request.IsInitialized(explanation), explanation
       
   166 
       
   167     handler = getattr(self, '_Dynamic_' + call, None)
       
   168     if handler:
       
   169       handler(request, response)
       
   170     else:
       
   171       super(RemoteDatastoreStub, self).MakeSyncCall(service, call, request,
       
   172                                                     response)
       
   173 
       
   174     assert response.IsInitialized(explanation), explanation
       
   175 
       
   176   def _Dynamic_RunQuery(self, query, query_result):
       
   177     self.__local_cursor_lock.acquire()
       
   178     try:
       
   179       cursor_id = self.__next_local_cursor
       
   180       self.__next_local_cursor += 1
       
   181     finally:
       
   182       self.__local_cursor_lock.release()
       
   183     self.__queries[cursor_id] = query
       
   184 
       
   185     query_result.mutable_cursor().set_cursor(cursor_id)
       
   186     query_result.set_more_results(True)
       
   187 
       
   188   def _Dynamic_Next(self, next_request, query_result):
       
   189     cursor = next_request.cursor().cursor()
       
   190     if cursor not in self.__queries:
       
   191       raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
       
   192                                              'Cursor %d not found' % cursor)
       
   193     query = self.__queries[cursor]
       
   194 
       
   195     if query is None:
       
   196       query_result.set_more_results(False)
       
   197       return
       
   198 
       
   199     request = datastore_pb.Query()
       
   200     request.CopyFrom(query)
       
   201     if request.has_limit():
       
   202       request.set_limit(min(request.limit(), next_request.count()))
       
   203     else:
       
   204       request.set_limit(next_request.count())
       
   205 
       
   206     super(RemoteDatastoreStub, self).MakeSyncCall(
       
   207         'remote_datastore', 'RunQuery', request, query_result)
       
   208 
       
   209     query.set_offset(query.offset() + query_result.result_size())
       
   210     if query.has_limit():
       
   211       query.set_limit(query.limit() - query_result.result_size())
       
   212     if not query_result.more_results():
       
   213       self.__queries[cursor] = None
       
   214 
       
   215   def _Dynamic_Get(self, get_request, get_response):
       
   216     txid = None
       
   217     if get_request.has_transaction():
       
   218       txid = get_request.transaction().handle()
       
   219       txdata = self.__transactions[txid]
       
   220       assert (txdata.thread_id == thread.get_ident(),
       
   221               "Transactions are single-threaded.")
       
   222 
       
   223       keys = [(k, k.Encode()) for k in get_request.key_list()]
       
   224 
       
   225       new_request = datastore_pb.GetRequest()
       
   226       for key, enckey in keys:
       
   227         if enckey not in txdata.entities:
       
   228           new_request.add_key().CopyFrom(key)
       
   229     else:
       
   230       new_request = get_request
       
   231 
       
   232     if new_request.key_size() > 0:
       
   233       super(RemoteDatastoreStub, self).MakeSyncCall(
       
   234           'datastore_v3', 'Get', new_request, get_response)
       
   235 
       
   236     if txid is not None:
       
   237       newkeys = new_request.key_list()
       
   238       entities = get_response.entity_list()
       
   239       for key, entity in zip(newkeys, entities):
       
   240         entity_hash = None
       
   241         if entity.has_entity():
       
   242           entity_hash = sha.new(entity.entity().Encode()).digest()
       
   243         txdata.preconditions[key.Encode()] = (key, entity_hash)
       
   244 
       
   245       new_response = datastore_pb.GetResponse()
       
   246       it = iter(get_response.entity_list())
       
   247       for key, enckey in keys:
       
   248         if enckey in txdata.entities:
       
   249           cached_entity = txdata.entities[enckey][1]
       
   250           if cached_entity:
       
   251             new_response.add_entity().mutable_entity().CopyFrom(cached_entity)
       
   252           else:
       
   253             new_response.add_entity()
       
   254         else:
       
   255           new_entity = it.next()
       
   256           if new_entity.has_entity():
       
   257             assert new_entity.entity().key() == key
       
   258             new_response.add_entity().CopyFrom(new_entity)
       
   259           else:
       
   260             new_response.add_entity()
       
   261       get_response.CopyFrom(new_response)
       
   262 
       
   263   def _Dynamic_Put(self, put_request, put_response):
       
   264     if put_request.has_transaction():
       
   265       entities = put_request.entity_list()
       
   266 
       
   267       requires_id = lambda x: x.id() == 0 and not x.has_name()
       
   268       new_ents = [e for e in entities
       
   269                   if requires_id(e.key().path().element_list()[-1])]
       
   270       id_request = remote_api_pb.PutRequest()
       
   271       if new_ents:
       
   272         for ent in new_ents:
       
   273           e = id_request.add_entity()
       
   274           e.mutable_key().CopyFrom(ent.key())
       
   275           e.mutable_entity_group()
       
   276         id_response = datastore_pb.PutResponse()
       
   277         super(RemoteDatastoreStub, self).MakeSyncCall(
       
   278             'remote_datastore', 'GetIDs', id_request, id_response)
       
   279         assert id_request.entity_size() == id_response.key_size()
       
   280         for key, ent in zip(id_response.key_list(), new_ents):
       
   281           ent.mutable_key().CopyFrom(key)
       
   282           ent.mutable_entity_group().add_element().CopyFrom(
       
   283               key.path().element(0))
       
   284 
       
   285       txid = put_request.transaction().handle()
       
   286       txdata = self.__transactions[txid]
       
   287       assert (txdata.thread_id == thread.get_ident(),
       
   288               "Transactions are single-threaded.")
       
   289       for entity in entities:
       
   290         txdata.entities[entity.key().Encode()] = (entity.key(), entity)
       
   291         put_response.add_key().CopyFrom(entity.key())
       
   292     else:
       
   293       super(RemoteDatastoreStub, self).MakeSyncCall(
       
   294           'datastore_v3', 'Put', put_request, put_response)
       
   295 
       
   296   def _Dynamic_Delete(self, delete_request, response):
       
   297     if delete_request.has_transaction():
       
   298       txid = delete_request.transaction().handle()
       
   299       txdata = self.__transactions[txid]
       
   300       assert (txdata.thread_id == thread.get_ident(),
       
   301               "Transactions are single-threaded.")
       
   302       for key in delete_request.key_list():
       
   303         txdata.entities[key.Encode()] = (key, None)
       
   304     else:
       
   305       super(RemoteDatastoreStub, self).MakeSyncCall(
       
   306           'datastore_v3', 'Delete', delete_request, response)
       
   307 
       
   308   def _Dynamic_BeginTransaction(self, request, transaction):
       
   309     self.__local_tx_lock.acquire()
       
   310     try:
       
   311       txid = self.__next_local_tx
       
   312       self.__transactions[txid] = TransactionData(thread.get_ident())
       
   313       self.__next_local_tx += 1
       
   314     finally:
       
   315       self.__local_tx_lock.release()
       
   316     transaction.set_handle(txid)
       
   317 
       
   318   def _Dynamic_Commit(self, transaction, transaction_response):
       
   319     txid = transaction.handle()
       
   320     if txid not in self.__transactions:
       
   321       raise apiproxy_errors.ApplicationError(
       
   322           datastore_pb.Error.BAD_REQUEST,
       
   323           'Transaction %d not found.' % (txid,))
       
   324 
       
   325     txdata = self.__transactions[txid]
       
   326     assert (txdata.thread_id == thread.get_ident(),
       
   327             "Transactions are single-threaded.")
       
   328     del self.__transactions[txid]
       
   329 
       
   330     tx = remote_api_pb.TransactionRequest()
       
   331     for key, hash in txdata.preconditions.values():
       
   332       precond = tx.add_precondition()
       
   333       precond.mutable_key().CopyFrom(key)
       
   334       if hash:
       
   335         precond.set_hash(hash)
       
   336 
       
   337     puts = tx.mutable_puts()
       
   338     deletes = tx.mutable_deletes()
       
   339     for key, entity in txdata.entities.values():
       
   340       if entity:
       
   341         puts.add_entity().CopyFrom(entity)
       
   342       else:
       
   343         deletes.add_key().CopyFrom(key)
       
   344 
       
   345     super(RemoteDatastoreStub, self).MakeSyncCall(
       
   346         'remote_datastore', 'Transaction',
       
   347         tx, datastore_pb.PutResponse())
       
   348 
       
   349   def _Dynamic_Rollback(self, transaction, transaction_response):
       
   350     txid = transaction.handle()
       
   351     self.__local_tx_lock.acquire()
       
   352     try:
       
   353       if txid not in self.__transactions:
       
   354         raise apiproxy_errors.ApplicationError(
       
   355             datastore_pb.Error.BAD_REQUEST,
       
   356             'Transaction %d not found.' % (txid,))
       
   357 
       
   358       assert (txdata[txid].thread_id == thread.get_ident(),
       
   359               "Transactions are single-threaded.")
       
   360       del self.__transactions[txid]
       
   361     finally:
       
   362       self.__local_tx_lock.release()
       
   363 
       
   364   def _Dynamic_CreateIndex(self, index, id_response):
       
   365     raise apiproxy_errors.CapabilityDisabledError(
       
   366         'The remote datastore does not support index manipulation.')
       
   367 
       
   368   def _Dynamic_UpdateIndex(self, index, void):
       
   369     raise apiproxy_errors.CapabilityDisabledError(
       
   370         'The remote datastore does not support index manipulation.')
       
   371 
       
   372   def _Dynamic_DeleteIndex(self, index, void):
       
   373     raise apiproxy_errors.CapabilityDisabledError(
       
   374         'The remote datastore does not support index manipulation.')
       
   375 
       
   376 
       
   377 def ConfigureRemoteDatastore(app_id,
       
   378                              path,
       
   379                              auth_func,
       
   380                              servername=None,
       
   381                              rpc_server_factory=appengine_rpc.HttpRpcServer):
       
   382   """Does necessary setup to allow easy remote access to an AppEngine datastore.
       
   383 
       
   384   Args:
       
   385     app_id: The app_id of your app, as declared in app.yaml.
       
   386     path: The path to the remote_api handler for your app
       
   387       (for example, '/remote_api').
       
   388     auth_func: A function that takes no arguments and returns a
       
   389       (username, password) tuple. This will be called if your application
       
   390       requires authentication to access the remote_api handler (it should!)
       
   391       and you do not already have a valid auth cookie.
       
   392     servername: The hostname your app is deployed on. Defaults to
       
   393       <app_id>.appspot.com.
       
   394     rpc_server_factory: A factory to construct the rpc server for the datastore.
       
   395   """
       
   396   if not servername:
       
   397     servername = '%s.appspot.com' % (app_id,)
       
   398   os.environ['APPLICATION_ID'] = app_id
       
   399   apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
       
   400   server = rpc_server_factory(servername, auth_func, GetUserAgent(),
       
   401                                        GetSourceName())
       
   402   stub = RemoteDatastoreStub(server, path)
       
   403   apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)