thirdparty/google_appengine/google/appengine/ext/remote_api/remote_api_stub.py
changeset 2864 2e0b0af889be
parent 2413 d0b7dac5325c
child 3031 7678f72140e6
equal deleted inserted replaced
2862:27971a13089f 2864:2e0b0af889be
    69 import sys
    69 import sys
    70 import thread
    70 import thread
    71 import threading
    71 import threading
    72 import yaml
    72 import yaml
    73 
    73 
       
    74 from google.appengine.api import datastore
       
    75 from google.appengine.api import apiproxy_rpc
    74 from google.appengine.api import apiproxy_stub_map
    76 from google.appengine.api import apiproxy_stub_map
    75 from google.appengine.datastore import datastore_pb
    77 from google.appengine.datastore import datastore_pb
    76 from google.appengine.ext.remote_api import remote_api_pb
    78 from google.appengine.ext.remote_api import remote_api_pb
    77 from google.appengine.runtime import apiproxy_errors
    79 from google.appengine.runtime import apiproxy_errors
    78 from google.appengine.tools import appengine_rpc
    80 from google.appengine.tools import appengine_rpc
    82   """Base class for exceptions in this module."""
    84   """Base class for exceptions in this module."""
    83 
    85 
    84 
    86 
    85 class ConfigurationError(Error):
    87 class ConfigurationError(Error):
    86   """Exception for configuration errors."""
    88   """Exception for configuration errors."""
       
    89 
       
    90 
       
    91 class UnknownJavaServerError(Error):
       
    92   """Exception for exceptions returned from a Java remote_api handler."""
    87 
    93 
    88 
    94 
    89 def GetUserAgent():
    95 def GetUserAgent():
    90   """Determines the value of the 'User-agent' header to use for HTTP requests.
    96   """Determines the value of the 'User-agent' header to use for HTTP requests.
    91 
    97 
   134       path: The path to the handler this stub should send requests to.
   140       path: The path to the handler this stub should send requests to.
   135     """
   141     """
   136     self._server = server
   142     self._server = server
   137     self._path = path
   143     self._path = path
   138 
   144 
       
   145   def _PreHookHandler(self, service, call, request, response):
       
   146     pass
       
   147 
       
   148   def _PostHookHandler(self, service, call, request, response):
       
   149     pass
       
   150 
   139   def MakeSyncCall(self, service, call, request, response):
   151   def MakeSyncCall(self, service, call, request, response):
       
   152     self._PreHookHandler(service, call, request, response)
   140     request_pb = remote_api_pb.Request()
   153     request_pb = remote_api_pb.Request()
   141     request_pb.set_service_name(service)
   154     request_pb.set_service_name(service)
   142     request_pb.set_method(call)
   155     request_pb.set_method(call)
   143     request_pb.mutable_request().set_contents(request.Encode())
   156     request_pb.mutable_request().set_contents(request.Encode())
   144 
   157 
   145     response_pb = remote_api_pb.Response()
   158     response_pb = remote_api_pb.Response()
   146     response_pb.ParseFromString(self._server.Send(self._path,
   159     encoded_request = request_pb.Encode()
   147                                                   request_pb.Encode()))
   160     encoded_response = self._server.Send(self._path, encoded_request)
   148 
   161     response_pb.ParseFromString(encoded_response)
   149     if response_pb.has_exception():
   162 
   150       raise pickle.loads(response_pb.exception().contents())
   163     try:
   151     else:
   164       if response_pb.has_application_error():
   152       response.ParseFromString(response_pb.response().contents())
   165         error_pb = response_pb.application_error()
       
   166         raise datastore._ToDatastoreError(
       
   167             apiproxy_errors.ApplicationError(error_pb.code(), error_pb.detail()))
       
   168       elif response_pb.has_exception():
       
   169         raise pickle.loads(response_pb.exception().contents())
       
   170       elif response_pb.has_java_exception():
       
   171         raise UnknownJavaServerError("An unknown error has occured in the "
       
   172                                      "Java remote_api handler for this call.")
       
   173       else:
       
   174         response.ParseFromString(response_pb.response().contents())
       
   175     finally:
       
   176       self._PostHookHandler(service, call, request, response)
       
   177 
       
   178   def CreateRPC(self):
       
   179     return apiproxy_rpc.RPC(stub=self)
   153 
   180 
   154 
   181 
   155 class RemoteDatastoreStub(RemoteStub):
   182 class RemoteDatastoreStub(RemoteStub):
   156   """A specialised stub for accessing the App Engine datastore remotely.
   183   """A specialised stub for accessing the App Engine datastore remotely.
   157 
   184 
   190     try:
   217     try:
   191       cursor_id = self.__next_local_cursor
   218       cursor_id = self.__next_local_cursor
   192       self.__next_local_cursor += 1
   219       self.__next_local_cursor += 1
   193     finally:
   220     finally:
   194       self.__local_cursor_lock.release()
   221       self.__local_cursor_lock.release()
       
   222     query.clear_count()
   195     self.__queries[cursor_id] = query
   223     self.__queries[cursor_id] = query
   196 
   224 
   197     query_result.mutable_cursor().set_cursor(cursor_id)
   225     query_result.mutable_cursor().set_cursor(cursor_id)
   198     query_result.set_more_results(True)
   226     query_result.set_more_results(True)
       
   227     query_result.set_keys_only(query.keys_only())
   199 
   228 
   200   def _Dynamic_Next(self, next_request, query_result):
   229   def _Dynamic_Next(self, next_request, query_result):
   201     cursor = next_request.cursor().cursor()
   230     cursor = next_request.cursor().cursor()
   202     if cursor not in self.__queries:
   231     if cursor not in self.__queries:
   203       raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
   232       raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
   212     request.CopyFrom(query)
   241     request.CopyFrom(query)
   213     if request.has_limit():
   242     if request.has_limit():
   214       request.set_limit(min(request.limit(), next_request.count()))
   243       request.set_limit(min(request.limit(), next_request.count()))
   215     else:
   244     else:
   216       request.set_limit(next_request.count())
   245       request.set_limit(next_request.count())
       
   246     request.set_count(request.limit())
   217 
   247 
   218     super(RemoteDatastoreStub, self).MakeSyncCall(
   248     super(RemoteDatastoreStub, self).MakeSyncCall(
   219         'remote_datastore', 'RunQuery', request, query_result)
   249         'remote_datastore', 'RunQuery', request, query_result)
   220 
   250 
   221     query.set_offset(query.offset() + query_result.result_size())
   251     query.set_offset(query.offset() + query_result.result_size())
   227   def _Dynamic_Get(self, get_request, get_response):
   257   def _Dynamic_Get(self, get_request, get_response):
   228     txid = None
   258     txid = None
   229     if get_request.has_transaction():
   259     if get_request.has_transaction():
   230       txid = get_request.transaction().handle()
   260       txid = get_request.transaction().handle()
   231       txdata = self.__transactions[txid]
   261       txdata = self.__transactions[txid]
   232       assert (txdata.thread_id == thread.get_ident(),
   262       assert (txdata.thread_id ==
   233               "Transactions are single-threaded.")
   263           thread.get_ident()), "Transactions are single-threaded."
   234 
   264 
   235       keys = [(k, k.Encode()) for k in get_request.key_list()]
   265       keys = [(k, k.Encode()) for k in get_request.key_list()]
   236 
   266 
   237       new_request = datastore_pb.GetRequest()
   267       new_request = datastore_pb.GetRequest()
   238       for key, enckey in keys:
   268       for key, enckey in keys:
   294           ent.mutable_entity_group().add_element().CopyFrom(
   324           ent.mutable_entity_group().add_element().CopyFrom(
   295               key.path().element(0))
   325               key.path().element(0))
   296 
   326 
   297       txid = put_request.transaction().handle()
   327       txid = put_request.transaction().handle()
   298       txdata = self.__transactions[txid]
   328       txdata = self.__transactions[txid]
   299       assert (txdata.thread_id == thread.get_ident(),
   329       assert (txdata.thread_id ==
   300               "Transactions are single-threaded.")
   330           thread.get_ident()), "Transactions are single-threaded."
   301       for entity in entities:
   331       for entity in entities:
   302         txdata.entities[entity.key().Encode()] = (entity.key(), entity)
   332         txdata.entities[entity.key().Encode()] = (entity.key(), entity)
   303         put_response.add_key().CopyFrom(entity.key())
   333         put_response.add_key().CopyFrom(entity.key())
   304     else:
   334     else:
   305       super(RemoteDatastoreStub, self).MakeSyncCall(
   335       super(RemoteDatastoreStub, self).MakeSyncCall(
   307 
   337 
   308   def _Dynamic_Delete(self, delete_request, response):
   338   def _Dynamic_Delete(self, delete_request, response):
   309     if delete_request.has_transaction():
   339     if delete_request.has_transaction():
   310       txid = delete_request.transaction().handle()
   340       txid = delete_request.transaction().handle()
   311       txdata = self.__transactions[txid]
   341       txdata = self.__transactions[txid]
   312       assert (txdata.thread_id == thread.get_ident(),
   342       assert (txdata.thread_id ==
   313               "Transactions are single-threaded.")
   343           thread.get_ident()), "Transactions are single-threaded."
   314       for key in delete_request.key_list():
   344       for key in delete_request.key_list():
   315         txdata.entities[key.Encode()] = (key, None)
   345         txdata.entities[key.Encode()] = (key, None)
   316     else:
   346     else:
   317       super(RemoteDatastoreStub, self).MakeSyncCall(
   347       super(RemoteDatastoreStub, self).MakeSyncCall(
   318           'datastore_v3', 'Delete', delete_request, response)
   348           'datastore_v3', 'Delete', delete_request, response)
   333       raise apiproxy_errors.ApplicationError(
   363       raise apiproxy_errors.ApplicationError(
   334           datastore_pb.Error.BAD_REQUEST,
   364           datastore_pb.Error.BAD_REQUEST,
   335           'Transaction %d not found.' % (txid,))
   365           'Transaction %d not found.' % (txid,))
   336 
   366 
   337     txdata = self.__transactions[txid]
   367     txdata = self.__transactions[txid]
   338     assert (txdata.thread_id == thread.get_ident(),
   368     assert (txdata.thread_id ==
   339             "Transactions are single-threaded.")
   369         thread.get_ident()), "Transactions are single-threaded."
   340     del self.__transactions[txid]
   370     del self.__transactions[txid]
   341 
   371 
   342     tx = remote_api_pb.TransactionRequest()
   372     tx = remote_api_pb.TransactionRequest()
   343     for key, hash in txdata.preconditions.values():
   373     for key, hash in txdata.preconditions.values():
   344       precond = tx.add_precondition()
   374       precond = tx.add_precondition()
   365       if txid not in self.__transactions:
   395       if txid not in self.__transactions:
   366         raise apiproxy_errors.ApplicationError(
   396         raise apiproxy_errors.ApplicationError(
   367             datastore_pb.Error.BAD_REQUEST,
   397             datastore_pb.Error.BAD_REQUEST,
   368             'Transaction %d not found.' % (txid,))
   398             'Transaction %d not found.' % (txid,))
   369 
   399 
   370       assert (txdata[txid].thread_id == thread.get_ident(),
   400       assert (txdata[txid].thread_id ==
   371               "Transactions are single-threaded.")
   401           thread.get_ident()), "Transactions are single-threaded."
   372       del self.__transactions[txid]
   402       del self.__transactions[txid]
   373     finally:
   403     finally:
   374       self.__local_tx_lock.release()
   404       self.__local_tx_lock.release()
   375 
   405 
   376   def _Dynamic_CreateIndex(self, index, id_response):
   406   def _Dynamic_CreateIndex(self, index, id_response):
   384   def _Dynamic_DeleteIndex(self, index, void):
   414   def _Dynamic_DeleteIndex(self, index, void):
   385     raise apiproxy_errors.CapabilityDisabledError(
   415     raise apiproxy_errors.CapabilityDisabledError(
   386         'The remote datastore does not support index manipulation.')
   416         'The remote datastore does not support index manipulation.')
   387 
   417 
   388 
   418 
   389 def ConfigureRemoteDatastore(app_id,
   419 def ConfigureRemoteApi(app_id,
   390                              path,
   420                        path,
   391                              auth_func,
   421                        auth_func,
   392                              servername=None,
   422                        servername=None,
   393                              rpc_server_factory=appengine_rpc.HttpRpcServer,
   423                        rpc_server_factory=appengine_rpc.HttpRpcServer,
   394                              rtok=None,
   424                        rtok=None,
   395                              secure=False):
   425                        secure=False):
   396   """Does necessary setup to allow easy remote access to an AppEngine datastore.
   426   """Does necessary setup to allow easy remote access to App Engine APIs.
   397 
   427 
   398   Either servername must be provided or app_id must not be None.  If app_id
   428   Either servername must be provided or app_id must not be None.  If app_id
   399   is None and a servername is provided, this function will send a request
   429   is None and a servername is provided, this function will send a request
   400   to the server to retrieve the app_id.
   430   to the server to retrieve the app_id.
   401 
   431 
   436           'Invalid response recieved from server: %s' % response)
   466           'Invalid response recieved from server: %s' % response)
   437     app_info = yaml.load(response)
   467     app_info = yaml.load(response)
   438     if not app_info or 'rtok' not in app_info or 'app_id' not in app_info:
   468     if not app_info or 'rtok' not in app_info or 'app_id' not in app_info:
   439       raise ConfigurationError('Error parsing app_id lookup response')
   469       raise ConfigurationError('Error parsing app_id lookup response')
   440     if app_info['rtok'] != rtok:
   470     if app_info['rtok'] != rtok:
   441       raise ConfigurationError('Token validation failed during app_id lookup.')
   471       raise ConfigurationError('Token validation failed during app_id lookup. '
       
   472                                '(sent %s, got %s)' % (repr(rtok),
       
   473                                                       repr(app_info['rtok'])))
   442     app_id = app_info['app_id']
   474     app_id = app_info['app_id']
   443 
   475 
   444   os.environ['APPLICATION_ID'] = app_id
   476   os.environ['APPLICATION_ID'] = app_id
   445   apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
   477   apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
   446   stub = RemoteDatastoreStub(server, path)
   478   datastore_stub = RemoteDatastoreStub(server, path)
   447   apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)
   479   apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', datastore_stub)
       
   480   stub = RemoteStub(server, path)
       
   481   for service in ['capability_service', 'images', 'mail', 'memcache',
       
   482                   'urlfetch']:
       
   483     apiproxy_stub_map.apiproxy.RegisterStub(service, stub)
       
   484 
       
   485 
       
   486 def MaybeInvokeAuthentication():
       
   487   """Sends an empty request through to the configured end-point.
       
   488 
       
   489   If authentication is necessary, this will cause the rpc_server to invoke
       
   490   interactive authentication.
       
   491   """
       
   492   datastore_stub = apiproxy_stub_map.apiproxy.GetStub('datastore_v3')
       
   493   if isinstance(datastore_stub, RemoteStub):
       
   494     datastore_stub._server.Send(datastore_stub._path, payload=None)
       
   495   else:
       
   496     raise ConfigurationError('remote_api is not configured.')
       
   497 
       
   498 
       
   499 ConfigureRemoteDatastore = ConfigureRemoteApi