thirdparty/google_appengine/google/appengine/ext/remote_api/throttle.py
changeset 2864 2e0b0af889be
equal deleted inserted replaced
2862:27971a13089f 2864:2e0b0af889be
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2007 Google Inc.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #     http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 #
       
    17 
       
    18 """Client-side transfer throttling for use with remote_api_stub.
       
    19 
       
    20 This module is used to configure rate limiting for programs accessing
       
    21 AppEngine services through remote_api.
       
    22 
       
    23 See the Throttle class for more information.
       
    24 
       
    25 An example with throttling:
       
    26 ---
       
    27 from google.appengine.ext import db
       
    28 from google.appengine.ext.remote_api import remote_api_stub
       
    29 from google.appengine.ext.remote_api import throttle
       
    30 from myapp import models
       
    31 import getpass
       
    32 import threading
       
    33 
       
    34 def auth_func():
       
    35   return (raw_input('Username:'), getpass.getpass('Password:'))
       
    36 
       
    37 remote_api_stub.ConfigureRemoteDatastore('my-app', '/remote_api', auth_func)
       
    38 full_throttle = throttle.DefaultThrottle(multiplier=1.0)
       
    39 throttle.ThrottleRemoteDatastore(full_throttle)
       
    40 
       
    41 # Register any threads that will be using the datastore with the throttler
       
    42 full_throttle.Register(threading.currentThread())
       
    43 
       
    44 # Now you can access the remote datastore just as if your code was running on
       
    45 # App Engine, and you don't need to worry about exceeding quota limits!
       
    46 
       
    47 houses = models.House.all().fetch(100)
       
    48 for a_house in houses:
       
    49   a_house.doors += 1
       
    50 db.put(houses)
       
    51 ---
       
    52 
       
    53 This example limits usage to the default free quota levels.  The multiplier
       
    54 kwarg to throttle.DefaultThrottle can be used to scale the throttle levels
       
    55 higher or lower.
       
    56 
       
    57 Throttles can also be constructed directly for more control over the limits
       
    58 for different operations.  See the Throttle class and the constants following
       
    59 it for details.
       
    60 """
       
    61 
       
    62 
       
    63 import logging
       
    64 import threading
       
    65 import time
       
    66 import urllib2
       
    67 import urlparse
       
    68 
       
    69 from google.appengine.api import apiproxy_stub_map
       
    70 from google.appengine.ext.remote_api import remote_api_stub
       
    71 from google.appengine.tools import appengine_rpc
       
    72 
       
    73 logger = logging.getLogger('google.appengine.ext.remote_api.throttle')
       
    74 
       
    75 MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
       
    76 
       
    77 
       
    78 class Error(Exception):
       
    79   """Base class for errors in this module."""
       
    80 
       
    81 
       
    82 class ThreadNotRegisteredError(Error):
       
    83   """An unregistered thread has accessed the throttled datastore stub."""
       
    84 
       
    85 
       
    86 class UnknownThrottleNameError(Error):
       
    87   """A transfer was added for an unknown throttle name."""
       
    88 
       
    89 
       
    90 def InterruptibleSleep(sleep_time):
       
    91   """Puts thread to sleep, checking this threads exit_flag four times a second.
       
    92 
       
    93   Args:
       
    94     sleep_time: Time to sleep.
       
    95   """
       
    96   slept = 0.0
       
    97   epsilon = .0001
       
    98   thread = threading.currentThread()
       
    99   while slept < sleep_time - epsilon:
       
   100     remaining = sleep_time - slept
       
   101     this_sleep_time = min(remaining, 0.25)
       
   102     time.sleep(this_sleep_time)
       
   103     slept += this_sleep_time
       
   104     if hasattr(thread, 'exit_flag') and thread.exit_flag:
       
   105       return
       
   106 
       
   107 
       
   108 class Throttle(object):
       
   109   """A base class for upload rate throttling.
       
   110 
       
   111   Transferring large number of entities, too quickly, could trigger
       
   112   quota limits and cause the transfer process to halt.  In order to
       
   113   stay within the application's quota, we throttle the data transfer
       
   114   to a specified limit (across all transfer threads).
       
   115 
       
   116   This class tracks a moving average of some aspect of the transfer
       
   117   rate (bandwidth, records per second, http connections per
       
   118   second). It keeps two windows of counts of bytes transferred, on a
       
   119   per-thread basis. One block is the "current" block, and the other is
       
   120   the "prior" block. It will rotate the counts from current to prior
       
   121   when ROTATE_PERIOD has passed.  Thus, the current block will
       
   122   represent from 0 seconds to ROTATE_PERIOD seconds of activity
       
   123   (determined by: time.time() - self.last_rotate).  The prior block
       
   124   will always represent a full ROTATE_PERIOD.
       
   125 
       
   126   Sleeping is performed just before a transfer of another block, and is
       
   127   based on the counts transferred *before* the next transfer. It really
       
   128   does not matter how much will be transferred, but only that for all the
       
   129   data transferred SO FAR that we have interspersed enough pauses to
       
   130   ensure the aggregate transfer rate is within the specified limit.
       
   131 
       
   132   These counts are maintained on a per-thread basis, so we do not require
       
   133   any interlocks around incrementing the counts. There IS an interlock on
       
   134   the rotation of the counts because we do not want multiple threads to
       
   135   multiply-rotate the counts.
       
   136 
       
   137   There are various race conditions in the computation and collection
       
   138   of these counts. We do not require precise values, but simply to
       
   139   keep the overall transfer within the bandwidth limits. If a given
       
   140   pause is a little short, or a little long, then the aggregate delays
       
   141   will be correct.
       
   142   """
       
   143 
       
   144   ROTATE_PERIOD = 600
       
   145 
       
   146   def __init__(self,
       
   147                get_time=time.time,
       
   148                thread_sleep=InterruptibleSleep,
       
   149                layout=None):
       
   150     self.get_time = get_time
       
   151     self.thread_sleep = thread_sleep
       
   152 
       
   153     self.start_time = get_time()
       
   154     self.transferred = {}
       
   155     self.prior_block = {}
       
   156     self.totals = {}
       
   157     self.throttles = {}
       
   158 
       
   159     self.last_rotate = {}
       
   160     self.rotate_mutex = {}
       
   161     if layout:
       
   162       self.AddThrottles(layout)
       
   163 
       
   164   def AddThrottle(self, name, limit):
       
   165     self.throttles[name] = limit
       
   166     self.transferred[name] = {}
       
   167     self.prior_block[name] = {}
       
   168     self.totals[name] = {}
       
   169     self.last_rotate[name] = self.get_time()
       
   170     self.rotate_mutex[name] = threading.Lock()
       
   171 
       
   172   def AddThrottles(self, layout):
       
   173     for key, value in layout.iteritems():
       
   174       self.AddThrottle(key, value)
       
   175 
       
   176   def Register(self, thread):
       
   177     """Register this thread with the throttler."""
       
   178     thread_id = id(thread)
       
   179     for throttle_name in self.throttles.iterkeys():
       
   180       self.transferred[throttle_name][thread_id] = 0
       
   181       self.prior_block[throttle_name][thread_id] = 0
       
   182       self.totals[throttle_name][thread_id] = 0
       
   183 
       
   184   def VerifyThrottleName(self, throttle_name):
       
   185     if throttle_name not in self.throttles:
       
   186       raise UnknownThrottleNameError('%s is not a registered throttle' %
       
   187                                      throttle_name)
       
   188 
       
   189   def AddTransfer(self, throttle_name, token_count):
       
   190     """Add a count to the amount this thread has transferred.
       
   191 
       
   192     Each time a thread transfers some data, it should call this method to
       
   193     note the amount sent. The counts may be rotated if sufficient time
       
   194     has passed since the last rotation.
       
   195 
       
   196     Args:
       
   197       throttle_name: The name of the throttle to add to.
       
   198       token_count: The number to add to the throttle counter.
       
   199     """
       
   200     self.VerifyThrottleName(throttle_name)
       
   201     transferred = self.transferred[throttle_name]
       
   202     try:
       
   203       transferred[id(threading.currentThread())] += token_count
       
   204     except KeyError:
       
   205       thread = threading.currentThread()
       
   206       raise ThreadNotRegisteredError(
       
   207           'Unregistered thread accessing throttled datastore stub: id = %s\n'
       
   208           'name = %s' % (id(thread), thread.getName()))
       
   209 
       
   210     if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
       
   211       self._RotateCounts(throttle_name)
       
   212 
       
   213   def Sleep(self, throttle_name=None):
       
   214     """Possibly sleep in order to limit the transfer rate.
       
   215 
       
   216     Note that we sleep based on *prior* transfers rather than what we
       
   217     may be about to transfer. The next transfer could put us under/over
       
   218     and that will be rectified *after* that transfer. Net result is that
       
   219     the average transfer rate will remain within bounds. Spiky behavior
       
   220     or uneven rates among the threads could possibly bring the transfer
       
   221     rate above the requested limit for short durations.
       
   222 
       
   223     Args:
       
   224       throttle_name: The name of the throttle to sleep on.  If None or
       
   225         omitted, then sleep on all throttles.
       
   226     """
       
   227     if throttle_name is None:
       
   228       for throttle_name in self.throttles:
       
   229         self.Sleep(throttle_name=throttle_name)
       
   230       return
       
   231 
       
   232     self.VerifyThrottleName(throttle_name)
       
   233 
       
   234     thread = threading.currentThread()
       
   235 
       
   236     while True:
       
   237       duration = self.get_time() - self.last_rotate[throttle_name]
       
   238 
       
   239       total = 0
       
   240       for count in self.prior_block[throttle_name].values():
       
   241         total += count
       
   242 
       
   243       if total:
       
   244         duration += self.ROTATE_PERIOD
       
   245 
       
   246       for count in self.transferred[throttle_name].values():
       
   247         total += count
       
   248 
       
   249       sleep_time = self._SleepTime(total, self.throttles[throttle_name],
       
   250                                    duration)
       
   251 
       
   252       if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
       
   253         break
       
   254 
       
   255       logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
       
   256                    '(duration=%.1f ms, total=%d)',
       
   257                    thread.getName(), throttle_name,
       
   258                    sleep_time * 1000, duration * 1000, total)
       
   259       self.thread_sleep(sleep_time)
       
   260       if thread.exit_flag:
       
   261         break
       
   262       self._RotateCounts(throttle_name)
       
   263 
       
   264   def _SleepTime(self, total, limit, duration):
       
   265     """Calculate the time to sleep on a throttle.
       
   266 
       
   267     Args:
       
   268       total: The total amount transferred.
       
   269       limit: The amount per second that is allowed to be sent.
       
   270       duration: The amount of time taken to send the total.
       
   271 
       
   272     Returns:
       
   273       A float for the amount of time to sleep.
       
   274     """
       
   275     if not limit:
       
   276       return 0.0
       
   277     return max(0.0, (total / limit) - duration)
       
   278 
       
   279   def _RotateCounts(self, throttle_name):
       
   280     """Rotate the transfer counters.
       
   281 
       
   282     If sufficient time has passed, then rotate the counters from active to
       
   283     the prior-block of counts.
       
   284 
       
   285     This rotation is interlocked to ensure that multiple threads do not
       
   286     over-rotate the counts.
       
   287 
       
   288     Args:
       
   289       throttle_name: The name of the throttle to rotate.
       
   290     """
       
   291     self.VerifyThrottleName(throttle_name)
       
   292     self.rotate_mutex[throttle_name].acquire()
       
   293     try:
       
   294       next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
       
   295       if next_rotate_time >= self.get_time():
       
   296         return
       
   297 
       
   298       for name, count in self.transferred[throttle_name].items():
       
   299 
       
   300 
       
   301         self.prior_block[throttle_name][name] = count
       
   302         self.transferred[throttle_name][name] = 0
       
   303 
       
   304         self.totals[throttle_name][name] += count
       
   305 
       
   306       self.last_rotate[throttle_name] = self.get_time()
       
   307 
       
   308     finally:
       
   309       self.rotate_mutex[throttle_name].release()
       
   310 
       
   311   def TotalTransferred(self, throttle_name):
       
   312     """Return the total transferred, and over what period.
       
   313 
       
   314     Args:
       
   315       throttle_name: The name of the throttle to total.
       
   316 
       
   317     Returns:
       
   318       A tuple of the total count and running time for the given throttle name.
       
   319     """
       
   320     total = 0
       
   321     for count in self.totals[throttle_name].values():
       
   322       total += count
       
   323     for count in self.transferred[throttle_name].values():
       
   324       total += count
       
   325     return total, self.get_time() - self.start_time
       
   326 
       
   327 
       
   328 BANDWIDTH_UP = 'http-bandwidth-up'
       
   329 BANDWIDTH_DOWN = 'http-bandwidth-down'
       
   330 REQUESTS = 'http-requests'
       
   331 HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
       
   332 HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
       
   333 HTTPS_REQUESTS = 'https-requests'
       
   334 DATASTORE_CALL_COUNT = 'datastore-call-count'
       
   335 ENTITIES_FETCHED = 'entities-fetched'
       
   336 ENTITIES_MODIFIED = 'entities-modified'
       
   337 INDEX_MODIFICATIONS = 'index-modifications'
       
   338 
       
   339 
       
   340 DEFAULT_LIMITS = {
       
   341     BANDWIDTH_UP: 100000,
       
   342     BANDWIDTH_DOWN: 100000,
       
   343     REQUESTS: 15,
       
   344     HTTPS_BANDWIDTH_UP: 100000,
       
   345     HTTPS_BANDWIDTH_DOWN: 100000,
       
   346     HTTPS_REQUESTS: 15,
       
   347     DATASTORE_CALL_COUNT: 120,
       
   348     ENTITIES_FETCHED: 400,
       
   349     ENTITIES_MODIFIED: 400,
       
   350     INDEX_MODIFICATIONS: 1600,
       
   351 }
       
   352 
       
   353 NO_LIMITS = {
       
   354     BANDWIDTH_UP: None,
       
   355     BANDWIDTH_DOWN: None,
       
   356     REQUESTS: None,
       
   357     HTTPS_BANDWIDTH_UP: None,
       
   358     HTTPS_BANDWIDTH_DOWN: None,
       
   359     HTTPS_REQUESTS: None,
       
   360     DATASTORE_CALL_COUNT: None,
       
   361     ENTITIES_FETCHED: None,
       
   362     ENTITIES_MODIFIED: None,
       
   363     INDEX_MODIFICATIONS: None,
       
   364 }
       
   365 
       
   366 
       
   367 def DefaultThrottle(multiplier=1.0):
       
   368   """Return a Throttle instance with multiplier * the quota limits."""
       
   369   layout = dict([(name, multiplier * limit)
       
   370                  for (name, limit) in DEFAULT_LIMITS.iteritems()])
       
   371   return Throttle(layout=layout)
       
   372 
       
   373 
       
   374 class ThrottleHandler(urllib2.BaseHandler):
       
   375   """A urllib2 handler for http and https requests that adds to a throttle."""
       
   376 
       
   377   def __init__(self, throttle):
       
   378     """Initialize a ThrottleHandler.
       
   379 
       
   380     Args:
       
   381       throttle: A Throttle instance to call for bandwidth and http/https request
       
   382         throttling.
       
   383     """
       
   384     self.throttle = throttle
       
   385 
       
   386   def AddRequest(self, throttle_name, req):
       
   387     """Add to bandwidth throttle for given request.
       
   388 
       
   389     Args:
       
   390       throttle_name: The name of the bandwidth throttle to add to.
       
   391       req: The request whose size will be added to the throttle.
       
   392     """
       
   393     size = 0
       
   394     for key, value in req.headers.iteritems():
       
   395       size += len('%s: %s\n' % (key, value))
       
   396     for key, value in req.unredirected_hdrs.iteritems():
       
   397       size += len('%s: %s\n' % (key, value))
       
   398     (unused_scheme,
       
   399      unused_host_port, url_path,
       
   400      unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
       
   401     size += len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
       
   402     data = req.get_data()
       
   403     if data:
       
   404       size += len(data)
       
   405     self.throttle.AddTransfer(throttle_name, size)
       
   406 
       
   407   def AddResponse(self, throttle_name, res):
       
   408     """Add to bandwidth throttle for given response.
       
   409 
       
   410     Args:
       
   411       throttle_name: The name of the bandwidth throttle to add to.
       
   412       res: The response whose size will be added to the throttle.
       
   413     """
       
   414     content = res.read()
       
   415 
       
   416     def ReturnContent():
       
   417       return content
       
   418 
       
   419     res.read = ReturnContent
       
   420     size = len(content)
       
   421     headers = res.info()
       
   422     for key, value in headers.items():
       
   423       size += len('%s: %s\n' % (key, value))
       
   424     self.throttle.AddTransfer(throttle_name, size)
       
   425 
       
   426   def http_request(self, req):
       
   427     """Process an HTTP request.
       
   428 
       
   429     If the throttle is over quota, sleep first.  Then add request size to
       
   430     throttle before returning it to be sent.
       
   431 
       
   432     Args:
       
   433       req: A urllib2.Request object.
       
   434 
       
   435     Returns:
       
   436       The request passed in.
       
   437     """
       
   438     self.throttle.Sleep(BANDWIDTH_UP)
       
   439     self.throttle.Sleep(BANDWIDTH_DOWN)
       
   440     self.AddRequest(BANDWIDTH_UP, req)
       
   441     return req
       
   442 
       
   443   def https_request(self, req):
       
   444     """Process an HTTPS request.
       
   445 
       
   446     If the throttle is over quota, sleep first.  Then add request size to
       
   447     throttle before returning it to be sent.
       
   448 
       
   449     Args:
       
   450       req: A urllib2.Request object.
       
   451 
       
   452     Returns:
       
   453       The request passed in.
       
   454     """
       
   455     self.throttle.Sleep(HTTPS_BANDWIDTH_UP)
       
   456     self.throttle.Sleep(HTTPS_BANDWIDTH_DOWN)
       
   457     self.AddRequest(HTTPS_BANDWIDTH_UP, req)
       
   458     return req
       
   459 
       
   460   def http_response(self, unused_req, res):
       
   461     """Process an HTTP response.
       
   462 
       
   463     The size of the response is added to the bandwidth throttle and the request
       
   464     throttle is incremented by one.
       
   465 
       
   466     Args:
       
   467       unused_req: The urllib2 request for this response.
       
   468       res: A urllib2 response object.
       
   469 
       
   470     Returns:
       
   471       The response passed in.
       
   472     """
       
   473     self.AddResponse(BANDWIDTH_DOWN, res)
       
   474     self.throttle.AddTransfer(REQUESTS, 1)
       
   475     return res
       
   476 
       
   477   def https_response(self, unused_req, res):
       
   478     """Process an HTTPS response.
       
   479 
       
   480     The size of the response is added to the bandwidth throttle and the request
       
   481     throttle is incremented by one.
       
   482 
       
   483     Args:
       
   484       unused_req: The urllib2 request for this response.
       
   485       res: A urllib2 response object.
       
   486 
       
   487     Returns:
       
   488       The response passed in.
       
   489     """
       
   490     self.AddResponse(HTTPS_BANDWIDTH_DOWN, res)
       
   491     self.throttle.AddTransfer(HTTPS_REQUESTS, 1)
       
   492     return res
       
   493 
       
   494 
       
   495 class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
       
   496   """Provides a simplified RPC-style interface for HTTP requests.
       
   497 
       
   498   This RPC server uses a Throttle to prevent exceeding quotas.
       
   499   """
       
   500 
       
   501   def __init__(self, throttle, *args, **kwargs):
       
   502     """Initialize a ThrottledHttpRpcServer.
       
   503 
       
   504     Also sets request_manager.rpc_server to the ThrottledHttpRpcServer instance.
       
   505 
       
   506     Args:
       
   507       throttle: A Throttles instance.
       
   508       args: Positional arguments to pass through to
       
   509         appengine_rpc.HttpRpcServer.__init__
       
   510       kwargs: Keyword arguments to pass through to
       
   511         appengine_rpc.HttpRpcServer.__init__
       
   512     """
       
   513     self.throttle = throttle
       
   514     appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)
       
   515 
       
   516   def _GetOpener(self):
       
   517     """Returns an OpenerDirector that supports cookies and ignores redirects.
       
   518 
       
   519     Returns:
       
   520       A urllib2.OpenerDirector object.
       
   521     """
       
   522     opener = appengine_rpc.HttpRpcServer._GetOpener(self)
       
   523     opener.add_handler(ThrottleHandler(self.throttle))
       
   524 
       
   525     return opener
       
   526 
       
   527 
       
   528 def ThrottledHttpRpcServerFactory(throttle):
       
   529   """Create a factory to produce ThrottledHttpRpcServer for a given throttle.
       
   530 
       
   531   Args:
       
   532     throttle: A Throttle instance to use for the ThrottledHttpRpcServer.
       
   533 
       
   534   Returns:
       
   535     A factory to produce a ThrottledHttpRpcServer.
       
   536   """
       
   537 
       
   538   def MakeRpcServer(*args, **kwargs):
       
   539     """Factory to produce a ThrottledHttpRpcServer.
       
   540 
       
   541     Args:
       
   542       args: Positional args to pass to ThrottledHttpRpcServer.
       
   543       kwargs: Keyword args to pass to ThrottledHttpRpcServer.
       
   544 
       
   545     Returns:
       
   546       A ThrottledHttpRpcServer instance.
       
   547     """
       
   548     kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
       
   549     kwargs['save_cookies'] = True
       
   550     rpc_server = ThrottledHttpRpcServer(throttle, *args, **kwargs)
       
   551     return rpc_server
       
   552   return MakeRpcServer
       
   553 
       
   554 
       
   555 class Throttler(object):
       
   556   def PrehookHandler(self, service, call, request, response):
       
   557     handler = getattr(self, '_Prehook_' + call, None)
       
   558     if handler:
       
   559       handler(request, response)
       
   560 
       
   561   def PosthookHandler(self, service, call, request, response):
       
   562     handler = getattr(self, '_Posthook_' + call, None)
       
   563     if handler:
       
   564       handler(request, response)
       
   565 
       
   566 
       
   567 def SleepHandler(*throttle_names):
       
   568   def SleepOnThrottles(self, request, response):
       
   569     for throttle_name in throttle_names:
       
   570       self._DatastoreThrottler__throttle.Sleep(throttle_name)
       
   571   return SleepOnThrottles
       
   572 
       
   573 
       
   574 class DatastoreThrottler(Throttler):
       
   575   def __init__(self, throttle):
       
   576     Throttler.__init__(self)
       
   577     self.__throttle = throttle
       
   578 
       
   579   def AddCost(self, cost_proto):
       
   580     """Add costs from the Cost protobuf."""
       
   581     self.__throttle.AddTransfer(INDEX_MODIFICATIONS, cost_proto.index_writes())
       
   582     self.__throttle.AddTransfer(ENTITIES_MODIFIED, cost_proto.entity_writes())
       
   583 
       
   584 
       
   585   _Prehook_Put = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)
       
   586 
       
   587   def _Posthook_Put(self, request, response):
       
   588     self.AddCost(response.cost())
       
   589 
       
   590 
       
   591   _Prehook_Get = SleepHandler(ENTITIES_FETCHED)
       
   592 
       
   593   def _Posthook_Get(self, request, response):
       
   594     self.__throttle.AddTransfer(ENTITIES_FETCHED, response.entity_size())
       
   595 
       
   596 
       
   597   _Prehook_RunQuery = SleepHandler(ENTITIES_FETCHED)
       
   598 
       
   599   def _Posthook_RunQuery(self, request, response):
       
   600     if not response.keys_only():
       
   601       self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())
       
   602 
       
   603 
       
   604   _Prehook_Next = SleepHandler(ENTITIES_FETCHED)
       
   605 
       
   606   def _Posthook_Next(self, request, response):
       
   607     if not response.keys_only():
       
   608       self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())
       
   609 
       
   610 
       
   611   _Prehook_Delete = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)
       
   612 
       
   613   def _Posthook_Delete(self, request, response):
       
   614     self.AddCost(response.cost())
       
   615 
       
   616 
       
   617   _Prehook_Commit = SleepHandler()
       
   618 
       
   619   def _Posthook_Commit(self, request, response):
       
   620     self.AddCost(response.cost())
       
   621 
       
   622 
       
   623 def ThrottleRemoteDatastore(throttle, remote_datastore_stub=None):
       
   624   """Install the given throttle for the remote datastore stub.
       
   625 
       
   626   Args:
       
   627     throttle: A Throttle instance to limit datastore access rates
       
   628     remote_datastore_stub: The datstore stub instance to throttle, for
       
   629       testing purposes.
       
   630   """
       
   631   if not remote_datastore_stub:
       
   632     remote_datastore_stub = apiproxy_stub_map.apiproxy.GetStub('datastore_v3')
       
   633   if not isinstance(remote_datastore_stub, remote_api_stub.RemoteDatastoreStub):
       
   634     raise remote_api_stub.ConfigurationError('remote_api is not configured.')
       
   635   throttler = DatastoreThrottler(throttle)
       
   636   remote_datastore_stub._PreHookHandler = throttler.PrehookHandler
       
   637   remote_datastore_stub._PostHookHandler = throttler.PosthookHandler