thirdparty/google_appengine/google/appengine/ext/remote_api/throttle.py
changeset 2864 2e0b0af889be
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/ext/remote_api/throttle.py	Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,637 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Client-side transfer throttling for use with remote_api_stub.
+
+This module is used to configure rate limiting for programs accessing
+AppEngine services through remote_api.
+
+See the Throttle class for more information.
+
+An example with throttling:
+---
+from google.appengine.ext import db
+from google.appengine.ext.remote_api import remote_api_stub
+from google.appengine.ext.remote_api import throttle
+from myapp import models
+import getpass
+import threading
+
+def auth_func():
+  return (raw_input('Username:'), getpass.getpass('Password:'))
+
+remote_api_stub.ConfigureRemoteDatastore('my-app', '/remote_api', auth_func)
+full_throttle = throttle.DefaultThrottle(multiplier=1.0)
+throttle.ThrottleRemoteDatastore(full_throttle)
+
+# Register any threads that will be using the datastore with the throttler
+full_throttle.Register(threading.currentThread())
+
+# Now you can access the remote datastore just as if your code was running on
+# App Engine, and you don't need to worry about exceeding quota limits!
+
+houses = models.House.all().fetch(100)
+for a_house in houses:
+  a_house.doors += 1
+db.put(houses)
+---
+
+This example limits usage to the default free quota levels.  The multiplier
+kwarg to throttle.DefaultThrottle can be used to scale the throttle levels
+higher or lower.
+
+Throttles can also be constructed directly for more control over the limits
+for different operations.  See the Throttle class and the constants following
+it for details.
+"""
+
+
+import logging
+import threading
+import time
+import urllib2
+import urlparse
+
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.ext.remote_api import remote_api_stub
+from google.appengine.tools import appengine_rpc
+
+logger = logging.getLogger('google.appengine.ext.remote_api.throttle')
+
+MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
+
+
+class Error(Exception):
+  """Root of the exception hierarchy defined by this module."""
+
+
+class ThreadNotRegisteredError(Error):
+  """Raised when an unregistered thread adds a transfer to a throttle."""
+
+
+class UnknownThrottleNameError(Error):
+  """Raised when a throttle name that was never added is referenced."""
+
+
+def InterruptibleSleep(sleep_time):
+  """Sleep in short slices so the calling thread can be asked to stop early.
+
+  The sleep is broken into naps of at most a quarter of a second.  After each
+  nap the current thread's exit_flag attribute (if it has one) is checked,
+  and the function returns immediately when that flag is set.
+
+  Args:
+    sleep_time: Total time to sleep, in seconds.
+  """
+  current = threading.currentThread()
+  tolerance = .0001
+  elapsed = 0.0
+  while elapsed < sleep_time - tolerance:
+    nap = min(sleep_time - elapsed, 0.25)
+    time.sleep(nap)
+    elapsed += nap
+    if hasattr(current, 'exit_flag'):
+      if current.exit_flag:
+        return
+
+
+class Throttle(object):
+  """A base class for upload rate throttling.
+
+  Transferring a large number of entities too quickly could trigger
+  quota limits and cause the transfer process to halt.  In order to
+  stay within the application's quota, we throttle the data transfer
+  to a specified limit (across all transfer threads).
+
+  This class tracks a moving average of some aspect of the transfer
+  rate (bandwidth, records per second, http connections per
+  second). It keeps two windows of counts transferred, on a
+  per-thread basis. One block is the "current" block, and the other is
+  the "prior" block. It will rotate the counts from current to prior
+  when ROTATE_PERIOD has passed.  Thus, the current block will
+  represent from 0 seconds to ROTATE_PERIOD seconds of activity
+  (determined by: get_time() - self.last_rotate[name]).  The prior block
+  will always represent a full ROTATE_PERIOD.
+
+  Sleeping is performed just before a transfer of another block, and is
+  based on the counts transferred *before* the next transfer. It really
+  does not matter how much will be transferred, but only that for all the
+  data transferred SO FAR that we have interspersed enough pauses to
+  ensure the aggregate transfer rate is within the specified limit.
+
+  These counts are maintained on a per-thread basis, so we do not require
+  any interlocks around incrementing the counts. There IS an interlock on
+  the rotation of the counts because we do not want multiple threads to
+  multiply-rotate the counts.
+
+  There are various race conditions in the computation and collection
+  of these counts. We do not require precise values, but simply to
+  keep the overall transfer within the bandwidth limits. If a given
+  pause is a little short, or a little long, then the aggregate delays
+  will be correct.
+  """
+
+  # Length, in seconds, of one counting window.
+  ROTATE_PERIOD = 600
+
+  def __init__(self,
+               get_time=time.time,
+               thread_sleep=InterruptibleSleep,
+               layout=None):
+    """Initialize a Throttle.
+
+    Args:
+      get_time: Callable returning the current time in seconds; replaceable
+        for testing.
+      thread_sleep: Callable taking a number of seconds to sleep; replaceable
+        for testing.
+      layout: Optional dict mapping throttle names to their limits.  Each
+        entry is installed with AddThrottle.
+    """
+    self.get_time = get_time
+    self.thread_sleep = thread_sleep
+
+    self.start_time = get_time()
+    # Each of the following maps throttle name -> {thread id -> count}.
+    self.transferred = {}
+    self.prior_block = {}
+    self.totals = {}
+    # Maps throttle name -> limit (amount per second; falsy means no limit).
+    self.throttles = {}
+
+    # Per-throttle time of the last rotation and the lock guarding rotation.
+    self.last_rotate = {}
+    self.rotate_mutex = {}
+    if layout:
+      self.AddThrottles(layout)
+
+  def AddThrottle(self, name, limit):
+    """Install (or reset) the throttle called name with the given limit.
+
+    Threads registered before this call have no counters for the new
+    throttle; Register must be called (again) after adding throttles.
+
+    Args:
+      name: The name of the throttle.
+      limit: The amount per second allowed for this throttle.
+    """
+    self.throttles[name] = limit
+    self.transferred[name] = {}
+    self.prior_block[name] = {}
+    self.totals[name] = {}
+    self.last_rotate[name] = self.get_time()
+    self.rotate_mutex[name] = threading.Lock()
+
+  def AddThrottles(self, layout):
+    """Install every throttle described by layout (a name -> limit dict)."""
+    for name, limit in layout.items():
+      self.AddThrottle(name, limit)
+
+  def Register(self, thread):
+    """Register this thread with the throttler.
+
+    Zeroes the thread's counters on every throttle currently installed.
+
+    Args:
+      thread: The thread object to register; it is tracked by id(thread).
+    """
+    thread_id = id(thread)
+    for name in self.throttles:
+      self.transferred[name][thread_id] = 0
+      self.prior_block[name][thread_id] = 0
+      self.totals[name][thread_id] = 0
+
+  def VerifyThrottleName(self, throttle_name):
+    """Raise UnknownThrottleNameError unless throttle_name is installed."""
+    if throttle_name not in self.throttles:
+      raise UnknownThrottleNameError('%s is not a registered throttle' %
+                                     throttle_name)
+
+  def AddTransfer(self, throttle_name, token_count):
+    """Add a count to the amount this thread has transferred.
+
+    Each time a thread transfers some data, it should call this method to
+    note the amount sent. The counts may be rotated if sufficient time
+    has passed since the last rotation.
+
+    Args:
+      throttle_name: The name of the throttle to add to.
+      token_count: The number to add to the throttle counter.
+
+    Raises:
+      UnknownThrottleNameError: If throttle_name was never added.
+      ThreadNotRegisteredError: If the calling thread was never registered.
+    """
+    self.VerifyThrottleName(throttle_name)
+    thread = threading.currentThread()
+    counts = self.transferred[throttle_name]
+    try:
+      counts[id(thread)] += token_count
+    except KeyError:
+      raise ThreadNotRegisteredError(
+          'Unregistered thread accessing throttled datastore stub: id = %s\n'
+          'name = %s' % (id(thread), thread.getName()))
+
+    if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
+      self._RotateCounts(throttle_name)
+
+  def Sleep(self, throttle_name=None):
+    """Possibly sleep in order to limit the transfer rate.
+
+    Note that we sleep based on *prior* transfers rather than what we
+    may be about to transfer. The next transfer could put us under/over
+    and that will be rectified *after* that transfer. Net result is that
+    the average transfer rate will remain within bounds. Spiky behavior
+    or uneven rates among the threads could possibly bring the transfer
+    rate above the requested limit for short durations.
+
+    Args:
+      throttle_name: The name of the throttle to sleep on.  If None or
+        omitted, then sleep on all throttles.
+
+    Raises:
+      UnknownThrottleNameError: If throttle_name was never added.
+    """
+    if throttle_name is None:
+      for name in self.throttles:
+        self.Sleep(throttle_name=name)
+      return
+
+    self.VerifyThrottleName(throttle_name)
+
+    thread = threading.currentThread()
+
+    while True:
+      duration = self.get_time() - self.last_rotate[throttle_name]
+
+      total = sum(self.prior_block[throttle_name].values())
+
+      # A non-empty prior block stands for one full ROTATE_PERIOD of activity.
+      if total:
+        duration += self.ROTATE_PERIOD
+
+      total += sum(self.transferred[throttle_name].values())
+
+      sleep_time = self._SleepTime(total, self.throttles[throttle_name],
+                                   duration)
+
+      if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
+        break
+
+      logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
+                   '(duration=%.1f ms, total=%d)',
+                   thread.getName(), throttle_name,
+                   sleep_time * 1000, duration * 1000, total)
+      self.thread_sleep(sleep_time)
+      # Not every registered thread carries an exit_flag attribute (the main
+      # thread does not), so treat a missing flag as "keep running".
+      if getattr(thread, 'exit_flag', False):
+        break
+      self._RotateCounts(throttle_name)
+
+  def _SleepTime(self, total, limit, duration):
+    """Calculate the time to sleep on a throttle.
+
+    Args:
+      total: The total amount transferred.
+      limit: The amount per second that is allowed to be sent.  A falsy
+        limit (None or 0) disables throttling.
+      duration: The amount of time taken to send the total.
+
+    Returns:
+      A float for the amount of time to sleep.
+    """
+    if not limit:
+      return 0.0
+    # Force true division: with integer counts and limits, Python 2 would
+    # floor the quotient and under-report the required sleep.
+    return max(0.0, (float(total) / limit) - duration)
+
+  def _RotateCounts(self, throttle_name):
+    """Rotate the transfer counters.
+
+    If sufficient time has passed, then rotate the counters from active to
+    the prior-block of counts.
+
+    This rotation is interlocked to ensure that multiple threads do not
+    over-rotate the counts.
+
+    Args:
+      throttle_name: The name of the throttle to rotate.
+
+    Raises:
+      UnknownThrottleNameError: If throttle_name was never added.
+    """
+    self.VerifyThrottleName(throttle_name)
+    self.rotate_mutex[throttle_name].acquire()
+    try:
+      next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
+      # Re-check under the lock; another thread may have rotated already.
+      if next_rotate_time >= self.get_time():
+        return
+
+      for thread_id, count in self.transferred[throttle_name].items():
+        self.prior_block[throttle_name][thread_id] = count
+        self.transferred[throttle_name][thread_id] = 0
+
+        self.totals[throttle_name][thread_id] += count
+
+      self.last_rotate[throttle_name] = self.get_time()
+
+    finally:
+      self.rotate_mutex[throttle_name].release()
+
+  def TotalTransferred(self, throttle_name):
+    """Return the total transferred, and over what period.
+
+    Args:
+      throttle_name: The name of the throttle to total.
+
+    Returns:
+      A tuple of the total count and running time for the given throttle name.
+    """
+    total = sum(self.totals[throttle_name].values())
+    total += sum(self.transferred[throttle_name].values())
+    return total, self.get_time() - self.start_time
+
+
+# Names of the throttles known to this module.  The http/https bandwidth and
+# request throttles are fed by ThrottleHandler; the entity and index throttles
+# are fed by DatastoreThrottler.
+BANDWIDTH_UP = 'http-bandwidth-up'
+BANDWIDTH_DOWN = 'http-bandwidth-down'
+REQUESTS = 'http-requests'
+HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
+HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
+HTTPS_REQUESTS = 'https-requests'
+DATASTORE_CALL_COUNT = 'datastore-call-count'
+ENTITIES_FETCHED = 'entities-fetched'
+ENTITIES_MODIFIED = 'entities-modified'
+INDEX_MODIFICATIONS = 'index-modifications'
+
+
+# Throttle layout used by DefaultThrottle (scaled by its multiplier).  Each
+# value is the limit handed to Throttle, i.e. an allowed amount per second.
+DEFAULT_LIMITS = {
+    BANDWIDTH_UP: 100000,
+    BANDWIDTH_DOWN: 100000,
+    REQUESTS: 15,
+    HTTPS_BANDWIDTH_UP: 100000,
+    HTTPS_BANDWIDTH_DOWN: 100000,
+    HTTPS_REQUESTS: 15,
+    DATASTORE_CALL_COUNT: 120,
+    ENTITIES_FETCHED: 400,
+    ENTITIES_MODIFIED: 400,
+    INDEX_MODIFICATIONS: 1600,
+}
+
+# Throttle layout with every limit disabled: Throttle treats a falsy limit as
+# "never sleep", while counts are still recorded.
+NO_LIMITS = {
+    BANDWIDTH_UP: None,
+    BANDWIDTH_DOWN: None,
+    REQUESTS: None,
+    HTTPS_BANDWIDTH_UP: None,
+    HTTPS_BANDWIDTH_DOWN: None,
+    HTTPS_REQUESTS: None,
+    DATASTORE_CALL_COUNT: None,
+    ENTITIES_FETCHED: None,
+    ENTITIES_MODIFIED: None,
+    INDEX_MODIFICATIONS: None,
+}
+
+
+def DefaultThrottle(multiplier=1.0):
+  """Build a Throttle whose limits are DEFAULT_LIMITS scaled by multiplier.
+
+  Args:
+    multiplier: Factor applied to every limit in DEFAULT_LIMITS.
+
+  Returns:
+    A new Throttle instance configured with the scaled limits.
+  """
+  scaled_limits = {}
+  for name, limit in DEFAULT_LIMITS.iteritems():
+    scaled_limits[name] = multiplier * limit
+  return Throttle(layout=scaled_limits)
+
+
+class ThrottleHandler(urllib2.BaseHandler):
+  """urllib2 handler that reports http and https traffic to a Throttle."""
+
+  def __init__(self, throttle):
+    """Initialize a ThrottleHandler.
+
+    Args:
+      throttle: The Throttle instance that receives bandwidth and request
+        counts for http and https traffic.
+    """
+    self.throttle = throttle
+
+  def AddRequest(self, throttle_name, req):
+    """Record the approximate wire size of a request on a throttle.
+
+    The size is the sum of the header lines (regular and unredirected), the
+    request line, and the request body if there is one.
+
+    Args:
+      throttle_name: The name of the bandwidth throttle to add to.
+      req: The request whose size will be added to the throttle.
+    """
+    size = 0
+    for header_map in (req.headers, req.unredirected_hdrs):
+      for key, value in header_map.iteritems():
+        size += len('%s: %s\n' % (key, value))
+    # Index 2 of the split result is the path component of the URL.
+    url_path = urlparse.urlsplit(req.get_full_url())[2]
+    size += len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
+    payload = req.get_data()
+    if payload:
+      size += len(payload)
+    self.throttle.AddTransfer(throttle_name, size)
+
+  def AddResponse(self, throttle_name, res):
+    """Record the approximate wire size of a response on a throttle.
+
+    The body is read here in order to measure it, so res.read is replaced
+    with a function that hands back the already-read body to later callers.
+
+    Args:
+      throttle_name: The name of the bandwidth throttle to add to.
+      res: The response whose size will be added to the throttle.
+    """
+    body = res.read()
+    res.read = lambda: body
+    size = len(body)
+    for key, value in res.info().items():
+      size += len('%s: %s\n' % (key, value))
+    self.throttle.AddTransfer(throttle_name, size)
+
+  def http_request(self, req):
+    """Process an HTTP request.
+
+    Sleeps on the http upload and download bandwidth throttles if they are
+    over their limits, then records the size of the outgoing request.
+
+    Args:
+      req: A urllib2.Request object.
+
+    Returns:
+      The request passed in.
+    """
+    for name in (BANDWIDTH_UP, BANDWIDTH_DOWN):
+      self.throttle.Sleep(name)
+    self.AddRequest(BANDWIDTH_UP, req)
+    return req
+
+  def https_request(self, req):
+    """Process an HTTPS request.
+
+    Sleeps on the https upload and download bandwidth throttles if they are
+    over their limits, then records the size of the outgoing request.
+
+    Args:
+      req: A urllib2.Request object.
+
+    Returns:
+      The request passed in.
+    """
+    for name in (HTTPS_BANDWIDTH_UP, HTTPS_BANDWIDTH_DOWN):
+      self.throttle.Sleep(name)
+    self.AddRequest(HTTPS_BANDWIDTH_UP, req)
+    return req
+
+  def http_response(self, unused_req, res):
+    """Process an HTTP response.
+
+    Adds the response size to the http download throttle and counts one
+    request on the http request throttle.
+
+    Args:
+      unused_req: The urllib2 request for this response.
+      res: A urllib2 response object.
+
+    Returns:
+      The response passed in.
+    """
+    self.AddResponse(BANDWIDTH_DOWN, res)
+    self.throttle.AddTransfer(REQUESTS, 1)
+    return res
+
+  def https_response(self, unused_req, res):
+    """Process an HTTPS response.
+
+    Adds the response size to the https download throttle and counts one
+    request on the https request throttle.
+
+    Args:
+      unused_req: The urllib2 request for this response.
+      res: A urllib2 response object.
+
+    Returns:
+      The response passed in.
+    """
+    self.AddResponse(HTTPS_BANDWIDTH_DOWN, res)
+    self.throttle.AddTransfer(HTTPS_REQUESTS, 1)
+    return res
+
+
+class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
+  """Provides a simplified RPC-style interface for HTTP requests.
+
+  This RPC server uses a Throttle to prevent exceeding quotas: every opener
+  it builds gets a ThrottleHandler bound to that Throttle.
+  """
+
+  def __init__(self, throttle, *args, **kwargs):
+    """Initialize a ThrottledHttpRpcServer.
+
+    Args:
+      throttle: A Throttle instance.
+      args: Positional arguments to pass through to
+        appengine_rpc.HttpRpcServer.__init__
+      kwargs: Keyword arguments to pass through to
+        appengine_rpc.HttpRpcServer.__init__
+    """
+    # Stored before the base initializer runs.  NOTE(review): presumably the
+    # base __init__ builds its opener via _GetOpener, which reads
+    # self.throttle -- confirm before reordering these two statements.
+    self.throttle = throttle
+    appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)
+
+  def _GetOpener(self):
+    """Returns the base class's opener with a ThrottleHandler added.
+
+    Returns:
+      A urllib2.OpenerDirector object.
+    """
+    opener = appengine_rpc.HttpRpcServer._GetOpener(self)
+    opener.add_handler(ThrottleHandler(self.throttle))
+
+    return opener
+
+
+def ThrottledHttpRpcServerFactory(throttle):
+  """Build a factory that creates ThrottledHttpRpcServer objects.
+
+  Args:
+    throttle: The Throttle instance every produced server will use.
+
+  Returns:
+    A callable that produces a ThrottledHttpRpcServer.
+  """
+
+  def MakeRpcServer(*args, **kwargs):
+    """Create a ThrottledHttpRpcServer bound to the enclosing throttle.
+
+    The account_type and save_cookies keyword arguments are always
+    overridden, whatever the caller supplied.
+
+    Args:
+      args: Positional args to pass to ThrottledHttpRpcServer.
+      kwargs: Keyword args to pass to ThrottledHttpRpcServer.
+
+    Returns:
+      A ThrottledHttpRpcServer instance.
+    """
+    kwargs.update(account_type='HOSTED_OR_GOOGLE', save_cookies=True)
+    return ThrottledHttpRpcServer(throttle, *args, **kwargs)
+
+  return MakeRpcServer
+
+
+class Throttler(object):
+  """Dispatches stub pre/post hooks to per-call methods when they exist.
+
+  A subclass handles a call named X by defining _Prehook_X and/or
+  _Posthook_X, each taking (request, response).  Calls without a matching
+  method are ignored.
+  """
+
+  def PrehookHandler(self, service, call, request, response):
+    """Invoke self._Prehook_<call>(request, response) if it is defined."""
+    hook = getattr(self, '_Prehook_' + call, None)
+    if not hook:
+      return
+    hook(request, response)
+
+  def PosthookHandler(self, service, call, request, response):
+    """Invoke self._Posthook_<call>(request, response) if it is defined."""
+    hook = getattr(self, '_Posthook_' + call, None)
+    if not hook:
+      return
+    hook(request, response)
+
+
+def SleepHandler(*throttle_names):
+  """Build a prehook method that sleeps on each of the named throttles.
+
+  The returned function is meant to be assigned as a method of
+  DatastoreThrottler: it reaches that class's name-mangled private
+  throttle attribute on self.
+
+  Args:
+    throttle_names: Names of the throttles to sleep on, in order.
+
+  Returns:
+    A function taking (self, request, response).
+  """
+  def SleepOnThrottles(self, request, response):
+    for name in throttle_names:
+      throttle = self._DatastoreThrottler__throttle
+      throttle.Sleep(name)
+  return SleepOnThrottles
+
+
+class DatastoreThrottler(Throttler):
+  """Throttler for datastore calls: sleeps before a call, records cost after.
+
+  Prehooks sleep on the throttles relevant to the call; posthooks add the
+  amounts reported by the response to those throttles.
+  """
+
+  def __init__(self, throttle):
+    """Initialize with the Throttle that limits datastore access."""
+    Throttler.__init__(self)
+    self.__throttle = throttle
+
+  def AddCost(self, cost_proto):
+    """Add costs from the Cost protobuf."""
+    add_transfer = self.__throttle.AddTransfer
+    add_transfer(INDEX_MODIFICATIONS, cost_proto.index_writes())
+    add_transfer(ENTITIES_MODIFIED, cost_proto.entity_writes())
+
+  # Put: writes entities and indexes.
+  _Prehook_Put = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)
+
+  def _Posthook_Put(self, request, response):
+    cost = response.cost()
+    self.AddCost(cost)
+
+  # Get: fetches entities.
+  _Prehook_Get = SleepHandler(ENTITIES_FETCHED)
+
+  def _Posthook_Get(self, request, response):
+    fetched = response.entity_size()
+    self.__throttle.AddTransfer(ENTITIES_FETCHED, fetched)
+
+  # RunQuery: fetches entities unless the response is keys-only.
+  _Prehook_RunQuery = SleepHandler(ENTITIES_FETCHED)
+
+  def _Posthook_RunQuery(self, request, response):
+    if response.keys_only():
+      return
+    self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())
+
+  # Next: same accounting as RunQuery.
+  _Prehook_Next = SleepHandler(ENTITIES_FETCHED)
+
+  def _Posthook_Next(self, request, response):
+    if response.keys_only():
+      return
+    self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())
+
+  # Delete: writes entities and indexes.
+  _Prehook_Delete = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)
+
+  def _Posthook_Delete(self, request, response):
+    cost = response.cost()
+    self.AddCost(cost)
+
+  # Commit: never sleeps beforehand, but its cost is still recorded.
+  _Prehook_Commit = SleepHandler()
+
+  def _Posthook_Commit(self, request, response):
+    cost = response.cost()
+    self.AddCost(cost)
+
+
+def ThrottleRemoteDatastore(throttle, remote_datastore_stub=None):
+  """Install the given throttle for the remote datastore stub.
+
+  Args:
+    throttle: A Throttle instance to limit datastore access rates.
+    remote_datastore_stub: The datastore stub instance to throttle, for
+      testing purposes.  When omitted, the stub registered for
+      'datastore_v3' is used.
+
+  Raises:
+    remote_api_stub.ConfigurationError: If the stub is not a
+      RemoteDatastoreStub.
+  """
+  stub = (remote_datastore_stub or
+          apiproxy_stub_map.apiproxy.GetStub('datastore_v3'))
+  if not isinstance(stub, remote_api_stub.RemoteDatastoreStub):
+    raise remote_api_stub.ConfigurationError('remote_api is not configured.')
+  datastore_throttler = DatastoreThrottler(throttle)
+  stub._PreHookHandler = datastore_throttler.PrehookHandler
+  stub._PostHookHandler = datastore_throttler.PosthookHandler