--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/ext/remote_api/throttle.py Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,637 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Client-side transfer throttling for use with remote_api_stub.
+
+This module is used to configure rate limiting for programs accessing
+AppEngine services through remote_api.
+
+See the Throttle class for more information.
+
+An example with throttling:
+---
+from google.appengine.ext import db
+from google.appengine.ext.remote_api import remote_api_stub
+from google.appengine.ext.remote_api import throttle
+from myapp import models
+import getpass
+import threading
+
+def auth_func():
+ return (raw_input('Username:'), getpass.getpass('Password:'))
+
+remote_api_stub.ConfigureRemoteDatastore('my-app', '/remote_api', auth_func)
+full_throttle = throttle.DefaultThrottle(multiplier=1.0)
+throttle.ThrottleRemoteDatastore(full_throttle)
+
+# Register any threads that will be using the datastore with the throttler
+full_throttle.Register(threading.currentThread())
+
+# Now you can access the remote datastore just as if your code was running on
+# App Engine, and you don't need to worry about exceeding quota limits!
+
+houses = models.House.all().fetch(100)
+for a_house in houses:
+ a_house.doors += 1
+db.put(houses)
+---
+
+This example limits usage to the default free quota levels. The multiplier
+kwarg to throttle.DefaultThrottle can be used to scale the throttle levels
+higher or lower.
+
+Throttles can also be constructed directly for more control over the limits
+for different operations. See the Throttle class and the constants following
+it for details.
+"""
+
+
+import logging
+import threading
+import time
+import urllib2
+import urlparse
+
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.ext.remote_api import remote_api_stub
+from google.appengine.tools import appengine_rpc
+
+logger = logging.getLogger('google.appengine.ext.remote_api.throttle')
+
+MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
+
+
+class Error(Exception):
+ """Base class for errors in this module."""
+
+
+class ThreadNotRegisteredError(Error):
+ """An unregistered thread has accessed the throttled datastore stub."""
+
+
+class UnknownThrottleNameError(Error):
+ """A transfer was added for an unknown throttle name."""
+
+
+def InterruptibleSleep(sleep_time):
+ """Puts thread to sleep, checking this threads exit_flag four times a second.
+
+ Args:
+ sleep_time: Time to sleep.
+ """
+ slept = 0.0
+ epsilon = .0001
+ thread = threading.currentThread()
+ while slept < sleep_time - epsilon:
+ remaining = sleep_time - slept
+ this_sleep_time = min(remaining, 0.25)
+ time.sleep(this_sleep_time)
+ slept += this_sleep_time
+ if hasattr(thread, 'exit_flag') and thread.exit_flag:
+ return
+
+
+class Throttle(object):
+ """A base class for upload rate throttling.
+
+ Transferring large number of entities, too quickly, could trigger
+ quota limits and cause the transfer process to halt. In order to
+ stay within the application's quota, we throttle the data transfer
+ to a specified limit (across all transfer threads).
+
+ This class tracks a moving average of some aspect of the transfer
+ rate (bandwidth, records per second, http connections per
+ second). It keeps two windows of counts of bytes transferred, on a
+ per-thread basis. One block is the "current" block, and the other is
+ the "prior" block. It will rotate the counts from current to prior
+ when ROTATE_PERIOD has passed. Thus, the current block will
+ represent from 0 seconds to ROTATE_PERIOD seconds of activity
+ (determined by: time.time() - self.last_rotate). The prior block
+ will always represent a full ROTATE_PERIOD.
+
+ Sleeping is performed just before a transfer of another block, and is
+ based on the counts transferred *before* the next transfer. It really
+ does not matter how much will be transferred, but only that for all the
+ data transferred SO FAR that we have interspersed enough pauses to
+ ensure the aggregate transfer rate is within the specified limit.
+
+ These counts are maintained on a per-thread basis, so we do not require
+ any interlocks around incrementing the counts. There IS an interlock on
+ the rotation of the counts because we do not want multiple threads to
+ multiply-rotate the counts.
+
+ There are various race conditions in the computation and collection
+ of these counts. We do not require precise values, but simply to
+ keep the overall transfer within the bandwidth limits. If a given
+ pause is a little short, or a little long, then the aggregate delays
+ will be correct.
+ """
+
+ ROTATE_PERIOD = 600
+
+ def __init__(self,
+ get_time=time.time,
+ thread_sleep=InterruptibleSleep,
+ layout=None):
+ self.get_time = get_time
+ self.thread_sleep = thread_sleep
+
+ self.start_time = get_time()
+ self.transferred = {}
+ self.prior_block = {}
+ self.totals = {}
+ self.throttles = {}
+
+ self.last_rotate = {}
+ self.rotate_mutex = {}
+ if layout:
+ self.AddThrottles(layout)
+
+ def AddThrottle(self, name, limit):
+ self.throttles[name] = limit
+ self.transferred[name] = {}
+ self.prior_block[name] = {}
+ self.totals[name] = {}
+ self.last_rotate[name] = self.get_time()
+ self.rotate_mutex[name] = threading.Lock()
+
+ def AddThrottles(self, layout):
+ for key, value in layout.iteritems():
+ self.AddThrottle(key, value)
+
+ def Register(self, thread):
+ """Register this thread with the throttler."""
+ thread_id = id(thread)
+ for throttle_name in self.throttles.iterkeys():
+ self.transferred[throttle_name][thread_id] = 0
+ self.prior_block[throttle_name][thread_id] = 0
+ self.totals[throttle_name][thread_id] = 0
+
+ def VerifyThrottleName(self, throttle_name):
+ if throttle_name not in self.throttles:
+ raise UnknownThrottleNameError('%s is not a registered throttle' %
+ throttle_name)
+
+ def AddTransfer(self, throttle_name, token_count):
+ """Add a count to the amount this thread has transferred.
+
+ Each time a thread transfers some data, it should call this method to
+ note the amount sent. The counts may be rotated if sufficient time
+ has passed since the last rotation.
+
+ Args:
+ throttle_name: The name of the throttle to add to.
+ token_count: The number to add to the throttle counter.
+ """
+ self.VerifyThrottleName(throttle_name)
+ transferred = self.transferred[throttle_name]
+ try:
+ transferred[id(threading.currentThread())] += token_count
+ except KeyError:
+ thread = threading.currentThread()
+ raise ThreadNotRegisteredError(
+ 'Unregistered thread accessing throttled datastore stub: id = %s\n'
+ 'name = %s' % (id(thread), thread.getName()))
+
+ if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
+ self._RotateCounts(throttle_name)
+
+ def Sleep(self, throttle_name=None):
+ """Possibly sleep in order to limit the transfer rate.
+
+ Note that we sleep based on *prior* transfers rather than what we
+ may be about to transfer. The next transfer could put us under/over
+ and that will be rectified *after* that transfer. Net result is that
+ the average transfer rate will remain within bounds. Spiky behavior
+ or uneven rates among the threads could possibly bring the transfer
+ rate above the requested limit for short durations.
+
+ Args:
+ throttle_name: The name of the throttle to sleep on. If None or
+ omitted, then sleep on all throttles.
+ """
+ if throttle_name is None:
+ for throttle_name in self.throttles:
+ self.Sleep(throttle_name=throttle_name)
+ return
+
+ self.VerifyThrottleName(throttle_name)
+
+ thread = threading.currentThread()
+
+ while True:
+ duration = self.get_time() - self.last_rotate[throttle_name]
+
+ total = 0
+ for count in self.prior_block[throttle_name].values():
+ total += count
+
+ if total:
+ duration += self.ROTATE_PERIOD
+
+ for count in self.transferred[throttle_name].values():
+ total += count
+
+ sleep_time = self._SleepTime(total, self.throttles[throttle_name],
+ duration)
+
+ if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
+ break
+
+ logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
+ '(duration=%.1f ms, total=%d)',
+ thread.getName(), throttle_name,
+ sleep_time * 1000, duration * 1000, total)
+ self.thread_sleep(sleep_time)
+ if thread.exit_flag:
+ break
+ self._RotateCounts(throttle_name)
+
+ def _SleepTime(self, total, limit, duration):
+ """Calculate the time to sleep on a throttle.
+
+ Args:
+ total: The total amount transferred.
+ limit: The amount per second that is allowed to be sent.
+ duration: The amount of time taken to send the total.
+
+ Returns:
+ A float for the amount of time to sleep.
+ """
+ if not limit:
+ return 0.0
+ return max(0.0, (total / limit) - duration)
+
+ def _RotateCounts(self, throttle_name):
+ """Rotate the transfer counters.
+
+ If sufficient time has passed, then rotate the counters from active to
+ the prior-block of counts.
+
+ This rotation is interlocked to ensure that multiple threads do not
+ over-rotate the counts.
+
+ Args:
+ throttle_name: The name of the throttle to rotate.
+ """
+ self.VerifyThrottleName(throttle_name)
+ self.rotate_mutex[throttle_name].acquire()
+ try:
+ next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
+ if next_rotate_time >= self.get_time():
+ return
+
+ for name, count in self.transferred[throttle_name].items():
+
+
+ self.prior_block[throttle_name][name] = count
+ self.transferred[throttle_name][name] = 0
+
+ self.totals[throttle_name][name] += count
+
+ self.last_rotate[throttle_name] = self.get_time()
+
+ finally:
+ self.rotate_mutex[throttle_name].release()
+
+ def TotalTransferred(self, throttle_name):
+ """Return the total transferred, and over what period.
+
+ Args:
+ throttle_name: The name of the throttle to total.
+
+ Returns:
+ A tuple of the total count and running time for the given throttle name.
+ """
+ total = 0
+ for count in self.totals[throttle_name].values():
+ total += count
+ for count in self.transferred[throttle_name].values():
+ total += count
+ return total, self.get_time() - self.start_time
+
+
+BANDWIDTH_UP = 'http-bandwidth-up'
+BANDWIDTH_DOWN = 'http-bandwidth-down'
+REQUESTS = 'http-requests'
+HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
+HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
+HTTPS_REQUESTS = 'https-requests'
+DATASTORE_CALL_COUNT = 'datastore-call-count'
+ENTITIES_FETCHED = 'entities-fetched'
+ENTITIES_MODIFIED = 'entities-modified'
+INDEX_MODIFICATIONS = 'index-modifications'
+
+
+DEFAULT_LIMITS = {
+ BANDWIDTH_UP: 100000,
+ BANDWIDTH_DOWN: 100000,
+ REQUESTS: 15,
+ HTTPS_BANDWIDTH_UP: 100000,
+ HTTPS_BANDWIDTH_DOWN: 100000,
+ HTTPS_REQUESTS: 15,
+ DATASTORE_CALL_COUNT: 120,
+ ENTITIES_FETCHED: 400,
+ ENTITIES_MODIFIED: 400,
+ INDEX_MODIFICATIONS: 1600,
+}
+
+NO_LIMITS = {
+ BANDWIDTH_UP: None,
+ BANDWIDTH_DOWN: None,
+ REQUESTS: None,
+ HTTPS_BANDWIDTH_UP: None,
+ HTTPS_BANDWIDTH_DOWN: None,
+ HTTPS_REQUESTS: None,
+ DATASTORE_CALL_COUNT: None,
+ ENTITIES_FETCHED: None,
+ ENTITIES_MODIFIED: None,
+ INDEX_MODIFICATIONS: None,
+}
+
+
+def DefaultThrottle(multiplier=1.0):
+ """Return a Throttle instance with multiplier * the quota limits."""
+ layout = dict([(name, multiplier * limit)
+ for (name, limit) in DEFAULT_LIMITS.iteritems()])
+ return Throttle(layout=layout)
+
+
+class ThrottleHandler(urllib2.BaseHandler):
+ """A urllib2 handler for http and https requests that adds to a throttle."""
+
+ def __init__(self, throttle):
+ """Initialize a ThrottleHandler.
+
+ Args:
+ throttle: A Throttle instance to call for bandwidth and http/https request
+ throttling.
+ """
+ self.throttle = throttle
+
+ def AddRequest(self, throttle_name, req):
+ """Add to bandwidth throttle for given request.
+
+ Args:
+ throttle_name: The name of the bandwidth throttle to add to.
+ req: The request whose size will be added to the throttle.
+ """
+ size = 0
+ for key, value in req.headers.iteritems():
+ size += len('%s: %s\n' % (key, value))
+ for key, value in req.unredirected_hdrs.iteritems():
+ size += len('%s: %s\n' % (key, value))
+ (unused_scheme,
+ unused_host_port, url_path,
+ unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
+ size += len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
+ data = req.get_data()
+ if data:
+ size += len(data)
+ self.throttle.AddTransfer(throttle_name, size)
+
+ def AddResponse(self, throttle_name, res):
+ """Add to bandwidth throttle for given response.
+
+ Args:
+ throttle_name: The name of the bandwidth throttle to add to.
+ res: The response whose size will be added to the throttle.
+ """
+ content = res.read()
+
+ def ReturnContent():
+ return content
+
+ res.read = ReturnContent
+ size = len(content)
+ headers = res.info()
+ for key, value in headers.items():
+ size += len('%s: %s\n' % (key, value))
+ self.throttle.AddTransfer(throttle_name, size)
+
+ def http_request(self, req):
+ """Process an HTTP request.
+
+ If the throttle is over quota, sleep first. Then add request size to
+ throttle before returning it to be sent.
+
+ Args:
+ req: A urllib2.Request object.
+
+ Returns:
+ The request passed in.
+ """
+ self.throttle.Sleep(BANDWIDTH_UP)
+ self.throttle.Sleep(BANDWIDTH_DOWN)
+ self.AddRequest(BANDWIDTH_UP, req)
+ return req
+
+ def https_request(self, req):
+ """Process an HTTPS request.
+
+ If the throttle is over quota, sleep first. Then add request size to
+ throttle before returning it to be sent.
+
+ Args:
+ req: A urllib2.Request object.
+
+ Returns:
+ The request passed in.
+ """
+ self.throttle.Sleep(HTTPS_BANDWIDTH_UP)
+ self.throttle.Sleep(HTTPS_BANDWIDTH_DOWN)
+ self.AddRequest(HTTPS_BANDWIDTH_UP, req)
+ return req
+
+ def http_response(self, unused_req, res):
+ """Process an HTTP response.
+
+ The size of the response is added to the bandwidth throttle and the request
+ throttle is incremented by one.
+
+ Args:
+ unused_req: The urllib2 request for this response.
+ res: A urllib2 response object.
+
+ Returns:
+ The response passed in.
+ """
+ self.AddResponse(BANDWIDTH_DOWN, res)
+ self.throttle.AddTransfer(REQUESTS, 1)
+ return res
+
+ def https_response(self, unused_req, res):
+ """Process an HTTPS response.
+
+ The size of the response is added to the bandwidth throttle and the request
+ throttle is incremented by one.
+
+ Args:
+ unused_req: The urllib2 request for this response.
+ res: A urllib2 response object.
+
+ Returns:
+ The response passed in.
+ """
+ self.AddResponse(HTTPS_BANDWIDTH_DOWN, res)
+ self.throttle.AddTransfer(HTTPS_REQUESTS, 1)
+ return res
+
+
+class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
+ """Provides a simplified RPC-style interface for HTTP requests.
+
+ This RPC server uses a Throttle to prevent exceeding quotas.
+ """
+
+ def __init__(self, throttle, *args, **kwargs):
+ """Initialize a ThrottledHttpRpcServer.
+
+ Also sets request_manager.rpc_server to the ThrottledHttpRpcServer instance.
+
+ Args:
+ throttle: A Throttles instance.
+ args: Positional arguments to pass through to
+ appengine_rpc.HttpRpcServer.__init__
+ kwargs: Keyword arguments to pass through to
+ appengine_rpc.HttpRpcServer.__init__
+ """
+ self.throttle = throttle
+ appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)
+
+ def _GetOpener(self):
+ """Returns an OpenerDirector that supports cookies and ignores redirects.
+
+ Returns:
+ A urllib2.OpenerDirector object.
+ """
+ opener = appengine_rpc.HttpRpcServer._GetOpener(self)
+ opener.add_handler(ThrottleHandler(self.throttle))
+
+ return opener
+
+
+def ThrottledHttpRpcServerFactory(throttle):
+ """Create a factory to produce ThrottledHttpRpcServer for a given throttle.
+
+ Args:
+ throttle: A Throttle instance to use for the ThrottledHttpRpcServer.
+
+ Returns:
+ A factory to produce a ThrottledHttpRpcServer.
+ """
+
+ def MakeRpcServer(*args, **kwargs):
+ """Factory to produce a ThrottledHttpRpcServer.
+
+ Args:
+ args: Positional args to pass to ThrottledHttpRpcServer.
+ kwargs: Keyword args to pass to ThrottledHttpRpcServer.
+
+ Returns:
+ A ThrottledHttpRpcServer instance.
+ """
+ kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
+ kwargs['save_cookies'] = True
+ rpc_server = ThrottledHttpRpcServer(throttle, *args, **kwargs)
+ return rpc_server
+ return MakeRpcServer
+
+
+class Throttler(object):
+ def PrehookHandler(self, service, call, request, response):
+ handler = getattr(self, '_Prehook_' + call, None)
+ if handler:
+ handler(request, response)
+
+ def PosthookHandler(self, service, call, request, response):
+ handler = getattr(self, '_Posthook_' + call, None)
+ if handler:
+ handler(request, response)
+
+
+def SleepHandler(*throttle_names):
+ def SleepOnThrottles(self, request, response):
+ for throttle_name in throttle_names:
+ self._DatastoreThrottler__throttle.Sleep(throttle_name)
+ return SleepOnThrottles
+
+
+class DatastoreThrottler(Throttler):
+ def __init__(self, throttle):
+ Throttler.__init__(self)
+ self.__throttle = throttle
+
+ def AddCost(self, cost_proto):
+ """Add costs from the Cost protobuf."""
+ self.__throttle.AddTransfer(INDEX_MODIFICATIONS, cost_proto.index_writes())
+ self.__throttle.AddTransfer(ENTITIES_MODIFIED, cost_proto.entity_writes())
+
+
+ _Prehook_Put = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)
+
+ def _Posthook_Put(self, request, response):
+ self.AddCost(response.cost())
+
+
+ _Prehook_Get = SleepHandler(ENTITIES_FETCHED)
+
+ def _Posthook_Get(self, request, response):
+ self.__throttle.AddTransfer(ENTITIES_FETCHED, response.entity_size())
+
+
+ _Prehook_RunQuery = SleepHandler(ENTITIES_FETCHED)
+
+ def _Posthook_RunQuery(self, request, response):
+ if not response.keys_only():
+ self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())
+
+
+ _Prehook_Next = SleepHandler(ENTITIES_FETCHED)
+
+ def _Posthook_Next(self, request, response):
+ if not response.keys_only():
+ self.__throttle.AddTransfer(ENTITIES_FETCHED, response.result_size())
+
+
+ _Prehook_Delete = SleepHandler(ENTITIES_MODIFIED, INDEX_MODIFICATIONS)
+
+ def _Posthook_Delete(self, request, response):
+ self.AddCost(response.cost())
+
+
+ _Prehook_Commit = SleepHandler()
+
+ def _Posthook_Commit(self, request, response):
+ self.AddCost(response.cost())
+
+
+def ThrottleRemoteDatastore(throttle, remote_datastore_stub=None):
+ """Install the given throttle for the remote datastore stub.
+
+ Args:
+ throttle: A Throttle instance to limit datastore access rates
+ remote_datastore_stub: The datstore stub instance to throttle, for
+ testing purposes.
+ """
+ if not remote_datastore_stub:
+ remote_datastore_stub = apiproxy_stub_map.apiproxy.GetStub('datastore_v3')
+ if not isinstance(remote_datastore_stub, remote_api_stub.RemoteDatastoreStub):
+ raise remote_api_stub.ConfigurationError('remote_api is not configured.')
+ throttler = DatastoreThrottler(throttle)
+ remote_datastore_stub._PreHookHandler = throttler.PrehookHandler
+ remote_datastore_stub._PostHookHandler = throttler.PosthookHandler