|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """Client-side transfer throttling for use with remote_api_stub. |
|
19 |
|
20 This module is used to configure rate limiting for programs accessing |
|
21 AppEngine services through remote_api. |
|
22 |
|
23 See the Throttle class for more information. |
|
24 |
|
25 An example with throttling: |
|
26 --- |
|
27 from google.appengine.ext import db |
|
28 from google.appengine.ext.remote_api import remote_api_stub |
|
29 from google.appengine.ext.remote_api import throttle |
|
30 from myapp import models |
|
31 import getpass |
|
32 import threading |
|
33 |
|
34 def auth_func(): |
|
35 return (raw_input('Username:'), getpass.getpass('Password:')) |
|
36 |
|
37 remote_api_stub.ConfigureRemoteDatastore('my-app', '/remote_api', auth_func) |
|
38 full_throttle = throttle.DefaultThrottle(multiplier=1.0) |
|
39 throttle.ThrottleRemoteDatastore(full_throttle) |
|
40 |
|
41 # Register any threads that will be using the datastore with the throttler |
|
42 full_throttle.Register(threading.currentThread()) |
|
43 |
|
44 # Now you can access the remote datastore just as if your code was running on |
|
45 # App Engine, and you don't need to worry about exceeding quota limits! |
|
46 |
|
47 houses = models.House.all().fetch(100) |
|
48 for a_house in houses: |
|
49 a_house.doors += 1 |
|
50 db.put(houses) |
|
51 --- |
|
52 |
|
53 This example limits usage to the default free quota levels. The multiplier |
|
54 kwarg to throttle.DefaultThrottle can be used to scale the throttle levels |
|
55 higher or lower. |
|
56 |
|
57 Throttles can also be constructed directly for more control over the limits |
|
58 for different operations. See the Throttle class and the constants following |
|
59 it for details. |
|
60 """ |
|
61 |
|
62 |
|
63 import logging |
|
64 import threading |
|
65 import time |
|
66 import urllib2 |
|
67 import urlparse |
|
68 |
|
69 from google.appengine.api import apiproxy_stub_map |
|
70 from google.appengine.ext.remote_api import remote_api_stub |
|
71 from google.appengine.tools import appengine_rpc |
|
72 |
|
# Module-level logger; Throttle.Sleep emits a debug record through it each
# time it decides to pause the calling thread.
logger = logging.getLogger('google.appengine.ext.remote_api.throttle')

# Shortest pause worth taking: Throttle.Sleep stops waiting as soon as the
# computed sleep time drops below this value. The value is handed to the
# throttle's sleep callable, which defaults to a time.sleep-based helper, so
# it is expressed in seconds.
MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
|
76 |
|
77 |
|
class Error(Exception):
  """Root of the exception hierarchy raised by this module.

  Catch this type to handle any throttle-specific failure in one place.
  """
|
80 |
|
81 |
|
class ThreadNotRegisteredError(Error):
  """Raised when a thread that was never registered records a transfer.

  Threads must be passed to Throttle.Register before they use the throttled
  datastore stub.
  """
|
84 |
|
85 |
|
class UnknownThrottleNameError(Error):
  """Raised when an operation names a throttle that was never added."""
|
88 |
|
89 |
|
def InterruptibleSleep(sleep_time):
  """Sleeps in short slices so the calling thread can be asked to stop early.

  The sleep is broken into slices of at most a quarter of a second. After each
  slice the current thread's `exit_flag` attribute (if it has one) is checked,
  and the function returns immediately when the flag is set.

  Args:
    sleep_time: Total time to sleep, in seconds.
  """
  max_slice = 0.25
  # Tolerance that stops floating point residue from scheduling one more
  # (vanishingly short) slice.
  tolerance = .0001
  current = threading.currentThread()
  elapsed = 0.0
  while elapsed < sleep_time - tolerance:
    nap = min(sleep_time - elapsed, max_slice)
    time.sleep(nap)
    elapsed += nap
    if hasattr(current, 'exit_flag') and current.exit_flag:
      break
|
106 |
|
107 |
|
class Throttle(object):
  """A base class for upload rate throttling.

  Transferring large number of entities, too quickly, could trigger
  quota limits and cause the transfer process to halt.  In order to
  stay within the application's quota, we throttle the data transfer
  to a specified limit (across all transfer threads).

  This class tracks a moving average of some aspect of the transfer
  rate (bandwidth, records per second, http connections per
  second). It keeps two windows of counts of bytes transferred, on a
  per-thread basis. One block is the "current" block, and the other is
  the "prior" block. It will rotate the counts from current to prior
  when ROTATE_PERIOD has passed.  Thus, the current block will
  represent from 0 seconds to ROTATE_PERIOD seconds of activity
  (determined by: time.time() - self.last_rotate).  The prior block
  will always represent a full ROTATE_PERIOD.

  Sleeping is performed just before a transfer of another block, and is
  based on the counts transferred *before* the next transfer. It really
  does not matter how much will be transferred, but only that for all the
  data transferred SO FAR that we have interspersed enough pauses to
  ensure the aggregate transfer rate is within the specified limit.

  These counts are maintained on a per-thread basis, so we do not require
  any interlocks around incrementing the counts. There IS an interlock on
  the rotation of the counts because we do not want multiple threads to
  multiply-rotate the counts.

  There are various race conditions in the computation and collection
  of these counts. We do not require precise values, but simply to
  keep the overall transfer within the bandwidth limits. If a given
  pause is a little short, or a little long, then the aggregate delays
  will be correct.
  """

  # Length of one accounting window, in the units returned by get_time
  # (seconds for the default time.time).
  ROTATE_PERIOD = 600

  def __init__(self,
               get_time=time.time,
               thread_sleep=InterruptibleSleep,
               layout=None):
    """Initialize a Throttle.

    Args:
      get_time: Zero-argument callable returning the current time; defaults
        to time.time.
      thread_sleep: Callable taking a duration and pausing the calling
        thread for that long; defaults to InterruptibleSleep.
      layout: Optional dict mapping throttle names to their limits. Each
        entry is installed with AddThrottle.
    """
    self.get_time = get_time
    self.thread_sleep = thread_sleep

    self.start_time = get_time()
    # All of the following are keyed by throttle name. transferred,
    # prior_block and totals map to a further dict keyed by id(thread).
    self.transferred = {}
    self.prior_block = {}
    self.totals = {}
    self.throttles = {}

    self.last_rotate = {}
    self.rotate_mutex = {}
    if layout:
      self.AddThrottles(layout)

  def AddThrottle(self, name, limit):
    """Create (or reset) the throttle called `name` with the given limit.

    Args:
      name: The throttle name.
      limit: The allowed amount per second; a false value (None or 0)
        disables sleeping for this throttle.
    """
    self.throttles[name] = limit
    self.transferred[name] = {}
    self.prior_block[name] = {}
    self.totals[name] = {}
    self.last_rotate[name] = self.get_time()
    self.rotate_mutex[name] = threading.Lock()

  def AddThrottles(self, layout):
    """Add one throttle per (name, limit) entry of the `layout` dict."""
    for key, value in layout.items():
      self.AddThrottle(key, value)

  def Register(self, thread):
    """Register this thread with the throttler.

    Zeroes the thread's counters for every throttle that exists at the time
    of the call. Threads are identified by id(thread).

    Args:
      thread: The thread object that will be recording transfers.
    """
    thread_id = id(thread)
    for throttle_name in self.throttles:
      self.transferred[throttle_name][thread_id] = 0
      self.prior_block[throttle_name][thread_id] = 0
      self.totals[throttle_name][thread_id] = 0

  def VerifyThrottleName(self, throttle_name):
    """Raise UnknownThrottleNameError unless `throttle_name` was added."""
    if throttle_name not in self.throttles:
      raise UnknownThrottleNameError('%s is not a registered throttle' %
                                     throttle_name)

  def AddTransfer(self, throttle_name, token_count):
    """Add a count to the amount this thread has transferred.

    Each time a thread transfers some data, it should call this method to
    note the amount sent. The counts may be rotated if sufficient time
    has passed since the last rotation.

    Args:
      throttle_name: The name of the throttle to add to.
      token_count: The number to add to the throttle counter.

    Raises:
      UnknownThrottleNameError: If the throttle name was never added.
      ThreadNotRegisteredError: If the calling thread was never registered.
    """
    self.VerifyThrottleName(throttle_name)
    transferred = self.transferred[throttle_name]
    try:
      transferred[id(threading.currentThread())] += token_count
    except KeyError:
      thread = threading.currentThread()
      raise ThreadNotRegisteredError(
          'Unregistered thread accessing throttled datastore stub: id = %s\n'
          'name = %s' % (id(thread), thread.getName()))

    # Cheap unlocked pre-check; _RotateCounts re-checks under its mutex.
    if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
      self._RotateCounts(throttle_name)

  def Sleep(self, throttle_name=None):
    """Possibly sleep in order to limit the transfer rate.

    Note that we sleep based on *prior* transfers rather than what we
    may be about to transfer. The next transfer could put us under/over
    and that will be rectified *after* that transfer. Net result is that
    the average transfer rate will remain within bounds. Spiky behavior
    or uneven rates among the threads could possibly bring the transfer
    rate above the requested limit for short durations.

    Args:
      throttle_name: The name of the throttle to sleep on.  If None or
        omitted, then sleep on all throttles.

    Raises:
      UnknownThrottleNameError: If the throttle name was never added.
    """
    if throttle_name is None:
      for throttle_name in self.throttles:
        self.Sleep(throttle_name=throttle_name)
      return

    self.VerifyThrottleName(throttle_name)

    thread = threading.currentThread()

    while True:
      # Time covered by the current block so far.
      duration = self.get_time() - self.last_rotate[throttle_name]

      total = sum(self.prior_block[throttle_name].values())

      # A non-empty prior block stands for one full extra window.
      if total:
        duration += self.ROTATE_PERIOD

      total += sum(self.transferred[throttle_name].values())

      sleep_time = self._SleepTime(total, self.throttles[throttle_name],
                                   duration)

      if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
        break

      logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
                   '(duration=%.1f ms, total=%d)',
                   thread.getName(), throttle_name,
                   sleep_time * 1000, duration * 1000, total)
      self.thread_sleep(sleep_time)
      # Not every registered thread carries an exit_flag (the main thread, for
      # one, does not), so a missing attribute means "keep going" rather than
      # an AttributeError. This mirrors the guard in InterruptibleSleep.
      if getattr(thread, 'exit_flag', False):
        break
      self._RotateCounts(throttle_name)

  def _SleepTime(self, total, limit, duration):
    """Calculate the time to sleep on a throttle.

    Args:
      total: The total amount transferred.
      limit: The amount per second that is allowed to be sent.
      duration: The amount of time taken to send the total.

    Returns:
      A float for the amount of time to sleep.
    """
    if not limit:
      return 0.0
    # Force true division: with integer totals and limits Python 2 would
    # floor the quotient and under-estimate the required pause.
    return max(0.0, (float(total) / limit) - duration)

  def _RotateCounts(self, throttle_name):
    """Rotate the transfer counters.

    If sufficient time has passed, then rotate the counters from active to
    the prior-block of counts.

    This rotation is interlocked to ensure that multiple threads do not
    over-rotate the counts.

    Args:
      throttle_name: The name of the throttle to rotate.

    Raises:
      UnknownThrottleNameError: If the throttle name was never added.
    """
    self.VerifyThrottleName(throttle_name)
    self.rotate_mutex[throttle_name].acquire()
    try:
      next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
      if next_rotate_time >= self.get_time():
        return

      for name, count in self.transferred[throttle_name].items():
        # Current counts become the prior block, the current block restarts
        # at zero, and the lifetime totals absorb what was just rotated out.
        self.prior_block[throttle_name][name] = count
        self.transferred[throttle_name][name] = 0

        self.totals[throttle_name][name] += count

      self.last_rotate[throttle_name] = self.get_time()

    finally:
      self.rotate_mutex[throttle_name].release()

  def TotalTransferred(self, throttle_name):
    """Return the total transferred, and over what period.

    Args:
      throttle_name: The name of the throttle to total.

    Returns:
      A tuple of the total count and running time for the given throttle name.
    """
    # totals only holds what has been rotated out; add the live block too.
    total = sum(self.totals[throttle_name].values())
    total += sum(self.transferred[throttle_name].values())
    return total, self.get_time() - self.start_time
|
326 |
|
327 |
|
# Names of the throttles this module knows how to feed.
# HTTP transport throttles (fed by ThrottleHandler).
BANDWIDTH_UP = 'http-bandwidth-up'
BANDWIDTH_DOWN = 'http-bandwidth-down'
REQUESTS = 'http-requests'
# HTTPS counterparts of the three throttles above.
HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
HTTPS_REQUESTS = 'https-requests'
# Datastore-level throttles.
DATASTORE_CALL_COUNT = 'datastore-call-count'
ENTITIES_FETCHED = 'entities-fetched'
ENTITIES_MODIFIED = 'entities-modified'
INDEX_MODIFICATIONS = 'index-modifications'


# Limit per throttle used by DefaultThrottle (scaled by its multiplier).
DEFAULT_LIMITS = {
    BANDWIDTH_UP: 100000,
    BANDWIDTH_DOWN: 100000,
    REQUESTS: 15,
    HTTPS_BANDWIDTH_UP: 100000,
    HTTPS_BANDWIDTH_DOWN: 100000,
    HTTPS_REQUESTS: 15,
    DATASTORE_CALL_COUNT: 120,
    ENTITIES_FETCHED: 400,
    ENTITIES_MODIFIED: 400,
    INDEX_MODIFICATIONS: 1600,
}

# Same throttle names with every limit set to None, i.e. tracked but never
# slept on.
NO_LIMITS = dict((throttle_name, None) for throttle_name in DEFAULT_LIMITS)
|
365 |
|
366 |
|
def DefaultThrottle(multiplier=1.0):
  """Builds a Throttle whose limits are DEFAULT_LIMITS scaled by a factor.

  Args:
    multiplier: Factor applied to every entry of DEFAULT_LIMITS.

  Returns:
    A new Throttle instance configured with the scaled limits.
  """
  scaled_limits = {}
  for throttle_name in DEFAULT_LIMITS:
    scaled_limits[throttle_name] = multiplier * DEFAULT_LIMITS[throttle_name]
  return Throttle(layout=scaled_limits)
|
372 |
|
373 |
|
class ThrottleHandler(urllib2.BaseHandler):
  """urllib2 handler that feeds HTTP and HTTPS traffic sizes into a Throttle.

  Outgoing requests first wait on the relevant bandwidth throttles and are
  then counted; incoming responses are counted, and each response also adds
  one to the matching request-count throttle.
  """

  def __init__(self, throttle):
    """Initialize a ThrottleHandler.

    Args:
      throttle: The Throttle instance that receives bandwidth and request
        counts for both http and https traffic.
    """
    self.throttle = throttle

  def AddRequest(self, throttle_name, req):
    """Record the approximate size of an outgoing request.

    The size is the sum of every "Key: value" header line (regular and
    unredirected headers), the request line, and the request body if any.

    Args:
      throttle_name: The bandwidth throttle to charge.
      req: The urllib2.Request being sent.
    """
    header_bytes = sum(len('%s: %s\n' % (key, value))
                       for key, value in req.headers.iteritems())
    unredirected_bytes = sum(
        len('%s: %s\n' % (key, value))
        for key, value in req.unredirected_hdrs.iteritems())
    # Index 2 of the urlsplit result is the path component.
    url_path = urlparse.urlsplit(req.get_full_url())[2]
    request_line = '%s %s HTTP/1.1\n' % (req.get_method(), url_path)
    size = header_bytes + unredirected_bytes + len(request_line)
    body = req.get_data()
    if body:
      size += len(body)
    self.throttle.AddTransfer(throttle_name, size)

  def AddResponse(self, throttle_name, res):
    """Record the size of a response (body plus header lines).

    The body is read here in full; `res.read` is then replaced with a
    zero-argument callable returning that same content, so the body remains
    available to whoever consumes the response afterwards.

    Args:
      throttle_name: The bandwidth throttle to charge.
      res: The response object whose size is measured.
    """
    body = res.read()
    res.read = lambda: body
    header_bytes = sum(len('%s: %s\n' % (key, value))
                       for key, value in res.info().items())
    self.throttle.AddTransfer(throttle_name, len(body) + header_bytes)

  def http_request(self, req):
    """Pre-process an HTTP request.

    Waits on both HTTP bandwidth throttles, then charges the request's size
    to the upstream one.

    Args:
      req: A urllib2.Request object.

    Returns:
      The same request object, so that it can be sent.
    """
    for name in (BANDWIDTH_UP, BANDWIDTH_DOWN):
      self.throttle.Sleep(name)
    self.AddRequest(BANDWIDTH_UP, req)
    return req

  def https_request(self, req):
    """Pre-process an HTTPS request.

    Waits on both HTTPS bandwidth throttles, then charges the request's size
    to the upstream one.

    Args:
      req: A urllib2.Request object.

    Returns:
      The same request object, so that it can be sent.
    """
    for name in (HTTPS_BANDWIDTH_UP, HTTPS_BANDWIDTH_DOWN):
      self.throttle.Sleep(name)
    self.AddRequest(HTTPS_BANDWIDTH_UP, req)
    return req

  def http_response(self, unused_req, res):
    """Post-process an HTTP response.

    Charges the response size to the downstream bandwidth throttle and adds
    one to the HTTP request-count throttle.

    Args:
      unused_req: The urllib2 request that produced this response.
      res: A urllib2 response object.

    Returns:
      The same response object.
    """
    self.AddResponse(BANDWIDTH_DOWN, res)
    self.throttle.AddTransfer(REQUESTS, 1)
    return res

  def https_response(self, unused_req, res):
    """Post-process an HTTPS response.

    Charges the response size to the downstream bandwidth throttle and adds
    one to the HTTPS request-count throttle.

    Args:
      unused_req: The urllib2 request that produced this response.
      res: A urllib2 response object.

    Returns:
      The same response object.
    """
    self.AddResponse(HTTPS_BANDWIDTH_DOWN, res)
    self.throttle.AddTransfer(HTTPS_REQUESTS, 1)
    return res
|
493 |
|
494 |
|
class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
  """An HttpRpcServer whose HTTP traffic is accounted against a Throttle.

  The opener built by this server carries a ThrottleHandler, so requests
  made through it are delayed as needed to stay within the throttle limits.
  """

  def __init__(self, throttle, *args, **kwargs):
    """Initialize a ThrottledHttpRpcServer.

    Args:
      throttle: The Throttle instance used to limit this server's traffic.
      args: Positional arguments forwarded unchanged to
        appengine_rpc.HttpRpcServer.__init__
      kwargs: Keyword arguments forwarded unchanged to
        appengine_rpc.HttpRpcServer.__init__
    """
    # NOTE(review): assigned ahead of the base initializer in case that
    # initializer builds the opener via _GetOpener -- confirm against
    # appengine_rpc before reordering.
    self.throttle = throttle
    appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)

  def _GetOpener(self):
    """Returns the base class's opener with throttling installed.

    Returns:
      The opener object produced by appengine_rpc.HttpRpcServer._GetOpener,
      with a ThrottleHandler bound to this server's throttle added to it.
    """
    throttled_opener = appengine_rpc.HttpRpcServer._GetOpener(self)
    handler = ThrottleHandler(self.throttle)
    throttled_opener.add_handler(handler)
    return throttled_opener
|
526 |
|
527 |
|
def ThrottledHttpRpcServerFactory(throttle):
  """Builds a callable that creates ThrottledHttpRpcServer instances.

  Args:
    throttle: The Throttle instance every produced server will share.

  Returns:
    A function accepting the usual rpc server arguments and returning a
    ThrottledHttpRpcServer bound to `throttle`.
  """

  def MakeRpcServer(*args, **kwargs):
    """Creates one ThrottledHttpRpcServer.

    The 'account_type' and 'save_cookies' keyword arguments are always
    overridden, whatever the caller supplied.

    Args:
      args: Positional args forwarded to ThrottledHttpRpcServer.
      kwargs: Keyword args forwarded to ThrottledHttpRpcServer.

    Returns:
      A ThrottledHttpRpcServer instance.
    """
    kwargs.update(account_type='HOSTED_OR_GOOGLE', save_cookies=True)
    return ThrottledHttpRpcServer(throttle, *args, **kwargs)

  return MakeRpcServer
|
553 |
|
554 |
|
class Throttler(object):
  """Dispatches stub pre/post hooks to per-call methods on the subclass.

  A call named X is routed to `_Prehook_X` before it runs and `_Posthook_X`
  after it runs; calls without a matching method are ignored.
  """

  def PrehookHandler(self, service, call, request, response):
    """Invokes `_Prehook_<call>(request, response)` if such a method exists.

    Args:
      service: The service name (not used for dispatch).
      call: The name of the call about to be made.
      request: The request object for the call.
      response: The response object for the call.
    """
    hook = getattr(self, '_Prehook_' + call, None)
    if not hook:
      return
    hook(request, response)

  def PosthookHandler(self, service, call, request, response):
    """Invokes `_Posthook_<call>(request, response)` if such a method exists.

    Args:
      service: The service name (not used for dispatch).
      call: The name of the call that was made.
      request: The request object for the call.
      response: The response object for the call.
    """
    hook = getattr(self, '_Posthook_' + call, None)
    if not hook:
      return
    hook(request, response)
|
565 |
|
566 |
|
def SleepHandler(*throttle_names):
  """Builds a pre-hook method that sleeps on each of the named throttles.

  The returned function is meant to be assigned as a method of
  DatastoreThrottler: it reaches that class's private throttle through its
  name-mangled attribute.

  Args:
    *throttle_names: Throttle names to sleep on, in order.

  Returns:
    A function (self, request, response) suitable for use as a hook method.
  """
  # Mangled spelling of DatastoreThrottler's `__throttle`, needed because
  # this function is defined outside that class body.
  mangled_attr = '_DatastoreThrottler__throttle'

  def SleepOnThrottles(self, request, response):
    for name in throttle_names:
      getattr(self, mangled_attr).Sleep(name)

  return SleepOnThrottles
|
572 |
|
573 |
|
class DatastoreThrottler(Throttler):
  """Hook set that throttles datastore calls and records their costs.

  Before Put, Get, RunQuery, Next and Delete the calling thread sleeps on the
  relevant throttles; after each call (including Commit) the amounts reported
  by the response are added to them.
  """

  def __init__(self, throttle):
    """Initialize a DatastoreThrottler.

    Args:
      throttle: The Throttle instance to sleep on and add transfers to.
    """
    Throttler.__init__(self)
    self.__throttle = throttle

  def AddCost(self, cost_proto):
    """Adds the index and entity write counts of a Cost protobuf."""
    throttle = self.__throttle
    throttle.AddTransfer(INDEX_MODIFICATIONS, cost_proto.index_writes())
    throttle.AddTransfer(ENTITIES_MODIFIED, cost_proto.entity_writes())

  # --- Put -----------------------------------------------------------------
  _Prehook_Put = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)

  def _Posthook_Put(self, request, response):
    cost = response.cost()
    self.AddCost(cost)

  # --- Get -----------------------------------------------------------------
  _Prehook_Get = SleepHandler(ENTITIES_FETCHED)

  def _Posthook_Get(self, request, response):
    fetched = response.entity_size()
    self.__throttle.AddTransfer(ENTITIES_FETCHED, fetched)

  # --- RunQuery ------------------------------------------------------------
  _Prehook_RunQuery = SleepHandler(ENTITIES_FETCHED)

  def _Posthook_RunQuery(self, request, response):
    # Keys-only results are not counted as fetched entities.
    if response.keys_only():
      return
    self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())

  # --- Next ----------------------------------------------------------------
  _Prehook_Next = SleepHandler(ENTITIES_FETCHED)

  def _Posthook_Next(self, request, response):
    # Keys-only results are not counted as fetched entities.
    if response.keys_only():
      return
    self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())

  # --- Delete --------------------------------------------------------------
  _Prehook_Delete = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)

  def _Posthook_Delete(self, request, response):
    cost = response.cost()
    self.AddCost(cost)

  # --- Commit --------------------------------------------------------------
  # No throttle names: a commit is never delayed, only accounted for.
  _Prehook_Commit = SleepHandler()

  def _Posthook_Commit(self, request, response):
    cost = response.cost()
    self.AddCost(cost)
|
621 |
|
622 |
|
def ThrottleRemoteDatastore(throttle, remote_datastore_stub=None):
  """Attaches a throttle to the remote datastore stub's pre/post hooks.

  Args:
    throttle: The Throttle instance that will limit datastore access rates.
    remote_datastore_stub: The datastore stub instance to throttle; when
      omitted, the stub registered for 'datastore_v3' is looked up. Supplying
      it explicitly is intended for testing.

  Raises:
    remote_api_stub.ConfigurationError: If the stub is not a
      RemoteDatastoreStub, i.e. remote_api has not been configured.
  """
  stub = remote_datastore_stub
  if not stub:
    stub = apiproxy_stub_map.apiproxy.GetStub('datastore_v3')
  if not isinstance(stub, remote_api_stub.RemoteDatastoreStub):
    raise remote_api_stub.ConfigurationError('remote_api is not configured.')

  throttler = DatastoreThrottler(throttle)
  stub._PreHookHandler = throttler.PrehookHandler
  stub._PostHookHandler = throttler.PosthookHandler