thirdparty/google_appengine/google/appengine/api/datastore_file_stub.py
changeset 109 620f9b141567
child 149 f2e327a7c5de
equal deleted inserted replaced
108:261778de26ff 109:620f9b141567
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2007 Google Inc.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #     http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 #
       
    17 
       
    18 """
       
    19 In-memory persistent stub for the Python datastore API. Gets, queries,
       
    20 and searches are implemented as in-memory scans over all entities.
       
    21 
       
    22 Stores entities across sessions as pickled proto bufs in a single file. On
       
    23 startup, all entities are read from the file and loaded into memory. On
       
    24 every Put(), the file is wiped and all entities are written from scratch.
       
    25 Clients can also manually Read() and Write() the file themselves.
       
    26 
       
    27 Transactions are serialized through __tx_lock. Each transaction acquires it
       
    28 when it begins and releases it when it commits or rolls back. This is
       
    29 important, since there are other member variables like __tx_snapshot that are
       
    30 per-transaction, so they should only be used by one tx at a time.
       
    31 """
       
    32 
       
    33 
       
    34 
       
    35 
       
    36 
       
    37 
       
    38 import datetime
       
    39 import logging
       
    40 import os
       
    41 import pickle
       
    42 import struct
       
    43 import sys
       
    44 import tempfile
       
    45 import threading
       
    46 import types
       
    47 import warnings
       
    48 
       
    49 from google.appengine.api import api_base_pb
       
    50 from google.appengine.api import datastore
       
    51 from google.appengine.api import datastore_admin
       
    52 from google.appengine.api import datastore_errors
       
    53 from google.appengine.api import datastore_types
       
    54 from google.appengine.api import users
       
    55 from google.appengine.datastore import datastore_pb
       
    56 from google.appengine.datastore import datastore_index
       
    57 from google.appengine.runtime import apiproxy_errors
       
    58 from google.net.proto import ProtocolBuffer
       
    59 from google.appengine.datastore import entity_pb
       
    60 
       
    61 warnings.filterwarnings('ignore', 'tempnam is a potential security risk')
       
    62 
       
    63 
       
    64 entity_pb.Reference.__hash__ = lambda self: hash(self.Encode())
       
    65 datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
       
    66 
       
    67 
       
    68 class DatastoreFileStub(object):
       
    69   """ Persistent stub for the Python datastore API.
       
    70 
       
    71   Stores all entities in memory, and persists them to a file as pickled
       
    72   protocol buffers. A DatastoreFileStub instance handles a single app's data
       
    73   and is backed by files on disk.
       
    74   """
       
    75 
       
    76   def __init__(self, app_id, datastore_file, history_file,
       
    77                require_indexes=False):
       
    78     """Constructor.
       
    79 
       
    80     Initializes and loads the datastore from the backing files, if they exist.
       
    81 
       
    82     Args:
       
    83       app_id: string
       
    84       datastore_file: string, stores all entities across sessions.  Use None
       
    85           not to use a file.
       
    86       history_file: string, stores query history.  Use None as with
       
    87           datastore_file.
       
    88       require_indexes: bool, default False.  If True, composite indexes must
       
    89           exist in index.yaml for queries that need them.
       
    90     """
       
    91 
       
    92     assert isinstance(app_id, types.StringTypes) and app_id != ''
       
    93     self.__app_id = app_id
       
    94     self.__datastore_file = datastore_file
       
    95     self.__history_file = history_file
       
    96 
       
    97     self.__entities = {}
       
    98 
       
    99     self.__tx_snapshot = {}
       
   100 
       
   101     self.__queries = {}
       
   102 
       
   103     self.__transactions = {}
       
   104 
       
   105     self.__indexes = {}
       
   106     self.__require_indexes = require_indexes
       
   107 
       
   108     self.__query_history = {}
       
   109 
       
   110     self.__next_id = 1
       
   111     self.__next_cursor = 1
       
   112     self.__next_tx_handle = 1
       
   113     self.__next_index_id = 1
       
   114     self.__id_lock = threading.Lock()
       
   115     self.__cursor_lock = threading.Lock()
       
   116     self.__tx_handle_lock = threading.Lock()
       
   117     self.__index_id_lock = threading.Lock()
       
   118     self.__tx_lock = threading.Lock()
       
   119     self.__entities_lock = threading.Lock()
       
   120     self.__file_lock = threading.Lock()
       
   121     self.__indexes_lock = threading.Lock()
       
   122 
       
   123     self.Read()
       
   124 
       
   125   def Clear(self):
       
   126     """ Clears the datastore by deleting all currently stored entities and
       
   127     queries. """
       
   128     self.__entities = {}
       
   129     self.__queries = {}
       
   130     self.__transactions = {}
       
   131     self.__query_history = {}
       
   132 
       
   133   def Read(self):
       
   134     """ Reads the datastore and history files into memory.
       
   135 
       
   136     The in-memory query history is cleared, but the datastore is *not*
       
   137     cleared; the entities in the files are merged into the entities in memory.
       
   138     If you want them to overwrite the in-memory datastore, call Clear() before
       
   139     calling Read().
       
   140 
       
   141     If the datastore file contains an entity with the same app name, kind, and
       
   142     key as an entity already in the datastore, the entity from the file
       
   143     overwrites the entity in the datastore.
       
   144 
       
   145     Also sets __next_id to one greater than the highest id allocated so far.
       
   146     """
       
   147     pb_exceptions = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError,
       
   148                      TypeError, ValueError)
       
   149     error_msg = ('Data in %s is corrupt or a different version. '
       
   150                  'Try running with the --clear_datastore flag.\n%r')
       
   151 
       
   152     if self.__datastore_file and self.__datastore_file != '/dev/null':
       
   153       for encoded_entity in self.__ReadPickled(self.__datastore_file):
       
   154         try:
       
   155           entity = entity_pb.EntityProto(encoded_entity)
       
   156         except pb_exceptions, e:
       
   157           raise datastore_errors.InternalError(error_msg %
       
   158                                                (self.__datastore_file, e))
       
   159 
       
   160         last_path = entity.key().path().element_list()[-1]
       
   161         app_kind = (entity.key().app(), last_path.type())
       
   162         kind_dict = self.__entities.setdefault(app_kind, {})
       
   163         kind_dict[entity.key()] = entity
       
   164 
       
   165         if last_path.has_id() and last_path.id() >= self.__next_id:
       
   166           self.__next_id = last_path.id() + 1
       
   167 
       
   168       self.__query_history = {}
       
   169       for encoded_query, count in self.__ReadPickled(self.__history_file):
       
   170         try:
       
   171           query_pb = datastore_pb.Query(encoded_query)
       
   172         except pb_exceptions, e:
       
   173           raise datastore_errors.InternalError(error_msg %
       
   174                                                (self.__history_file, e))
       
   175 
       
   176         if query_pb in self.__query_history:
       
   177           self.__query_history[query_pb] += count
       
   178         else:
       
   179           self.__query_history[query_pb] = count
       
   180 
       
   181   def Write(self):
       
   182     """ Writes out the datastore and history files. Be careful! If the files
       
   183     already exist, this method overwrites them!
       
   184     """
       
   185     self.__WriteDatastore()
       
   186     self.__WriteHistory()
       
   187 
       
   188   def __WriteDatastore(self):
       
   189     """ Writes out the datastore file. Be careful! If the file already exist,
       
   190     this method overwrites it!
       
   191     """
       
   192     if self.__datastore_file and self.__datastore_file != '/dev/null':
       
   193       encoded = []
       
   194       for kind_dict in self.__entities.values():
       
   195         for entity in kind_dict.values():
       
   196           encoded.append(entity.Encode())
       
   197 
       
   198       self.__WritePickled(encoded, self.__datastore_file)
       
   199 
       
   200   def __WriteHistory(self):
       
   201     """ Writes out the history file. Be careful! If the file already exist,
       
   202     this method overwrites it!
       
   203     """
       
   204     if self.__history_file and self.__history_file != '/dev/null':
       
   205       encoded = [(query.Encode(), count)
       
   206                  for query, count in self.__query_history.items()]
       
   207 
       
   208       self.__WritePickled(encoded, self.__history_file)
       
   209 
       
   210   def __ReadPickled(self, filename):
       
   211     """Reads a pickled object from the given file and returns it.
       
   212     """
       
   213     self.__file_lock.acquire()
       
   214 
       
   215     try:
       
   216       try:
       
   217         if filename and filename != '/dev/null' and os.path.isfile(filename):
       
   218           return pickle.load(open(filename, 'rb'))
       
   219         else:
       
   220           logging.warning('Could not read datastore data from %s', filename)
       
   221       except (AttributeError, LookupError, NameError, TypeError,
       
   222               ValueError, struct.error, pickle.PickleError), e:
       
   223         raise datastore_errors.InternalError(
       
   224           'Could not read data from %s. Try running with the '
       
   225           '--clear_datastore flag. Cause:\n%r' % (filename, e))
       
   226     finally:
       
   227       self.__file_lock.release()
       
   228 
       
   229     return []
       
   230 
       
   231   def __WritePickled(self, obj, filename, openfile=file):
       
   232     """Pickles the object and writes it to the given file.
       
   233     """
       
   234     if not filename or filename == '/dev/null' or not obj:
       
   235       return
       
   236 
       
   237     tmpfile = openfile(os.tempnam(os.path.dirname(filename)), 'wb')
       
   238     pickle.dump(obj, tmpfile, 1)
       
   239     tmpfile.close()
       
   240 
       
   241     self.__file_lock.acquire()
       
   242     try:
       
   243       try:
       
   244         os.rename(tmpfile.name, filename)
       
   245       except OSError:
       
   246         try:
       
   247           os.remove(filename)
       
   248         except:
       
   249           pass
       
   250         os.rename(tmpfile.name, filename)
       
   251     finally:
       
   252       self.__file_lock.release()
       
   253 
       
   254   def MakeSyncCall(self, service, call, request, response):
       
   255     """ The main RPC entry point. service must be 'datastore_v3'. So far, the
       
   256     supported calls are 'Get', 'Put', 'RunQuery', 'Next', and 'Count'.
       
   257     """
       
   258 
       
   259     assert service == 'datastore_v3'
       
   260 
       
   261     explanation = []
       
   262     assert request.IsInitialized(explanation), explanation
       
   263 
       
   264     (getattr(self, "_Dynamic_" + call))(request, response)
       
   265 
       
   266     assert response.IsInitialized(explanation), explanation
       
   267 
       
   268   def ResolveAppId(self, app):
       
   269     """ If the given app name is the placeholder for the local app, returns
       
   270     our app_id. Otherwise returns the app name unchanged.
       
   271     """
       
   272     assert app != ''
       
   273     if app == datastore._LOCAL_APP_ID:
       
   274       return self.__app_id
       
   275     else:
       
   276       return app
       
   277 
       
   278   def QueryHistory(self):
       
   279     """Returns a dict that maps Query PBs to times they've been run.
       
   280     """
       
   281     return dict((pb, times) for pb, times in self.__query_history.items()
       
   282                 if pb.app() == self.__app_id)
       
   283 
       
   284   def _Dynamic_Put(self, put_request, put_response):
       
   285     clones = []
       
   286     for entity in put_request.entity_list():
       
   287       clone = entity_pb.EntityProto()
       
   288       clone.CopyFrom(entity)
       
   289       clones.append(clone)
       
   290 
       
   291       assert clone.has_key()
       
   292       assert clone.key().path().element_size() > 0
       
   293 
       
   294       app = self.ResolveAppId(clone.key().app())
       
   295       clone.mutable_key().set_app(app)
       
   296 
       
   297       last_path = clone.key().path().element_list()[-1]
       
   298       if last_path.id() == 0 and not last_path.has_name():
       
   299         self.__id_lock.acquire()
       
   300         last_path.set_id(self.__next_id)
       
   301         self.__next_id += 1
       
   302         self.__id_lock.release()
       
   303 
       
   304         assert clone.entity_group().element_size() == 0
       
   305         group = clone.mutable_entity_group()
       
   306         root = clone.key().path().element(0)
       
   307         group.add_element().CopyFrom(root)
       
   308 
       
   309       else:
       
   310         assert (clone.has_entity_group() and
       
   311                 clone.entity_group().element_size() > 0)
       
   312 
       
   313     self.__entities_lock.acquire()
       
   314 
       
   315     try:
       
   316       for clone in clones:
       
   317         last_path = clone.key().path().element_list()[-1]
       
   318         kind_dict = self.__entities.setdefault((app, last_path.type()), {})
       
   319         kind_dict[clone.key()] = clone
       
   320     finally:
       
   321       self.__entities_lock.release()
       
   322 
       
   323     if not put_request.has_transaction():
       
   324       self.__WriteDatastore()
       
   325 
       
   326     put_response.key_list().extend([c.key() for c in clones])
       
   327 
       
   328 
       
   329   def _Dynamic_Get(self, get_request, get_response):
       
   330     for key in get_request.key_list():
       
   331         app = self.ResolveAppId(key.app())
       
   332         key.set_app(app)
       
   333         last_path = key.path().element_list()[-1]
       
   334 
       
   335         group = get_response.add_entity()
       
   336         try:
       
   337           entity = self.__entities[app, last_path.type()][key]
       
   338         except KeyError:
       
   339           entity = None
       
   340 
       
   341         if entity:
       
   342           group.mutable_entity().CopyFrom(entity)
       
   343 
       
   344 
       
   345   def _Dynamic_Delete(self, delete_request, delete_response):
       
   346     self.__entities_lock.acquire()
       
   347     try:
       
   348       for key in delete_request.key_list():
       
   349         try:
       
   350           app = self.ResolveAppId(key.app())
       
   351           key.set_app(app)
       
   352           kind = key.path().element_list()[-1].type()
       
   353           del self.__entities[app, kind][key]
       
   354           if not self.__entities[app, kind]:
       
   355             del self.__entities[app, kind]
       
   356         except KeyError:
       
   357           pass
       
   358 
       
   359         if not delete_request.has_transaction():
       
   360           self.__WriteDatastore()
       
   361     finally:
       
   362       self.__entities_lock.release()
       
   363 
       
   364 
       
   365   def _Dynamic_RunQuery(self, query, query_result):
       
   366     if not self.__tx_lock.acquire(False):
       
   367       raise apiproxy_errors.ApplicationError(
       
   368         datastore_pb.Error.BAD_REQUEST, "Can't query inside a transaction.")
       
   369     else:
       
   370       self.__tx_lock.release()
       
   371 
       
   372     app = self.ResolveAppId(query.app())
       
   373 
       
   374     if self.__require_indexes:
       
   375       required_index = datastore_index.CompositeIndexForQuery(query)
       
   376       if required_index is not None:
       
   377         kind, ancestor, props, num_eq_filters = required_index
       
   378         required_key = kind, ancestor, props
       
   379         indexes = self.__indexes.get(app)
       
   380         if not indexes:
       
   381           raise apiproxy_errors.ApplicationError(
       
   382               datastore_pb.Error.NEED_INDEX,
       
   383               "This query requires a composite index, but none are defined. "
       
   384               "You must create an index.yaml file in your application root.")
       
   385         eq_filters_set = set(props[:num_eq_filters])
       
   386         remaining_filters = props[num_eq_filters:]
       
   387         for index in indexes:
       
   388           definition = datastore_admin.ProtoToIndexDefinition(index)
       
   389           index_key = datastore_index.IndexToKey(definition)
       
   390           if required_key == index_key:
       
   391             break
       
   392           if num_eq_filters > 1 and (kind, ancestor) == index_key[:2]:
       
   393             this_props = index_key[2]
       
   394             this_eq_filters_set = set(this_props[:num_eq_filters])
       
   395             this_remaining_filters = this_props[num_eq_filters:]
       
   396             if (eq_filters_set == this_eq_filters_set and
       
   397                 remaining_filters == this_remaining_filters):
       
   398               break
       
   399         else:
       
   400           raise apiproxy_errors.ApplicationError(
       
   401               datastore_pb.Error.NEED_INDEX,
       
   402               "This query requires a composite index that is not defined. "
       
   403               "You must update the index.yaml file in your application root.")
       
   404 
       
   405     try:
       
   406       query.set_app(app)
       
   407       results = self.__entities[app, query.kind()].values()
       
   408       results = [datastore.Entity._FromPb(pb) for pb in results]
       
   409     except KeyError:
       
   410       results = []
       
   411 
       
   412     if query.has_ancestor():
       
   413       ancestor_path = query.ancestor().path().element_list()
       
   414       def is_descendant(entity):
       
   415         path = entity.key()._Key__reference.path().element_list()
       
   416         return path[:len(ancestor_path)] == ancestor_path
       
   417       results = filter(is_descendant, results)
       
   418 
       
   419     operators = {datastore_pb.Query_Filter.LESS_THAN:             '<',
       
   420                  datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL:    '<=',
       
   421                  datastore_pb.Query_Filter.GREATER_THAN:          '>',
       
   422                  datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
       
   423                  datastore_pb.Query_Filter.EQUAL:                 '==',
       
   424                  }
       
   425 
       
   426     for filt in query.filter_list():
       
   427       assert filt.op() != datastore_pb.Query_Filter.IN
       
   428 
       
   429       prop = filt.property(0).name().decode('utf-8')
       
   430       op = operators[filt.op()]
       
   431 
       
   432       def passes(entity):
       
   433         """ Returns True if the entity passes the filter, False otherwise. """
       
   434         entity_vals = entity.get(prop, [])
       
   435         if type(entity_vals) is not types.ListType:
       
   436           entity_vals = [entity_vals]
       
   437 
       
   438         entity_property_list = [datastore_types.ToPropertyPb(prop, value)
       
   439                                 for value in entity_vals]
       
   440 
       
   441         for entity_prop in entity_property_list:
       
   442           fixed_entity_val = datastore_types.FromPropertyPb(entity_prop)
       
   443 
       
   444           if isinstance(fixed_entity_val, datastore_types._RAW_PROPERTY_TYPES):
       
   445             continue
       
   446 
       
   447           for filter_prop in filt.property_list():
       
   448             filter_val = datastore_types.FromPropertyPb(filter_prop)
       
   449 
       
   450             comp = u'%r %s %r' % (fixed_entity_val, op, filter_val)
       
   451 
       
   452             logging.log(logging.DEBUG - 1,
       
   453                         'Evaling filter expression "%s"', comp)
       
   454 
       
   455             try:
       
   456               ret = eval(comp)
       
   457               if ret and ret != NotImplementedError:
       
   458                 return True
       
   459             except TypeError:
       
   460               pass
       
   461 
       
   462         return False
       
   463 
       
   464       results = filter(passes, results)
       
   465 
       
   466     def has_prop_indexed(entity, prop):
       
   467       """Returns True if prop is in the entity and is not a raw property."""
       
   468       values = entity.get(prop, [])
       
   469       if not isinstance(values, (tuple, list)):
       
   470         values = [values]
       
   471 
       
   472       for value in values:
       
   473         if not isinstance(value, datastore_types._RAW_PROPERTY_TYPES):
       
   474           return True
       
   475       return False
       
   476 
       
   477     for order in query.order_list():
       
   478       prop = order.property().decode('utf-8')
       
   479       results = [entity for entity in results if has_prop_indexed(entity, prop)]
       
   480 
       
   481     def order_compare(a, b):
       
   482       """ Return a negative, zero or positive number depending on whether
       
   483       entity a is considered smaller than, equal to, or larger than b,
       
   484       according to the query's orderings. """
       
   485       cmped = 0
       
   486       for o in query.order_list():
       
   487         prop = o.property().decode('utf-8')
       
   488 
       
   489         if o.direction() is datastore_pb.Query_Order.ASCENDING:
       
   490           selector = min
       
   491         else:
       
   492           selector = max
       
   493 
       
   494         a_val = a[prop]
       
   495         if isinstance(a_val, list):
       
   496           a_val = selector(a_val)
       
   497 
       
   498         b_val = b[prop]
       
   499         if isinstance(b_val, list):
       
   500           b_val = selector(b_val)
       
   501 
       
   502         try:
       
   503           cmped = cmp(a_val, b_val)
       
   504         except TypeError:
       
   505           cmped = NotImplementedError
       
   506 
       
   507         if cmped == NotImplementedError:
       
   508           cmped = cmp(type(a_val), type(b_val))
       
   509 
       
   510         if o.direction() is datastore_pb.Query_Order.DESCENDING:
       
   511           cmped = -cmped
       
   512 
       
   513         if cmped != 0:
       
   514           return cmped
       
   515       if cmped == 0:
       
   516         return cmp(a.key(), b.key())
       
   517 
       
   518     results.sort(order_compare)
       
   519 
       
   520     offset = 0
       
   521     limit = len(results)
       
   522     if query.has_offset():
       
   523       offset = query.offset()
       
   524     if query.has_limit():
       
   525       limit = query.limit()
       
   526     results = results[offset:limit + offset]
       
   527 
       
   528     clone = datastore_pb.Query()
       
   529     clone.CopyFrom(query)
       
   530     clone.clear_hint()
       
   531     if clone in self.__query_history:
       
   532       self.__query_history[clone] += 1
       
   533     else:
       
   534       self.__query_history[clone] = 1
       
   535     self.__WriteHistory()
       
   536 
       
   537     results = [e._ToPb() for e in results]
       
   538     self.__cursor_lock.acquire()
       
   539     cursor = self.__next_cursor
       
   540     self.__next_cursor += 1
       
   541     self.__cursor_lock.release()
       
   542     self.__queries[cursor] = (results, len(results))
       
   543 
       
   544     query_result.mutable_cursor().set_cursor(cursor)
       
   545     query_result.set_more_results(len(results) > 0)
       
   546 
       
   547 
       
   548   def _Dynamic_Next(self, next_request, query_result):
       
   549     cursor = next_request.cursor().cursor()
       
   550 
       
   551     try:
       
   552       results, orig_count = self.__queries[cursor]
       
   553     except KeyError:
       
   554       raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
       
   555                                              'Cursor %d not found' % cursor)
       
   556 
       
   557     count = next_request.count()
       
   558     for r in results[:count]:
       
   559       query_result.add_result().CopyFrom(r)
       
   560     del results[:count]
       
   561 
       
   562     query_result.set_more_results(len(results) > 0)
       
   563 
       
   564 
       
   565   def _Dynamic_Count(self, query, integer64proto):
       
   566     query_result = datastore_pb.QueryResult()
       
   567     self._Dynamic_RunQuery(query, query_result)
       
   568     cursor = query_result.cursor().cursor()
       
   569     results, count = self.__queries[cursor]
       
   570     integer64proto.set_value(count)
       
   571     del self.__queries[cursor]
       
   572 
       
   573 
       
   574   def _Dynamic_BeginTransaction(self, request, transaction):
       
   575     self.__tx_handle_lock.acquire()
       
   576     handle = self.__next_tx_handle
       
   577     self.__next_tx_handle += 1
       
   578     self.__tx_handle_lock.release()
       
   579 
       
   580     self.__transactions[handle] = None
       
   581     transaction.set_handle(handle)
       
   582 
       
   583     self.__tx_lock.acquire()
       
   584     snapshot = [(app_kind, dict(entities))
       
   585                 for app_kind, entities in self.__entities.items()]
       
   586     self.__tx_snapshot = dict(snapshot)
       
   587 
       
   588   def _Dynamic_Commit(self, transaction, transaction_response):
       
   589     if not self.__transactions.has_key(transaction.handle()):
       
   590       raise apiproxy_errors.ApplicationError(
       
   591         datastore_pb.Error.BAD_REQUEST,
       
   592         'Transaction handle %d not found' % transaction.handle())
       
   593 
       
   594     self.__tx_snapshot = {}
       
   595     try:
       
   596       self.__WriteDatastore()
       
   597     finally:
       
   598       self.__tx_lock.release()
       
   599 
       
   600   def _Dynamic_Rollback(self, transaction, transaction_response):
       
   601     if not self.__transactions.has_key(transaction.handle()):
       
   602       raise apiproxy_errors.ApplicationError(
       
   603         datastore_pb.Error.BAD_REQUEST,
       
   604         'Transaction handle %d not found' % transaction.handle())
       
   605 
       
   606     self.__entities = self.__tx_snapshot
       
   607     self.__tx_snapshot = {}
       
   608     self.__tx_lock.release()
       
   609 
       
   610   def _Dynamic_GetSchema(self, app_str, schema):
       
   611     minint = -sys.maxint - 1
       
   612 
       
   613     app_str = self.ResolveAppId(app_str.value())
       
   614 
       
   615     kinds = []
       
   616 
       
   617     for app, kind in self.__entities:
       
   618       if app == app_str:
       
   619         kind_pb = entity_pb.EntityProto()
       
   620         kind_pb.mutable_key().set_app('')
       
   621         kind_pb.mutable_key().mutable_path().add_element().set_type(kind)
       
   622         kind_pb.mutable_entity_group()
       
   623         kinds.append(kind_pb)
       
   624 
       
   625         props = {}
       
   626 
       
   627         for entity in self.__entities[(app, kind)].values():
       
   628           for prop in entity.property_list():
       
   629             if prop.name() not in props:
       
   630               props[prop.name()] = entity_pb.PropertyValue()
       
   631             props[prop.name()].MergeFrom(prop.value())
       
   632 
       
   633         for value_pb in props.values():
       
   634           if value_pb.has_int64value():
       
   635             value_pb.set_int64value(minint)
       
   636           if value_pb.has_booleanvalue():
       
   637             value_pb.set_booleanvalue(False)
       
   638           if value_pb.has_stringvalue():
       
   639             value_pb.set_stringvalue('')
       
   640           if value_pb.has_doublevalue():
       
   641             value_pb.set_doublevalue(float('-inf'))
       
   642           if value_pb.has_pointvalue():
       
   643             value_pb.mutable_pointvalue().set_x(float('-inf'))
       
   644             value_pb.mutable_pointvalue().set_y(float('-inf'))
       
   645           if value_pb.has_uservalue():
       
   646             value_pb.mutable_uservalue().set_gaiaid(minint)
       
   647             value_pb.mutable_uservalue().set_email('')
       
   648             value_pb.mutable_uservalue().set_auth_domain('')
       
   649             value_pb.mutable_uservalue().clear_nickname()
       
   650           elif value_pb.has_referencevalue():
       
   651             value_pb.clear_referencevalue()
       
   652             value_pb.mutable_referencevalue().set_app('')
       
   653 
       
   654         for name, value_pb in props.items():
       
   655           prop_pb = kind_pb.add_property()
       
   656           prop_pb.set_name(name)
       
   657           prop_pb.mutable_value().CopyFrom(value_pb)
       
   658 
       
   659     schema.kind_list().extend(kinds)
       
   660 
       
   def _Dynamic_CreateIndex(self, index, id_response):
     """Registers a new composite index and assigns it the next index id.

     Args:
       index: entity_pb.CompositeIndex describing the index to create. Its id
         must be 0 on entry; it is overwritten with the newly assigned id.
       id_response: response message; its value is set to the assigned id.

     Raises:
       apiproxy_errors.ApplicationError: BAD_REQUEST if the supplied index has
         a non-zero id, or if an index with the same definition is already
         stored for the app.
     """
     if index.id() != 0:
       raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
                                              'New index id must be 0.')
     elif self.__FindIndex(index):
       raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
                                              'Index already exists.')

     # Allocate the id under the lock. The try/finally guarantees the lock is
     # released even if one of the setters raises; otherwise every later
     # CreateIndex call would block forever on acquire().
     self.__index_id_lock.acquire()
     try:
       index.set_id(self.__next_index_id)
       id_response.set_value(self.__next_index_id)
       self.__next_index_id += 1
     finally:
       self.__index_id_lock.release()

     # Store a private copy (taken after the id was assigned) keyed by the
     # resolved app id, so the caller's message is not shared with the stub.
     clone = entity_pb.CompositeIndex()
     clone.CopyFrom(index)
     app = self.ResolveAppId(index.app_id())
     clone.set_app_id(app)

     self.__indexes_lock.acquire()
     try:
       self.__indexes.setdefault(app, []).append(clone)
     finally:
       self.__indexes_lock.release()
       
   687 
       
   def _Dynamic_GetIndices(self, app_str, composite_indices):
     """Appends the stored indexes of an app to the response message.

     Args:
       app_str: request message whose value() is the app id to look up.
       composite_indices: response message; the stored indexes for the
         resolved app id (none if the app has no entry) are added to its
         index_list().
     """
     resolved_app = self.ResolveAppId(app_str.value())
     stored = self.__indexes.get(resolved_app, [])
     composite_indices.index_list().extend(stored)
       
   691 
       
   def _Dynamic_UpdateIndex(self, index, void):
     """Moves a stored composite index forward by exactly one state.

     Args:
       index: entity_pb.CompositeIndex carrying the definition used to find
         the stored index and the state to move it to.
       void: unused.

     Raises:
       apiproxy_errors.ApplicationError: BAD_REQUEST if no stored index has a
         matching definition, or if the requested state is not the stored
         state plus one.
     """
     existing = self.__FindIndex(index)
     if not existing:
       raise apiproxy_errors.ApplicationError(
         datastore_pb.Error.BAD_REQUEST, "Index doesn't exist.")

     if index.state() != existing.state() + 1:
       state_name = entity_pb.CompositeIndex.State_Name
       message = ("cannot move index state from %s to %s" %
                  (state_name(existing.state()), state_name(index.state())))
       raise apiproxy_errors.ApplicationError(
         datastore_pb.Error.BAD_REQUEST, message)

     self.__indexes_lock.acquire()
     try:
       existing.set_state(index.state())
     finally:
       self.__indexes_lock.release()
       
   709 
       
   def _Dynamic_DeleteIndex(self, index, void):
     """Removes a stored composite index.

     Args:
       index: entity_pb.CompositeIndex whose definition identifies the stored
         index to remove.
       void: unused.

     Raises:
       apiproxy_errors.ApplicationError: BAD_REQUEST if no stored index has a
         matching definition.
     """
     existing = self.__FindIndex(index)
     if not existing:
       raise apiproxy_errors.ApplicationError(
         datastore_pb.Error.BAD_REQUEST, "Index doesn't exist.")

     resolved_app = self.ResolveAppId(index.app_id())
     lock = self.__indexes_lock
     lock.acquire()
     try:
       app_indexes = self.__indexes[resolved_app]
       app_indexes.remove(existing)
     finally:
       lock.release()
       
   722 
       
   def __FindIndex(self, index):
     """Looks up the stored index whose definition equals the given one.

     Args:
       index: entity_pb.CompositeIndex; its app id selects which app's
         indexes are searched and its definition is what is compared.

     Returns:
       the stored entity_pb.CompositeIndex with an equal definition, or None
       if the app has no such index.
     """
     resolved_app = self.ResolveAppId(index.app_id())

     for candidate in self.__indexes.get(resolved_app, []):
       if index.definition() == candidate.definition():
         return candidate

     return None