|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """ |
|
19 In-memory persistent stub for the Python datastore API. Gets, queries, |
|
20 and searches are implemented as in-memory scans over all entities. |
|
21 |
|
22 Stores entities across sessions as pickled proto bufs in a single file. On |
|
23 startup, all entities are read from the file and loaded into memory. On |
|
24 every Put(), the file is wiped and all entities are written from scratch. |
|
25 Clients can also manually Read() and Write() the file themselves. |
|
26 |
|
27 Transactions are serialized through __tx_lock. Each transaction acquires it |
|
28 when it begins and releases it when it commits or rolls back. This is |
|
29 important, since there are other member variables like __tx_snapshot that are |
|
30 per-transaction, so they should only be used by one tx at a time. |
|
31 """ |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 import datetime |
|
39 import logging |
|
40 import os |
|
41 import pickle |
|
42 import struct |
|
43 import sys |
|
44 import tempfile |
|
45 import threading |
|
46 import types |
|
47 import warnings |
|
48 |
|
49 from google.appengine.api import api_base_pb |
|
50 from google.appengine.api import datastore |
|
51 from google.appengine.api import datastore_admin |
|
52 from google.appengine.api import datastore_errors |
|
53 from google.appengine.api import datastore_types |
|
54 from google.appengine.api import users |
|
55 from google.appengine.datastore import datastore_pb |
|
56 from google.appengine.datastore import datastore_index |
|
57 from google.appengine.runtime import apiproxy_errors |
|
58 from google.net.proto import ProtocolBuffer |
|
59 from google.appengine.datastore import entity_pb |
|
60 |
|
warnings.filterwarnings('ignore', 'tempnam is a potential security risk')


# Protocol buffer messages are not hashable by default. Queries and entity
# keys are used as dict keys throughout this stub, so give them a hash based
# on their serialized form (equal messages encode identically).
entity_pb.Reference.__hash__ = lambda self: hash(self.Encode())
datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
|
66 |
|
67 |
|
class DatastoreFileStub(object):
  """Persistent stub for the Python datastore API.

  Stores all entities in memory, and persists them to a file as pickled
  protocol buffers. A DatastoreFileStub instance handles a single app's data
  and is backed by files on disk.
  """
|
75 |
|
76 def __init__(self, app_id, datastore_file, history_file, |
|
77 require_indexes=False): |
|
78 """Constructor. |
|
79 |
|
80 Initializes and loads the datastore from the backing files, if they exist. |
|
81 |
|
82 Args: |
|
83 app_id: string |
|
84 datastore_file: string, stores all entities across sessions. Use None |
|
85 not to use a file. |
|
86 history_file: string, stores query history. Use None as with |
|
87 datastore_file. |
|
88 require_indexes: bool, default False. If True, composite indexes must |
|
89 exist in index.yaml for queries that need them. |
|
90 """ |
|
91 |
|
92 assert isinstance(app_id, types.StringTypes) and app_id != '' |
|
93 self.__app_id = app_id |
|
94 self.__datastore_file = datastore_file |
|
95 self.__history_file = history_file |
|
96 |
|
97 self.__entities = {} |
|
98 |
|
99 self.__tx_snapshot = {} |
|
100 |
|
101 self.__queries = {} |
|
102 |
|
103 self.__transactions = {} |
|
104 |
|
105 self.__indexes = {} |
|
106 self.__require_indexes = require_indexes |
|
107 |
|
108 self.__query_history = {} |
|
109 |
|
110 self.__next_id = 1 |
|
111 self.__next_cursor = 1 |
|
112 self.__next_tx_handle = 1 |
|
113 self.__next_index_id = 1 |
|
114 self.__id_lock = threading.Lock() |
|
115 self.__cursor_lock = threading.Lock() |
|
116 self.__tx_handle_lock = threading.Lock() |
|
117 self.__index_id_lock = threading.Lock() |
|
118 self.__tx_lock = threading.Lock() |
|
119 self.__entities_lock = threading.Lock() |
|
120 self.__file_lock = threading.Lock() |
|
121 self.__indexes_lock = threading.Lock() |
|
122 |
|
123 self.Read() |
|
124 |
|
125 def Clear(self): |
|
126 """ Clears the datastore by deleting all currently stored entities and |
|
127 queries. """ |
|
128 self.__entities = {} |
|
129 self.__queries = {} |
|
130 self.__transactions = {} |
|
131 self.__query_history = {} |
|
132 |
|
133 def Read(self): |
|
134 """ Reads the datastore and history files into memory. |
|
135 |
|
136 The in-memory query history is cleared, but the datastore is *not* |
|
137 cleared; the entities in the files are merged into the entities in memory. |
|
138 If you want them to overwrite the in-memory datastore, call Clear() before |
|
139 calling Read(). |
|
140 |
|
141 If the datastore file contains an entity with the same app name, kind, and |
|
142 key as an entity already in the datastore, the entity from the file |
|
143 overwrites the entity in the datastore. |
|
144 |
|
145 Also sets __next_id to one greater than the highest id allocated so far. |
|
146 """ |
|
147 pb_exceptions = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError, |
|
148 TypeError, ValueError) |
|
149 error_msg = ('Data in %s is corrupt or a different version. ' |
|
150 'Try running with the --clear_datastore flag.\n%r') |
|
151 |
|
152 if self.__datastore_file and self.__datastore_file != '/dev/null': |
|
153 for encoded_entity in self.__ReadPickled(self.__datastore_file): |
|
154 try: |
|
155 entity = entity_pb.EntityProto(encoded_entity) |
|
156 except pb_exceptions, e: |
|
157 raise datastore_errors.InternalError(error_msg % |
|
158 (self.__datastore_file, e)) |
|
159 |
|
160 last_path = entity.key().path().element_list()[-1] |
|
161 app_kind = (entity.key().app(), last_path.type()) |
|
162 kind_dict = self.__entities.setdefault(app_kind, {}) |
|
163 kind_dict[entity.key()] = entity |
|
164 |
|
165 if last_path.has_id() and last_path.id() >= self.__next_id: |
|
166 self.__next_id = last_path.id() + 1 |
|
167 |
|
168 self.__query_history = {} |
|
169 for encoded_query, count in self.__ReadPickled(self.__history_file): |
|
170 try: |
|
171 query_pb = datastore_pb.Query(encoded_query) |
|
172 except pb_exceptions, e: |
|
173 raise datastore_errors.InternalError(error_msg % |
|
174 (self.__history_file, e)) |
|
175 |
|
176 if query_pb in self.__query_history: |
|
177 self.__query_history[query_pb] += count |
|
178 else: |
|
179 self.__query_history[query_pb] = count |
|
180 |
|
181 def Write(self): |
|
182 """ Writes out the datastore and history files. Be careful! If the files |
|
183 already exist, this method overwrites them! |
|
184 """ |
|
185 self.__WriteDatastore() |
|
186 self.__WriteHistory() |
|
187 |
|
188 def __WriteDatastore(self): |
|
189 """ Writes out the datastore file. Be careful! If the file already exist, |
|
190 this method overwrites it! |
|
191 """ |
|
192 if self.__datastore_file and self.__datastore_file != '/dev/null': |
|
193 encoded = [] |
|
194 for kind_dict in self.__entities.values(): |
|
195 for entity in kind_dict.values(): |
|
196 encoded.append(entity.Encode()) |
|
197 |
|
198 self.__WritePickled(encoded, self.__datastore_file) |
|
199 |
|
200 def __WriteHistory(self): |
|
201 """ Writes out the history file. Be careful! If the file already exist, |
|
202 this method overwrites it! |
|
203 """ |
|
204 if self.__history_file and self.__history_file != '/dev/null': |
|
205 encoded = [(query.Encode(), count) |
|
206 for query, count in self.__query_history.items()] |
|
207 |
|
208 self.__WritePickled(encoded, self.__history_file) |
|
209 |
|
210 def __ReadPickled(self, filename): |
|
211 """Reads a pickled object from the given file and returns it. |
|
212 """ |
|
213 self.__file_lock.acquire() |
|
214 |
|
215 try: |
|
216 try: |
|
217 if filename and filename != '/dev/null' and os.path.isfile(filename): |
|
218 return pickle.load(open(filename, 'rb')) |
|
219 else: |
|
220 logging.warning('Could not read datastore data from %s', filename) |
|
221 except (AttributeError, LookupError, NameError, TypeError, |
|
222 ValueError, struct.error, pickle.PickleError), e: |
|
223 raise datastore_errors.InternalError( |
|
224 'Could not read data from %s. Try running with the ' |
|
225 '--clear_datastore flag. Cause:\n%r' % (filename, e)) |
|
226 finally: |
|
227 self.__file_lock.release() |
|
228 |
|
229 return [] |
|
230 |
|
231 def __WritePickled(self, obj, filename, openfile=file): |
|
232 """Pickles the object and writes it to the given file. |
|
233 """ |
|
234 if not filename or filename == '/dev/null' or not obj: |
|
235 return |
|
236 |
|
237 tmpfile = openfile(os.tempnam(os.path.dirname(filename)), 'wb') |
|
238 pickle.dump(obj, tmpfile, 1) |
|
239 tmpfile.close() |
|
240 |
|
241 self.__file_lock.acquire() |
|
242 try: |
|
243 try: |
|
244 os.rename(tmpfile.name, filename) |
|
245 except OSError: |
|
246 try: |
|
247 os.remove(filename) |
|
248 except: |
|
249 pass |
|
250 os.rename(tmpfile.name, filename) |
|
251 finally: |
|
252 self.__file_lock.release() |
|
253 |
|
254 def MakeSyncCall(self, service, call, request, response): |
|
255 """ The main RPC entry point. service must be 'datastore_v3'. So far, the |
|
256 supported calls are 'Get', 'Put', 'RunQuery', 'Next', and 'Count'. |
|
257 """ |
|
258 |
|
259 assert service == 'datastore_v3' |
|
260 |
|
261 explanation = [] |
|
262 assert request.IsInitialized(explanation), explanation |
|
263 |
|
264 (getattr(self, "_Dynamic_" + call))(request, response) |
|
265 |
|
266 assert response.IsInitialized(explanation), explanation |
|
267 |
|
268 def ResolveAppId(self, app): |
|
269 """ If the given app name is the placeholder for the local app, returns |
|
270 our app_id. Otherwise returns the app name unchanged. |
|
271 """ |
|
272 assert app != '' |
|
273 if app == datastore._LOCAL_APP_ID: |
|
274 return self.__app_id |
|
275 else: |
|
276 return app |
|
277 |
|
278 def QueryHistory(self): |
|
279 """Returns a dict that maps Query PBs to times they've been run. |
|
280 """ |
|
281 return dict((pb, times) for pb, times in self.__query_history.items() |
|
282 if pb.app() == self.__app_id) |
|
283 |
|
284 def _Dynamic_Put(self, put_request, put_response): |
|
285 clones = [] |
|
286 for entity in put_request.entity_list(): |
|
287 clone = entity_pb.EntityProto() |
|
288 clone.CopyFrom(entity) |
|
289 clones.append(clone) |
|
290 |
|
291 assert clone.has_key() |
|
292 assert clone.key().path().element_size() > 0 |
|
293 |
|
294 app = self.ResolveAppId(clone.key().app()) |
|
295 clone.mutable_key().set_app(app) |
|
296 |
|
297 last_path = clone.key().path().element_list()[-1] |
|
298 if last_path.id() == 0 and not last_path.has_name(): |
|
299 self.__id_lock.acquire() |
|
300 last_path.set_id(self.__next_id) |
|
301 self.__next_id += 1 |
|
302 self.__id_lock.release() |
|
303 |
|
304 assert clone.entity_group().element_size() == 0 |
|
305 group = clone.mutable_entity_group() |
|
306 root = clone.key().path().element(0) |
|
307 group.add_element().CopyFrom(root) |
|
308 |
|
309 else: |
|
310 assert (clone.has_entity_group() and |
|
311 clone.entity_group().element_size() > 0) |
|
312 |
|
313 self.__entities_lock.acquire() |
|
314 |
|
315 try: |
|
316 for clone in clones: |
|
317 last_path = clone.key().path().element_list()[-1] |
|
318 kind_dict = self.__entities.setdefault((app, last_path.type()), {}) |
|
319 kind_dict[clone.key()] = clone |
|
320 finally: |
|
321 self.__entities_lock.release() |
|
322 |
|
323 if not put_request.has_transaction(): |
|
324 self.__WriteDatastore() |
|
325 |
|
326 put_response.key_list().extend([c.key() for c in clones]) |
|
327 |
|
328 |
|
329 def _Dynamic_Get(self, get_request, get_response): |
|
330 for key in get_request.key_list(): |
|
331 app = self.ResolveAppId(key.app()) |
|
332 key.set_app(app) |
|
333 last_path = key.path().element_list()[-1] |
|
334 |
|
335 group = get_response.add_entity() |
|
336 try: |
|
337 entity = self.__entities[app, last_path.type()][key] |
|
338 except KeyError: |
|
339 entity = None |
|
340 |
|
341 if entity: |
|
342 group.mutable_entity().CopyFrom(entity) |
|
343 |
|
344 |
|
345 def _Dynamic_Delete(self, delete_request, delete_response): |
|
346 self.__entities_lock.acquire() |
|
347 try: |
|
348 for key in delete_request.key_list(): |
|
349 try: |
|
350 app = self.ResolveAppId(key.app()) |
|
351 key.set_app(app) |
|
352 kind = key.path().element_list()[-1].type() |
|
353 del self.__entities[app, kind][key] |
|
354 if not self.__entities[app, kind]: |
|
355 del self.__entities[app, kind] |
|
356 except KeyError: |
|
357 pass |
|
358 |
|
359 if not delete_request.has_transaction(): |
|
360 self.__WriteDatastore() |
|
361 finally: |
|
362 self.__entities_lock.release() |
|
363 |
|
364 |
|
365 def _Dynamic_RunQuery(self, query, query_result): |
|
366 if not self.__tx_lock.acquire(False): |
|
367 raise apiproxy_errors.ApplicationError( |
|
368 datastore_pb.Error.BAD_REQUEST, "Can't query inside a transaction.") |
|
369 else: |
|
370 self.__tx_lock.release() |
|
371 |
|
372 app = self.ResolveAppId(query.app()) |
|
373 |
|
374 if self.__require_indexes: |
|
375 required_index = datastore_index.CompositeIndexForQuery(query) |
|
376 if required_index is not None: |
|
377 kind, ancestor, props, num_eq_filters = required_index |
|
378 required_key = kind, ancestor, props |
|
379 indexes = self.__indexes.get(app) |
|
380 if not indexes: |
|
381 raise apiproxy_errors.ApplicationError( |
|
382 datastore_pb.Error.NEED_INDEX, |
|
383 "This query requires a composite index, but none are defined. " |
|
384 "You must create an index.yaml file in your application root.") |
|
385 eq_filters_set = set(props[:num_eq_filters]) |
|
386 remaining_filters = props[num_eq_filters:] |
|
387 for index in indexes: |
|
388 definition = datastore_admin.ProtoToIndexDefinition(index) |
|
389 index_key = datastore_index.IndexToKey(definition) |
|
390 if required_key == index_key: |
|
391 break |
|
392 if num_eq_filters > 1 and (kind, ancestor) == index_key[:2]: |
|
393 this_props = index_key[2] |
|
394 this_eq_filters_set = set(this_props[:num_eq_filters]) |
|
395 this_remaining_filters = this_props[num_eq_filters:] |
|
396 if (eq_filters_set == this_eq_filters_set and |
|
397 remaining_filters == this_remaining_filters): |
|
398 break |
|
399 else: |
|
400 raise apiproxy_errors.ApplicationError( |
|
401 datastore_pb.Error.NEED_INDEX, |
|
402 "This query requires a composite index that is not defined. " |
|
403 "You must update the index.yaml file in your application root.") |
|
404 |
|
405 try: |
|
406 query.set_app(app) |
|
407 results = self.__entities[app, query.kind()].values() |
|
408 results = [datastore.Entity._FromPb(pb) for pb in results] |
|
409 except KeyError: |
|
410 results = [] |
|
411 |
|
412 if query.has_ancestor(): |
|
413 ancestor_path = query.ancestor().path().element_list() |
|
414 def is_descendant(entity): |
|
415 path = entity.key()._Key__reference.path().element_list() |
|
416 return path[:len(ancestor_path)] == ancestor_path |
|
417 results = filter(is_descendant, results) |
|
418 |
|
419 operators = {datastore_pb.Query_Filter.LESS_THAN: '<', |
|
420 datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=', |
|
421 datastore_pb.Query_Filter.GREATER_THAN: '>', |
|
422 datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=', |
|
423 datastore_pb.Query_Filter.EQUAL: '==', |
|
424 } |
|
425 |
|
426 for filt in query.filter_list(): |
|
427 assert filt.op() != datastore_pb.Query_Filter.IN |
|
428 |
|
429 prop = filt.property(0).name().decode('utf-8') |
|
430 op = operators[filt.op()] |
|
431 |
|
432 def passes(entity): |
|
433 """ Returns True if the entity passes the filter, False otherwise. """ |
|
434 entity_vals = entity.get(prop, []) |
|
435 if type(entity_vals) is not types.ListType: |
|
436 entity_vals = [entity_vals] |
|
437 |
|
438 entity_property_list = [datastore_types.ToPropertyPb(prop, value) |
|
439 for value in entity_vals] |
|
440 |
|
441 for entity_prop in entity_property_list: |
|
442 fixed_entity_val = datastore_types.FromPropertyPb(entity_prop) |
|
443 |
|
444 if isinstance(fixed_entity_val, datastore_types._RAW_PROPERTY_TYPES): |
|
445 continue |
|
446 |
|
447 for filter_prop in filt.property_list(): |
|
448 filter_val = datastore_types.FromPropertyPb(filter_prop) |
|
449 |
|
450 comp = u'%r %s %r' % (fixed_entity_val, op, filter_val) |
|
451 |
|
452 logging.log(logging.DEBUG - 1, |
|
453 'Evaling filter expression "%s"', comp) |
|
454 |
|
455 try: |
|
456 ret = eval(comp) |
|
457 if ret and ret != NotImplementedError: |
|
458 return True |
|
459 except TypeError: |
|
460 pass |
|
461 |
|
462 return False |
|
463 |
|
464 results = filter(passes, results) |
|
465 |
|
466 def has_prop_indexed(entity, prop): |
|
467 """Returns True if prop is in the entity and is not a raw property.""" |
|
468 values = entity.get(prop, []) |
|
469 if not isinstance(values, (tuple, list)): |
|
470 values = [values] |
|
471 |
|
472 for value in values: |
|
473 if not isinstance(value, datastore_types._RAW_PROPERTY_TYPES): |
|
474 return True |
|
475 return False |
|
476 |
|
477 for order in query.order_list(): |
|
478 prop = order.property().decode('utf-8') |
|
479 results = [entity for entity in results if has_prop_indexed(entity, prop)] |
|
480 |
|
481 def order_compare(a, b): |
|
482 """ Return a negative, zero or positive number depending on whether |
|
483 entity a is considered smaller than, equal to, or larger than b, |
|
484 according to the query's orderings. """ |
|
485 cmped = 0 |
|
486 for o in query.order_list(): |
|
487 prop = o.property().decode('utf-8') |
|
488 |
|
489 if o.direction() is datastore_pb.Query_Order.ASCENDING: |
|
490 selector = min |
|
491 else: |
|
492 selector = max |
|
493 |
|
494 a_val = a[prop] |
|
495 if isinstance(a_val, list): |
|
496 a_val = selector(a_val) |
|
497 |
|
498 b_val = b[prop] |
|
499 if isinstance(b_val, list): |
|
500 b_val = selector(b_val) |
|
501 |
|
502 try: |
|
503 cmped = cmp(a_val, b_val) |
|
504 except TypeError: |
|
505 cmped = NotImplementedError |
|
506 |
|
507 if cmped == NotImplementedError: |
|
508 cmped = cmp(type(a_val), type(b_val)) |
|
509 |
|
510 if o.direction() is datastore_pb.Query_Order.DESCENDING: |
|
511 cmped = -cmped |
|
512 |
|
513 if cmped != 0: |
|
514 return cmped |
|
515 if cmped == 0: |
|
516 return cmp(a.key(), b.key()) |
|
517 |
|
518 results.sort(order_compare) |
|
519 |
|
520 offset = 0 |
|
521 limit = len(results) |
|
522 if query.has_offset(): |
|
523 offset = query.offset() |
|
524 if query.has_limit(): |
|
525 limit = query.limit() |
|
526 results = results[offset:limit + offset] |
|
527 |
|
528 clone = datastore_pb.Query() |
|
529 clone.CopyFrom(query) |
|
530 clone.clear_hint() |
|
531 if clone in self.__query_history: |
|
532 self.__query_history[clone] += 1 |
|
533 else: |
|
534 self.__query_history[clone] = 1 |
|
535 self.__WriteHistory() |
|
536 |
|
537 results = [e._ToPb() for e in results] |
|
538 self.__cursor_lock.acquire() |
|
539 cursor = self.__next_cursor |
|
540 self.__next_cursor += 1 |
|
541 self.__cursor_lock.release() |
|
542 self.__queries[cursor] = (results, len(results)) |
|
543 |
|
544 query_result.mutable_cursor().set_cursor(cursor) |
|
545 query_result.set_more_results(len(results) > 0) |
|
546 |
|
547 |
|
548 def _Dynamic_Next(self, next_request, query_result): |
|
549 cursor = next_request.cursor().cursor() |
|
550 |
|
551 try: |
|
552 results, orig_count = self.__queries[cursor] |
|
553 except KeyError: |
|
554 raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST, |
|
555 'Cursor %d not found' % cursor) |
|
556 |
|
557 count = next_request.count() |
|
558 for r in results[:count]: |
|
559 query_result.add_result().CopyFrom(r) |
|
560 del results[:count] |
|
561 |
|
562 query_result.set_more_results(len(results) > 0) |
|
563 |
|
564 |
|
565 def _Dynamic_Count(self, query, integer64proto): |
|
566 query_result = datastore_pb.QueryResult() |
|
567 self._Dynamic_RunQuery(query, query_result) |
|
568 cursor = query_result.cursor().cursor() |
|
569 results, count = self.__queries[cursor] |
|
570 integer64proto.set_value(count) |
|
571 del self.__queries[cursor] |
|
572 |
|
573 |
|
574 def _Dynamic_BeginTransaction(self, request, transaction): |
|
575 self.__tx_handle_lock.acquire() |
|
576 handle = self.__next_tx_handle |
|
577 self.__next_tx_handle += 1 |
|
578 self.__tx_handle_lock.release() |
|
579 |
|
580 self.__transactions[handle] = None |
|
581 transaction.set_handle(handle) |
|
582 |
|
583 self.__tx_lock.acquire() |
|
584 snapshot = [(app_kind, dict(entities)) |
|
585 for app_kind, entities in self.__entities.items()] |
|
586 self.__tx_snapshot = dict(snapshot) |
|
587 |
|
588 def _Dynamic_Commit(self, transaction, transaction_response): |
|
589 if not self.__transactions.has_key(transaction.handle()): |
|
590 raise apiproxy_errors.ApplicationError( |
|
591 datastore_pb.Error.BAD_REQUEST, |
|
592 'Transaction handle %d not found' % transaction.handle()) |
|
593 |
|
594 self.__tx_snapshot = {} |
|
595 try: |
|
596 self.__WriteDatastore() |
|
597 finally: |
|
598 self.__tx_lock.release() |
|
599 |
|
600 def _Dynamic_Rollback(self, transaction, transaction_response): |
|
601 if not self.__transactions.has_key(transaction.handle()): |
|
602 raise apiproxy_errors.ApplicationError( |
|
603 datastore_pb.Error.BAD_REQUEST, |
|
604 'Transaction handle %d not found' % transaction.handle()) |
|
605 |
|
606 self.__entities = self.__tx_snapshot |
|
607 self.__tx_snapshot = {} |
|
608 self.__tx_lock.release() |
|
609 |
|
610 def _Dynamic_GetSchema(self, app_str, schema): |
|
611 minint = -sys.maxint - 1 |
|
612 |
|
613 app_str = self.ResolveAppId(app_str.value()) |
|
614 |
|
615 kinds = [] |
|
616 |
|
617 for app, kind in self.__entities: |
|
618 if app == app_str: |
|
619 kind_pb = entity_pb.EntityProto() |
|
620 kind_pb.mutable_key().set_app('') |
|
621 kind_pb.mutable_key().mutable_path().add_element().set_type(kind) |
|
622 kind_pb.mutable_entity_group() |
|
623 kinds.append(kind_pb) |
|
624 |
|
625 props = {} |
|
626 |
|
627 for entity in self.__entities[(app, kind)].values(): |
|
628 for prop in entity.property_list(): |
|
629 if prop.name() not in props: |
|
630 props[prop.name()] = entity_pb.PropertyValue() |
|
631 props[prop.name()].MergeFrom(prop.value()) |
|
632 |
|
633 for value_pb in props.values(): |
|
634 if value_pb.has_int64value(): |
|
635 value_pb.set_int64value(minint) |
|
636 if value_pb.has_booleanvalue(): |
|
637 value_pb.set_booleanvalue(False) |
|
638 if value_pb.has_stringvalue(): |
|
639 value_pb.set_stringvalue('') |
|
640 if value_pb.has_doublevalue(): |
|
641 value_pb.set_doublevalue(float('-inf')) |
|
642 if value_pb.has_pointvalue(): |
|
643 value_pb.mutable_pointvalue().set_x(float('-inf')) |
|
644 value_pb.mutable_pointvalue().set_y(float('-inf')) |
|
645 if value_pb.has_uservalue(): |
|
646 value_pb.mutable_uservalue().set_gaiaid(minint) |
|
647 value_pb.mutable_uservalue().set_email('') |
|
648 value_pb.mutable_uservalue().set_auth_domain('') |
|
649 value_pb.mutable_uservalue().clear_nickname() |
|
650 elif value_pb.has_referencevalue(): |
|
651 value_pb.clear_referencevalue() |
|
652 value_pb.mutable_referencevalue().set_app('') |
|
653 |
|
654 for name, value_pb in props.items(): |
|
655 prop_pb = kind_pb.add_property() |
|
656 prop_pb.set_name(name) |
|
657 prop_pb.mutable_value().CopyFrom(value_pb) |
|
658 |
|
659 schema.kind_list().extend(kinds) |
|
660 |
|
661 def _Dynamic_CreateIndex(self, index, id_response): |
|
662 if index.id() != 0: |
|
663 raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST, |
|
664 'New index id must be 0.') |
|
665 elif self.__FindIndex(index): |
|
666 raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST, |
|
667 'Index already exists.') |
|
668 |
|
669 self.__index_id_lock.acquire() |
|
670 index.set_id(self.__next_index_id) |
|
671 id_response.set_value(self.__next_index_id) |
|
672 self.__next_index_id += 1 |
|
673 self.__index_id_lock.release() |
|
674 |
|
675 clone = entity_pb.CompositeIndex() |
|
676 clone.CopyFrom(index) |
|
677 app = self.ResolveAppId(index.app_id()) |
|
678 clone.set_app_id(app) |
|
679 |
|
680 self.__indexes_lock.acquire() |
|
681 try: |
|
682 if app not in self.__indexes: |
|
683 self.__indexes[app] = [] |
|
684 self.__indexes[app].append(clone) |
|
685 finally: |
|
686 self.__indexes_lock.release() |
|
687 |
|
688 def _Dynamic_GetIndices(self, app_str, composite_indices): |
|
689 composite_indices.index_list().extend( |
|
690 self.__indexes.get(self.ResolveAppId(app_str.value()), [])) |
|
691 |
|
692 def _Dynamic_UpdateIndex(self, index, void): |
|
693 stored_index = self.__FindIndex(index) |
|
694 if not stored_index: |
|
695 raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST, |
|
696 "Index doesn't exist.") |
|
697 elif index.state() != stored_index.state() + 1: |
|
698 raise apiproxy_errors.ApplicationError( |
|
699 datastore_pb.Error.BAD_REQUEST, |
|
700 "cannot move index state from %s to %s" % |
|
701 (entity_pb.CompositeIndex.State_Name(stored_index.state()), |
|
702 (entity_pb.CompositeIndex.State_Name(index.state())))) |
|
703 |
|
704 self.__indexes_lock.acquire() |
|
705 try: |
|
706 stored_index.set_state(index.state()) |
|
707 finally: |
|
708 self.__indexes_lock.release() |
|
709 |
|
710 def _Dynamic_DeleteIndex(self, index, void): |
|
711 stored_index = self.__FindIndex(index) |
|
712 if not stored_index: |
|
713 raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST, |
|
714 "Index doesn't exist.") |
|
715 |
|
716 app = self.ResolveAppId(index.app_id()) |
|
717 self.__indexes_lock.acquire() |
|
718 try: |
|
719 self.__indexes[app].remove(stored_index) |
|
720 finally: |
|
721 self.__indexes_lock.release() |
|
722 |
|
723 def __FindIndex(self, index): |
|
724 """Finds an existing index by definition. |
|
725 |
|
726 Args: |
|
727 definition: entity_pb.CompositeIndex |
|
728 |
|
729 Returns: |
|
730 entity_pb.CompositeIndex, if it exists; otherwise None |
|
731 """ |
|
732 app = self.ResolveAppId(index.app_id()) |
|
733 |
|
734 if app in self.__indexes: |
|
735 for stored_index in self.__indexes[app]: |
|
736 if index.definition() == stored_index.definition(): |
|
737 return stored_index |
|
738 |
|
739 return None |