1163 order.set_direction(direction) |
1164 order.set_direction(direction) |
1164 |
1165 |
1165 return pb |
1166 return pb |
1166 |
1167 |
1167 |
1168 |
|
class MultiQuery(Query):
  """Class representing a query which requires multiple datastore queries.

  This class is actually a subclass of datastore.Query as it is intended to act
  like a normal Query object (supporting the same interface). Results of the
  bound subqueries are merged according to the given orderings (entity key
  breaks ties) and each entity key is returned at most once.
  """

  def __init__(self, bound_queries, orderings):
    """Ctor.

    Args:
      bound_queries: sequence of bound subqueries. Each must support Run(),
        get(), item assignment and item deletion.
      orderings: iterable of (identifier, order) pairs used when merging the
        subquery results; order is Query.ASCENDING or Query.DESCENDING.

    Raises:
      datastore_errors.BadArgumentError: more than MAX_ALLOWABLE_QUERIES
        subqueries were supplied.
    """
    if len(bound_queries) > MAX_ALLOWABLE_QUERIES:
      raise datastore_errors.BadArgumentError(
          'Cannot satisfy query -- too many subqueries (max: %d, got %d).'
          ' Probable cause: too many IN/!= filters in query.' %
          (MAX_ALLOWABLE_QUERIES, len(bound_queries)))
    self.__bound_queries = bound_queries
    self.__orderings = orderings

  def __str__(self):
    return 'MultiQuery: ' + ''.join([' ' + str(query)
                                     for query in self.__bound_queries])

  def Get(self, limit, offset=0):
    """Get results of the query with a limit on the number of results.

    Args:
      limit: maximum number of values to return.
      offset: number of merged (de-duplicated) results to skip before
        collecting; it is applied to the merged result stream, the
        subqueries themselves are not modified.

    Returns:
      A list of entities with at most "limit" entries (less if the query
      completes before reading limit values).
    """
    iterator = self.Run()

    try:
      for _ in xrange(offset):
        iterator.next()
    except StopIteration:
      # Fewer than `offset` results exist; nothing left to return.
      return []

    result = []
    try:
      while len(result) < limit:
        result.append(iterator.next())
    except StopIteration:
      pass
    return result

  class SortOrderEntity(object):
    """Allow entity comparisons using provided orderings.

    The iterator passed to the constructor is eventually consumed via
    calls to GetNext(), which generate new SortOrderEntity s with the
    same orderings.
    """

    def __init__(self, entity_iterator, orderings):
      """Ctor.

      Args:
        entity_iterator: an iterator of entities which will be wrapped.
        orderings: an iterable of (identifier, order) pairs. order
          should be either Query.ASCENDING or Query.DESCENDING.
      """
      self.__entity_iterator = entity_iterator
      self.__entity = None
      self.__min_max_value_cache = {}
      # Keep the orderings even when the iterator is already exhausted, so
      # GetNext() on an empty wrapper does not hit a missing attribute.
      self.__orderings = orderings
      try:
        self.__entity = entity_iterator.next()
      except StopIteration:
        pass

    def __str__(self):
      return str(self.__entity)

    def GetEntity(self):
      """Gets the wrapped entity (None once the iterator is exhausted)."""
      return self.__entity

    def GetNext(self):
      """Wrap and return the next entity.

      The entity is retrieved from the iterator given at construction time.
      """
      return MultiQuery.SortOrderEntity(self.__entity_iterator,
                                        self.__orderings)

    def CmpProperties(self, that):
      """Compare two entities and return their relative order.

      Compares self to that based on the current sort orderings. Returns
      negative, 0, or positive depending on whether self is less, equal to,
      or greater than that. This comparison returns as if all values were to
      be placed in ascending order (highest value last); DESCENDING orderings
      are handled by negating the per-property result. Only uses the sort
      orderings to compare (ignores keys).

      Args:
        that: SortOrderEntity

      Returns:
        Negative if self < that
        Zero if self == that
        Positive if self > that
      """
      # Test for None explicitly: entities are dict-like and an entity with
      # no properties would otherwise be mistaken for an exhausted iterator.
      if self.__entity is None or that.__entity is None:
        return cmp(self.__entity, that.__entity)

      for identifier, order in self.__orderings:
        value1 = self.__GetValueForId(self, identifier, order)
        value2 = self.__GetValueForId(that, identifier, order)

        result = cmp(value1, value2)
        if order == Query.DESCENDING:
          result = -result
        if result:
          return result
      return 0

    def __GetValueForId(self, sort_order_entity, identifier, sort_order):
      """Return the value of `identifier` on sort_order_entity for sorting.

      List values are reduced to one value (min() for DESCENDING, max()
      otherwise) and the reduced value is cached in this object's cache,
      keyed by (entity key, identifier).
      """
      # NOTE(review): the min/max direction above is preserved from the
      # original; confirm it matches the order in which the datastore returns
      # entities with multi-valued properties, since the merge relies on it.
      value = sort_order_entity.__entity[identifier]
      cache_key = (sort_order_entity.__entity.key(), identifier)
      if cache_key in self.__min_max_value_cache:
        value = self.__min_max_value_cache[cache_key]
      elif isinstance(value, list):
        if sort_order == Query.DESCENDING:
          value = min(value)
        else:
          value = max(value)
        self.__min_max_value_cache[cache_key] = value

      return value

    def __cmp__(self, that):
      """Compare self to that w.r.t. values defined in the sort order.

      Compare an entity with another, using sort-order first, then the key
      order to break ties. This can be used in a heap to have faster min-value
      lookup.

      Args:
        that: other entity to compare to
      Returns:
        negative: if self is less than that in sort order
        zero: if self is equal to that in sort order
        positive: if self is greater than that in sort order
      """
      property_compare = self.CmpProperties(that)
      if property_compare:
        return property_compare
      return cmp(self.__entity.key(), that.__entity.key())

  def Run(self):
    """Return an iterable output with all results in order.

    Run() is called on every subquery up front; the returned generator then
    lazily merges their results and yields each entity key at most once.
    """
    log_level = logging.DEBUG - 1
    results = []
    for index, bound_query in enumerate(self.__bound_queries):
      # Lazy %-args: the message is only formatted if the record is emitted.
      logging.log(log_level, 'Running query #%i', index + 1)
      results.append(bound_query.Run())

    def IterateResults(results):
      """Iterator function to return all results in sorted order.

      Iterate over the array of results, yielding the next element, in
      sorted order. This function is destructive (results will be empty
      when the operation is complete).

      Args:
        results: list of result iterators to merge and iterate through

      Yields:
        The next result in sorted order.
      """
      result_heap = []
      for result in results:
        heap_value = MultiQuery.SortOrderEntity(result, self.__orderings)
        if heap_value.GetEntity() is not None:
          heapq.heappush(result_heap, heap_value)

      used_keys = set()

      while result_heap:
        top_result = heapq.heappop(result_heap)

        top_key = top_result.GetEntity().key()
        if top_key not in used_keys:
          yield top_result.GetEntity()
        used_keys.add(top_key)

        # Advance every other iterator currently positioned on an entry that
        # compares equal to top_result (same sort values and same key), so
        # duplicates are consumed together; the first non-equal entry is
        # pushed back unchanged.
        results_to_push = []
        while result_heap:
          candidate = heapq.heappop(result_heap)
          if cmp(top_result, candidate):
            results_to_push.append(candidate)
            break
          results_to_push.append(candidate.GetNext())
        results_to_push.append(top_result.GetNext())

        for popped_result in results_to_push:
          if popped_result.GetEntity() is not None:
            heapq.heappush(result_heap, popped_result)

    return IterateResults(results)

  def Count(self, limit=None):
    """Return the number of matched entities for this query.

    Will return the de-duplicated count of results. Will call the more
    efficient Get() function if a limit is given.

    Args:
      limit: maximum number of entries to count (for any result > limit, return
        limit).
    Returns:
      count of the number of entries returned.
    """
    if limit is None:
      return sum(1 for _ in self.Run())
    return len(self.Get(limit))

  def __setitem__(self, query_filter, value):
    """Add a new filter by setting it on all subqueries.

    If any of the setting operations raise an exception, the ones
    that succeeded are undone and the exception is propagated
    upward.

    Args:
      query_filter: a string of the form "property operand".
      value: the value that the given property is compared against.
    """
    # Sentinel distinguishes "filter absent" from "filter set to None".
    missing = object()
    saved_items = []
    for index, query in enumerate(self.__bound_queries):
      saved_items.append(query.get(query_filter, missing))
      try:
        query[query_filter] = value
      except:
        # Deliberately broad: undo on any failure, then re-raise it.
        for q, old_value in itertools.izip(self.__bound_queries[:index],
                                           saved_items):
          if old_value is missing:
            del q[query_filter]
          else:
            q[query_filter] = old_value
        raise

  def __delitem__(self, query_filter):
    """Delete a filter by deleting it from all subqueries.

    If a KeyError is raised during the attempt, it is ignored, unless
    every subquery raised a KeyError. If any other exception is
    raised, any deletes will be rolled back.

    Args:
      query_filter: the filter to delete.

    Raises:
      KeyError: No subquery had an entry containing query_filter.
    """
    # Sentinel distinguishes "filter absent" from "filter set to None".
    missing = object()
    subquery_count = len(self.__bound_queries)
    keyerror_count = 0
    saved_items = []
    for index, query in enumerate(self.__bound_queries):
      try:
        saved_items.append(query.get(query_filter, missing))
        del query[query_filter]
      except KeyError:
        keyerror_count += 1
      except:
        # Deliberately broad: restore earlier deletes, then re-raise.
        for q, old_value in itertools.izip(self.__bound_queries[:index],
                                           saved_items):
          if old_value is not missing:
            q[query_filter] = old_value
        raise

    if keyerror_count == subquery_count:
      raise KeyError(query_filter)

  def __iter__(self):
    return iter(self.__bound_queries)
|
1467 |
|
1468 |
1168 class Iterator(object): |
1469 class Iterator(object): |
1169 """An iterator over the results of a datastore query. |
1470 """An iterator over the results of a datastore query. |
1170 |
1471 |
1171 Iterators are used to access the results of a Query. An iterator is |
1472 Iterators are used to access the results of a Query. An iterator is |
1172 obtained by building a Query, then calling Run() on it. |
1473 obtained by building a Query, then calling Run() on it. |