thirdparty/google_appengine/google/appengine/api/datastore.py
changeset 1278 a7766286a7be
parent 828 f5fd65cc3bf3
child 2309 be1b94099f2d
equal deleted inserted replaced
1277:5c931bd3dc1e 1278:a7766286a7be
    29 
    29 
    30 
    30 
    31 
    31 
    32 
    32 
    33 
    33 
       
    34 import heapq
       
    35 import itertools
    34 import logging
    36 import logging
    35 import re
    37 import re
    36 import string
    38 import string
    37 import sys
    39 import sys
    38 import traceback
    40 import traceback
    45 from google.appengine.datastore import datastore_index
    47 from google.appengine.datastore import datastore_index
    46 from google.appengine.datastore import datastore_pb
    48 from google.appengine.datastore import datastore_pb
    47 from google.appengine.runtime import apiproxy_errors
    49 from google.appengine.runtime import apiproxy_errors
    48 from google.appengine.datastore import entity_pb
    50 from google.appengine.datastore import entity_pb
    49 
    51 
    50 TRANSACTION_RETRIES = 3
    52 MAX_ALLOWABLE_QUERIES = 30
       
    53 
       
    54 DEFAULT_TRANSACTION_RETRIES = 3
    51 
    55 
    52 _MAX_INDEXED_PROPERTIES = 5000
    56 _MAX_INDEXED_PROPERTIES = 5000
    53 
    57 
    54 Key = datastore_types.Key
    58 Key = datastore_types.Key
    55 typename = datastore_types.typename
    59 typename = datastore_types.typename
   486 
   490 
   487       sample = values
   491       sample = values
   488       if isinstance(sample, list):
   492       if isinstance(sample, list):
   489         sample = values[0]
   493         sample = values[0]
   490 
   494 
   491       if isinstance(sample, (datastore_types.Blob, datastore_types.Text)):
   495       if isinstance(sample, datastore_types._RAW_PROPERTY_TYPES):
   492         pb.raw_property_list().extend(properties)
   496         pb.raw_property_list().extend(properties)
   493       else:
   497       else:
   494         pb.property_list().extend(properties)
   498         pb.property_list().extend(properties)
   495 
   499 
   496     if pb.property_size() > _MAX_INDEXED_PROPERTIES:
   500     if pb.property_size() > _MAX_INDEXED_PROPERTIES:
  1070 
  1074 
  1071     if isinstance(values, tuple):
  1075     if isinstance(values, tuple):
  1072       values = list(values)
  1076       values = list(values)
  1073     elif not isinstance(values, list):
  1077     elif not isinstance(values, list):
  1074       values = [values]
  1078       values = [values]
  1075     if isinstance(values[0], datastore_types.Blob):
  1079     if isinstance(values[0], datastore_types._RAW_PROPERTY_TYPES):
  1076       raise datastore_errors.BadValueError(
  1080       raise datastore_errors.BadValueError(
  1077         'Filtering on Blob properties is not supported.')
  1081         'Filtering on %s properties is not supported.' % typename(values[0]))
  1078     if isinstance(values[0], datastore_types.Text):
       
  1079       raise datastore_errors.BadValueError(
       
  1080         'Filtering on Text properties is not supported.')
       
  1081 
  1082 
  1082     if operator in self.INEQUALITY_OPERATORS:
  1083     if operator in self.INEQUALITY_OPERATORS:
  1083       if self.__inequality_prop and property != self.__inequality_prop:
  1084       if self.__inequality_prop and property != self.__inequality_prop:
  1084         raise datastore_errors.BadFilterError(
  1085         raise datastore_errors.BadFilterError(
  1085           'Only one property per query may have inequality filters (%s).' %
  1086           'Only one property per query may have inequality filters (%s).' %
  1163       order.set_direction(direction)
  1164       order.set_direction(direction)
  1164 
  1165 
  1165     return pb
  1166     return pb
  1166 
  1167 
  1167 
  1168 
       
  1169 class MultiQuery(Query):
       
  1170   """Class representing a query which requires multiple datastore queries.
       
  1171 
       
  1172   This class is actually a subclass of datastore.Query as it is intended to act
       
  1173   like a normal Query object (supporting the same interface).
       
  1174   """
       
  1175 
       
  1176   def __init__(self, bound_queries, orderings):
       
  1177     if len(bound_queries) > MAX_ALLOWABLE_QUERIES:
       
  1178       raise datastore_errors.BadArgumentError(
       
  1179           'Cannot satisfy query -- too many subqueries (max: %d, got %d).'
       
  1180           ' Probable cause: too many IN/!= filters in query.' %
       
  1181           (MAX_ALLOWABLE_QUERIES, len(bound_queries)))
       
  1182     self.__bound_queries = bound_queries
       
  1183     self.__orderings = orderings
       
  1184 
       
  1185   def __str__(self):
       
  1186     res = 'MultiQuery: '
       
  1187     for query in self.__bound_queries:
       
  1188       res = '%s %s' % (res, str(query))
       
  1189     return res
       
  1190 
       
  1191   def Get(self, limit, offset=0):
       
  1192     """Get results of the query with a limit on the number of results.
       
  1193 
       
  1194     Args:
       
  1195       limit: maximum number of values to return.
       
  1196       offset: offset requested -- if nonzero, this will override the offset in
       
  1197               the original query
       
  1198 
       
  1199     Returns:
       
  1200       A list of entities with at most "limit" entries (less if the query
       
  1201       completes before reading limit values).
       
  1202     """
       
  1203     count = 1
       
  1204     result = []
       
  1205 
       
  1206     iterator = self.Run()
       
  1207 
       
  1208     try:
       
  1209       for i in xrange(offset):
       
  1210         val = iterator.next()
       
  1211     except StopIteration:
       
  1212       pass
       
  1213 
       
  1214     try:
       
  1215       while count <= limit:
       
  1216         val = iterator.next()
       
  1217         result.append(val)
       
  1218         count += 1
       
  1219     except StopIteration:
       
  1220       pass
       
  1221     return result
       
  1222 
       
  1223   class SortOrderEntity(object):
       
  1224     """Allow entity comparisons using provided orderings.
       
  1225 
       
  1226     The iterator passed to the constructor is eventually consumed via
       
  1227     calls to GetNext(), which generate new SortOrderEntity s with the
       
  1228     same orderings.
       
  1229     """
       
  1230 
       
  1231     def __init__(self, entity_iterator, orderings):
       
  1232       """Ctor.
       
  1233 
       
  1234       Args:
       
  1235         entity_iterator: an iterator of entities which will be wrapped.
       
  1236         orderings: an iterable of (identifier, order) pairs. order
       
  1237           should be either Query.ASCENDING or Query.DESCENDING.
       
  1238       """
       
  1239       self.__entity_iterator = entity_iterator
       
  1240       self.__entity = None
       
  1241       self.__min_max_value_cache = {}
       
  1242       try:
       
  1243         self.__entity = entity_iterator.next()
       
  1244       except StopIteration:
       
  1245         pass
       
  1246       else:
       
  1247         self.__orderings = orderings
       
  1248 
       
  1249     def __str__(self):
       
  1250       return str(self.__entity)
       
  1251 
       
  1252     def GetEntity(self):
       
  1253       """Gets the wrapped entity."""
       
  1254       return self.__entity
       
  1255 
       
  1256     def GetNext(self):
       
  1257       """Wrap and return the next entity.
       
  1258 
       
  1259       The entity is retrieved from the iterator given at construction time.
       
  1260       """
       
  1261       return MultiQuery.SortOrderEntity(self.__entity_iterator,
       
  1262                                         self.__orderings)
       
  1263 
       
  1264     def CmpProperties(self, that):
       
  1265       """Compare two entities and return their relative order.
       
  1266 
       
  1267       Compares self to that based on the current sort orderings and the
       
  1268       key orders between them. Returns negative, 0, or positive depending on
       
  1269       whether self is less, equal to, or greater than that. This
       
  1270       comparison returns as if all values were to be placed in ascending order
       
  1271       (highest value last).  Only uses the sort orderings to compare (ignores
       
  1272        keys).
       
  1273 
       
  1274       Args:
       
  1275         that: SortOrderEntity
       
  1276 
       
  1277       Returns:
       
  1278         Negative if self < that
       
  1279         Zero if self == that
       
  1280         Positive if self > that
       
  1281       """
       
  1282       if not self.__entity:
       
  1283         return cmp(self.__entity, that.__entity)
       
  1284 
       
  1285       for (identifier, order) in self.__orderings:
       
  1286         value1 = self.__GetValueForId(self, identifier, order)
       
  1287         value2 = self.__GetValueForId(that, identifier, order)
       
  1288 
       
  1289         result = cmp(value1, value2)
       
  1290         if order == Query.DESCENDING:
       
  1291           result = -result
       
  1292         if result:
       
  1293           return result
       
  1294       return 0
       
  1295 
       
  1296     def __GetValueForId(self, sort_order_entity, identifier, sort_order):
       
  1297       value = sort_order_entity.__entity[identifier]
       
  1298       entity_key = sort_order_entity.__entity.key()
       
  1299       if (entity_key, identifier) in self.__min_max_value_cache:
       
  1300         value = self.__min_max_value_cache[(entity_key, identifier)]
       
  1301       elif isinstance(value, list):
       
  1302         if sort_order == Query.DESCENDING:
       
  1303           value = min(value)
       
  1304         else:
       
  1305           value = max(value)
       
  1306         self.__min_max_value_cache[(entity_key, identifier)] = value
       
  1307 
       
  1308       return value
       
  1309 
       
  1310     def __cmp__(self, that):
       
  1311       """Compare self to that w.r.t. values defined in the sort order.
       
  1312 
       
  1313       Compare an entity with another, using sort-order first, then the key
       
  1314       order to break ties. This can be used in a heap to have faster min-value
       
  1315       lookup.
       
  1316 
       
  1317       Args:
       
  1318         that: other entity to compare to
       
  1319       Returns:
       
  1320         negative: if self is less than that in sort order
       
  1321         zero: if self is equal to that in sort order
       
  1322         positive: if self is greater than that in sort order
       
  1323       """
       
  1324       property_compare = self.CmpProperties(that)
       
  1325       if property_compare:
       
  1326         return property_compare
       
  1327       else:
       
  1328         return cmp(self.__entity.key(), that.__entity.key())
       
  1329 
       
  1330   def Run(self):
       
  1331     """Return an iterable output with all results in order."""
       
  1332     results = []
       
  1333     count = 1
       
  1334     log_level = logging.DEBUG - 1
       
  1335     for bound_query in self.__bound_queries:
       
  1336       logging.log(log_level, 'Running query #%i' % count)
       
  1337       results.append(bound_query.Run())
       
  1338       count += 1
       
  1339 
       
  1340     def IterateResults(results):
       
  1341       """Iterator function to return all results in sorted order.
       
  1342 
       
  1343       Iterate over the array of results, yielding the next element, in
       
  1344       sorted order. This function is destructive (results will be empty
       
  1345       when the operation is complete).
       
  1346 
       
  1347       Args:
       
  1348         results: list of result iterators to merge and iterate through
       
  1349 
       
  1350       Yields:
       
  1351         The next result in sorted order.
       
  1352       """
       
  1353       result_heap = []
       
  1354       for result in results:
       
  1355         heap_value = MultiQuery.SortOrderEntity(result, self.__orderings)
       
  1356         if heap_value.GetEntity():
       
  1357           heapq.heappush(result_heap, heap_value)
       
  1358 
       
  1359       used_keys = set()
       
  1360 
       
  1361       while result_heap:
       
  1362         top_result = heapq.heappop(result_heap)
       
  1363 
       
  1364         results_to_push = []
       
  1365         if top_result.GetEntity().key() not in used_keys:
       
  1366           yield top_result.GetEntity()
       
  1367         else:
       
  1368           pass
       
  1369 
       
  1370         used_keys.add(top_result.GetEntity().key())
       
  1371 
       
  1372         results_to_push = []
       
  1373         while result_heap:
       
  1374           next = heapq.heappop(result_heap)
       
  1375           if cmp(top_result, next):
       
  1376             results_to_push.append(next)
       
  1377             break
       
  1378           else:
       
  1379             results_to_push.append(next.GetNext())
       
  1380         results_to_push.append(top_result.GetNext())
       
  1381 
       
  1382         for popped_result in results_to_push:
       
  1383           if popped_result.GetEntity():
       
  1384             heapq.heappush(result_heap, popped_result)
       
  1385 
       
  1386     return IterateResults(results)
       
  1387 
       
  1388   def Count(self, limit=None):
       
  1389     """Return the number of matched entities for this query.
       
  1390 
       
  1391     Will return the de-duplicated count of results.  Will call the more
       
  1392     efficient Get() function if a limit is given.
       
  1393 
       
  1394     Args:
       
  1395       limit: maximum number of entries to count (for any result > limit, return
       
  1396       limit).
       
  1397     Returns:
       
  1398       count of the number of entries returned.
       
  1399     """
       
  1400     if limit is None:
       
  1401       count = 0
       
  1402       for i in self.Run():
       
  1403         count += 1
       
  1404       return count
       
  1405     else:
       
  1406       return len(self.Get(limit))
       
  1407 
       
  1408   def __setitem__(self, query_filter, value):
       
  1409     """Add a new filter by setting it on all subqueries.
       
  1410 
       
  1411     If any of the setting operations raise an exception, the ones
       
  1412     that succeeded are undone and the exception is propagated
       
  1413     upward.
       
  1414 
       
  1415     Args:
       
  1416       query_filter: a string of the form "property operand".
       
  1417       value: the value that the given property is compared against.
       
  1418     """
       
  1419     saved_items = []
       
  1420     for index, query in enumerate(self.__bound_queries):
       
  1421       saved_items.append(query.get(query_filter, None))
       
  1422       try:
       
  1423         query[query_filter] = value
       
  1424       except:
       
  1425         for q, old_value in itertools.izip(self.__bound_queries[:index],
       
  1426                                            saved_items):
       
  1427           if old_value is not None:
       
  1428             q[query_filter] = old_value
       
  1429           else:
       
  1430             del q[query_filter]
       
  1431         raise
       
  1432 
       
  1433   def __delitem__(self, query_filter):
       
  1434     """Delete a filter by deleting it from all subqueries.
       
  1435 
       
  1436     If a KeyError is raised during the attempt, it is ignored, unless
       
  1437     every subquery raised a KeyError. If any other exception is
       
  1438     raised, any deletes will be rolled back.
       
  1439 
       
  1440     Args:
       
  1441       query_filter: the filter to delete.
       
  1442 
       
  1443     Raises:
       
  1444       KeyError: No subquery had an entry containing query_filter.
       
  1445     """
       
  1446     subquery_count = len(self.__bound_queries)
       
  1447     keyerror_count = 0
       
  1448     saved_items = []
       
  1449     for index, query in enumerate(self.__bound_queries):
       
  1450       try:
       
  1451         saved_items.append(query.get(query_filter, None))
       
  1452         del query[query_filter]
       
  1453       except KeyError:
       
  1454         keyerror_count += 1
       
  1455       except:
       
  1456         for q, old_value in itertools.izip(self.__bound_queries[:index],
       
  1457                                            saved_items):
       
  1458           if old_value is not None:
       
  1459             q[query_filter] = old_value
       
  1460         raise
       
  1461 
       
  1462     if keyerror_count == subquery_count:
       
  1463       raise KeyError(query_filter)
       
  1464 
       
  1465   def __iter__(self):
       
  1466     return iter(self.__bound_queries)
       
  1467 
       
  1468 
  1168 class Iterator(object):
  1469 class Iterator(object):
  1169   """An iterator over the results of a datastore query.
  1470   """An iterator over the results of a datastore query.
  1170 
  1471 
  1171   Iterators are used to access the results of a Query. An iterator is
  1472   Iterators are used to access the results of a Query. An iterator is
  1172   obtained by building a Query, then calling Run() on it.
  1473   obtained by building a Query, then calling Run() on it.
  1329           already_modified.pop())
  1630           already_modified.pop())
  1330 
  1631 
  1331     self.modified_keys.update(keys)
  1632     self.modified_keys.update(keys)
  1332 
  1633 
  1333 
  1634 
  1334 
       
  1335 def RunInTransaction(function, *args, **kwargs):
  1635 def RunInTransaction(function, *args, **kwargs):
       
  1636   """Runs a function inside a datastore transaction.
       
  1637 
       
  1638      Runs the user-provided function inside transaction, retries default
       
  1639      number of times.
       
  1640 
       
  1641     Args:
       
  1642     # a function to be run inside the transaction
       
  1643     function: callable
       
  1644     # positional arguments to pass to the function
       
  1645     args: variable number of any type
       
  1646 
       
  1647   Returns:
       
  1648     the function's return value, if any
       
  1649 
       
  1650   Raises:
       
  1651     TransactionFailedError, if the transaction could not be committed.
       
  1652   """
       
  1653   return RunInTransactionCustomRetries(
       
  1654       DEFAULT_TRANSACTION_RETRIES, function, *args, **kwargs)
       
  1655 
       
  1656 
       
  1657 def RunInTransactionCustomRetries(retries, function, *args, **kwargs):
  1336   """Runs a function inside a datastore transaction.
  1658   """Runs a function inside a datastore transaction.
  1337 
  1659 
  1338   Runs the user-provided function inside a full-featured, ACID datastore
  1660   Runs the user-provided function inside a full-featured, ACID datastore
  1339   transaction. Every Put, Get, and Delete call in the function is made within
  1661   transaction. Every Put, Get, and Delete call in the function is made within
  1340   the transaction. All entities involved in these calls must belong to the
  1662   the transaction. All entities involved in these calls must belong to the
  1385   - Durable. On commit, all writes are persisted to the datastore.
  1707   - Durable. On commit, all writes are persisted to the datastore.
  1386 
  1708 
  1387   Nested transactions are not supported.
  1709   Nested transactions are not supported.
  1388 
  1710 
  1389   Args:
  1711   Args:
       
  1712     # number of retries
       
  1713     retries: integer
  1390     # a function to be run inside the transaction
  1714     # a function to be run inside the transaction
  1391     function: callable
  1715     function: callable
  1392     # positional arguments to pass to the function
  1716     # positional arguments to pass to the function
  1393     args: variable number of any type
  1717     args: variable number of any type
  1394 
  1718 
  1401 
  1725 
  1402   if _CurrentTransactionKey():
  1726   if _CurrentTransactionKey():
  1403     raise datastore_errors.BadRequestError(
  1727     raise datastore_errors.BadRequestError(
  1404       'Nested transactions are not supported.')
  1728       'Nested transactions are not supported.')
  1405 
  1729 
       
  1730   if retries < 0:
       
  1731     raise datastore_errors.BadRequestError(
       
  1732       'Number of retries should be non-negative number.')
       
  1733 
  1406   tx_key = None
  1734   tx_key = None
  1407 
  1735 
  1408   try:
  1736   try:
  1409     tx_key = _NewTransactionKey()
  1737     tx_key = _NewTransactionKey()
  1410     tx = _Transaction()
  1738     tx = _Transaction()
  1411     _txes[tx_key] = tx
  1739     _txes[tx_key] = tx
  1412 
  1740 
  1413     for i in range(0, TRANSACTION_RETRIES + 1):
  1741     for i in range(0, retries + 1):
  1414       tx.modified_keys.clear()
  1742       tx.modified_keys.clear()
  1415 
  1743 
  1416       try:
  1744       try:
  1417         result = function(*args, **kwargs)
  1745         result = function(*args, **kwargs)
  1418       except:
  1746       except:
  1434         else:
  1762         else:
  1435           raise type, value, trace
  1763           raise type, value, trace
  1436 
  1764 
  1437       if tx.handle:
  1765       if tx.handle:
  1438         try:
  1766         try:
  1439           resp = api_base_pb.VoidProto()
  1767           resp = datastore_pb.CommitResponse()
  1440           apiproxy_stub_map.MakeSyncCall('datastore_v3', 'Commit',
  1768           apiproxy_stub_map.MakeSyncCall('datastore_v3', 'Commit',
  1441                                          tx.handle, resp)
  1769                                          tx.handle, resp)
  1442         except apiproxy_errors.ApplicationError, err:
  1770         except apiproxy_errors.ApplicationError, err:
  1443           if (err.application_error ==
  1771           if (err.application_error ==
  1444               datastore_pb.Error.CONCURRENT_TRANSACTION):
  1772               datastore_pb.Error.CONCURRENT_TRANSACTION):
  1542 
  1870 
  1543 def _FindTransactionFrameInStack():
  1871 def _FindTransactionFrameInStack():
  1544   """Walks the stack to find a RunInTransaction() call.
  1872   """Walks the stack to find a RunInTransaction() call.
  1545 
  1873 
  1546   Returns:
  1874   Returns:
  1547     # this is the RunInTransaction() frame record, if found
  1875     # this is the RunInTransactionCustomRetries() frame record, if found
  1548     frame record or None
  1876     frame record or None
  1549   """
  1877   """
  1550   frame = sys._getframe()
  1878   frame = sys._getframe()
  1551   filename = frame.f_code.co_filename
  1879   filename = frame.f_code.co_filename
  1552 
  1880 
  1553   frame = frame.f_back.f_back
  1881   frame = frame.f_back.f_back
  1554   while frame:
  1882   while frame:
  1555     if (frame.f_code.co_filename == filename and
  1883     if (frame.f_code.co_filename == filename and
  1556         frame.f_code.co_name == 'RunInTransaction'):
  1884         frame.f_code.co_name == 'RunInTransactionCustomRetries'):
  1557       return frame
  1885       return frame
  1558     frame = frame.f_back
  1886     frame = frame.f_back
  1559 
  1887 
  1560   return None
  1888   return None
  1561 
  1889