--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/tools/adaptive_thread_pool.py Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,460 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Provides thread-pool-like functionality for workers accessing App Engine.
+
+The pool adapts to slow or timing out requests by reducing the number of
+active workers, or increasing the number when requests latency reduces.
+"""
+
+
+
+import logging
+import Queue
+import sys
+import threading
+import time
+import traceback
+
+from google.appengine.tools.requeue import ReQueue
+
+logger = logging.getLogger('google.appengine.tools.adaptive_thread_pool')
+
+_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'
+
+INITIAL_BACKOFF = 1.0
+
+BACKOFF_FACTOR = 2.0
+
+
+class Error(Exception):
+ """Base-class for exceptions in this module."""
+
+
+class WorkItemError(Error):
+ """Error while processing a WorkItem."""
+
+
+class RetryException(Error):
+ """A non-fatal exception that indicates that a work item should be retried."""
+
+
+def InterruptibleSleep(sleep_time):
+ """Puts thread to sleep, checking this threads exit_flag four times a second.
+
+ Args:
+ sleep_time: Time to sleep.
+ """
+ slept = 0.0
+ epsilon = .0001
+ thread = threading.currentThread()
+ while slept < sleep_time - epsilon:
+ remaining = sleep_time - slept
+ this_sleep_time = min(remaining, 0.25)
+ time.sleep(this_sleep_time)
+ slept += this_sleep_time
+ if thread.exit_flag:
+ return
+
+
+class WorkerThread(threading.Thread):
+ """A WorkerThread to execute WorkItems.
+
+ Attributes:
+ exit_flag: A boolean indicating whether this thread should stop
+ its work and exit.
+ """
+
+ def __init__(self, thread_pool, thread_gate, name=None):
+ """Initialize a WorkerThread instance.
+
+ Args:
+ thread_pool: An AdaptiveThreadPool instance.
+ thread_gate: A ThreadGate instance.
+ name: A name for this WorkerThread.
+ """
+ threading.Thread.__init__(self)
+
+ self.setDaemon(True)
+
+ self.exit_flag = False
+ self.__error = None
+ self.__traceback = None
+ self.__thread_pool = thread_pool
+ self.__work_queue = thread_pool.requeue
+ self.__thread_gate = thread_gate
+ if not name:
+ self.__name = 'Anonymous_' + self.__class__.__name__
+ else:
+ self.__name = name
+
+ def run(self):
+ """Perform the work of the thread."""
+ logger.debug('[%s] %s: started', self.getName(), self.__class__.__name__)
+
+ try:
+ self.WorkOnItems()
+ except:
+ self.SetError()
+
+ logger.debug('[%s] %s: exiting', self.getName(), self.__class__.__name__)
+
+ def SetError(self):
+ """Sets the error and traceback information for this thread.
+
+ This must be called from an exception handler.
+ """
+ if not self.__error:
+ exc_info = sys.exc_info()
+ self.__error = exc_info[1]
+ self.__traceback = exc_info[2]
+ logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
+
+ def WorkOnItems(self):
+ """Perform the work of a WorkerThread."""
+ while not self.exit_flag:
+ item = None
+ self.__thread_gate.StartWork()
+ try:
+ status, instruction = WorkItem.FAILURE, ThreadGate.DECREASE
+ try:
+ if self.exit_flag:
+ instruction = ThreadGate.HOLD
+ break
+
+ try:
+ item = self.__work_queue.get(block=True, timeout=1.0)
+ except Queue.Empty:
+ instruction = ThreadGate.HOLD
+ continue
+ if item == _THREAD_SHOULD_EXIT or self.exit_flag:
+ status, instruction = WorkItem.SUCCESS, ThreadGate.HOLD
+ break
+
+ logger.debug('[%s] Got work item %s', self.getName(), item)
+
+ status, instruction = item.PerformWork(self.__thread_pool)
+ except RetryException:
+ status, instruction = WorkItem.RETRY, ThreadGate.HOLD
+ except:
+ self.SetError()
+ raise
+
+ finally:
+ try:
+ if item:
+ if status == WorkItem.SUCCESS:
+ self.__work_queue.task_done()
+ elif status == WorkItem.RETRY:
+ try:
+ self.__work_queue.reput(item, block=False)
+ except Queue.Full:
+ logger.error('[%s] Failed to reput work item.', self.getName())
+ raise Error('Failed to reput work item')
+ else:
+ if not self.__error:
+ if item.error:
+ self.__error = item.error
+ self.__traceback = item.traceback
+ else:
+ self.__error = WorkItemError(
+ 'Fatal error while processing %s' % item)
+ raise self.__error
+
+ finally:
+ self.__thread_gate.FinishWork(instruction=instruction)
+
+ def CheckError(self):
+ """If an error is present, then log it."""
+ if self.__error:
+ logger.error('Error in %s: %s', self.getName(), self.__error)
+ if self.__traceback:
+ logger.debug('%s', ''.join(traceback.format_exception(
+ self.__error.__class__,
+ self.__error,
+ self.__traceback)))
+
+ def __str__(self):
+ return self.__name
+
+
+class AdaptiveThreadPool(object):
+ """A thread pool which processes WorkItems from a queue.
+
+ Attributes:
+ requeue: The requeue instance which holds work items for this
+ thread pool.
+ """
+
+ def __init__(self,
+ num_threads,
+ queue_size=None,
+ base_thread_name=None,
+ worker_thread_factory=WorkerThread,
+ queue_factory=Queue.Queue):
+ """Initialize an AdaptiveThreadPool.
+
+ An adaptive thread pool executes WorkItems using a number of
+ WorkerThreads. WorkItems represent items of work that may
+ succeed, soft fail, or hard fail. In addition, a completed work
+ item can signal this AdaptiveThreadPool to enable more or fewer
+ threads. Initially one thread is active. Soft failures are
+ reqeueud to be retried. Hard failures cause this
+ AdaptiveThreadPool to shut down entirely. See the WorkItem class
+ for more details.
+
+ Args:
+ num_threads: The number of threads to use.
+ queue_size: The size of the work item queue to use.
+ base_thread_name: A string from which worker thread names are derived.
+ worker_thread_factory: A factory which procudes WorkerThreads.
+ queue_factory: Used for dependency injection.
+ """
+ if queue_size is None:
+ queue_size = num_threads
+ self.requeue = ReQueue(queue_size, queue_factory=queue_factory)
+ self.__thread_gate = ThreadGate(num_threads)
+ self.__num_threads = num_threads
+ self.__threads = []
+ for i in xrange(num_threads):
+ thread = worker_thread_factory(self, self.__thread_gate)
+ if base_thread_name:
+ base = base_thread_name
+ else:
+ base = thread.__class__.__name__
+ thread.name = '%s-%d' % (base, i)
+ self.__threads.append(thread)
+ thread.start()
+
+ def num_threads(self):
+ """Return the number of threads in this thread pool."""
+ return self.__num_threads
+
+ def Threads(self):
+ """Yields the registered threads."""
+ for thread in self.__threads:
+ yield thread
+
+ def SubmitItem(self, item, block=True, timeout=0.0):
+ """Submit a WorkItem to the AdaptiveThreadPool.
+
+ Args:
+ item: A WorkItem instance.
+ block: Whether to block on submitting if the submit queue is full.
+ timeout: Time wait for room in the queue if block is True, 0.0 to
+ block indefinitely.
+
+ Raises:
+ Queue.Full if the submit queue is full.
+ """
+ self.requeue.put(item, block=block, timeout=timeout)
+
+ def QueuedItemCount(self):
+ """Returns the number of items currently in the queue."""
+ return self.requeue.qsize()
+
+ def Shutdown(self):
+ """Shutdown the thread pool.
+
+ Tasks may remain unexecuted in the submit queue.
+ """
+ while not self.requeue.empty():
+ try:
+ unused_item = self.requeue.get_nowait()
+ self.requeue.task_done()
+ except Queue.Empty:
+ pass
+ for thread in self.__threads:
+ thread.exit_flag = True
+ self.requeue.put(_THREAD_SHOULD_EXIT)
+ self.__thread_gate.EnableAllThreads()
+
+ def Wait(self):
+ """Wait until all work items have been completed."""
+ self.requeue.join()
+
+ def JoinThreads(self):
+ """Wait for all threads to exit."""
+ for thread in self.__threads:
+ logger.debug('Waiting for %s to exit' % str(thread))
+ thread.join()
+
+ def CheckErrors(self):
+ """Output logs for any errors that occurred in the worker threads."""
+ for thread in self.__threads:
+ thread.CheckError()
+
+
+class ThreadGate(object):
+ """Manage the number of active worker threads.
+
+ The ThreadGate limits the number of threads that are simultaneously
+ active in order to implement adaptive rate control.
+
+ Initially the ThreadGate allows only one thread to be active. For
+ each successful work item, another thread is activated and for each
+ failed item, the number of active threads is reduced by one. When only
+ one thread is active, failures will cause exponential backoff.
+
+ For example, a ThreadGate instance, thread_gate can be used in a number
+ of threads as so:
+
+ # Block until this thread is enabled for work.
+ thread_gate.StartWork()
+ try:
+ status = DoSomeWorkInvolvingLimitedSharedResources()
+ suceeded = IsStatusGood(status)
+ badly_failed = IsStatusVeryBad(status)
+ finally:
+ if suceeded:
+ # Suceeded, add more simultaneously enabled threads to the task.
+ thread_gate.FinishWork(instruction=ThreadGate.INCREASE)
+ elif badly_failed:
+ # Failed, or succeeded but with high resource load, reduce number of
+ # workers.
+ thread_gate.FinishWork(instruction=ThreadGate.DECREASE)
+ else:
+ # We succeeded, but don't want to add more workers to the task.
+ thread_gate.FinishWork(instruction=ThreadGate.HOLD)
+
+ the thread_gate will enable and disable/backoff threads in response to
+ resource load conditions.
+
+ StartWork can block indefinitely. FinishWork, while not
+ lock-free, should never block absent a demonic scheduler.
+ """
+
+ INCREASE = 'increase'
+ HOLD = 'hold'
+ DECREASE = 'decrease'
+
+ def __init__(self,
+ num_threads,
+ sleep=InterruptibleSleep):
+ """Constructor for ThreadGate instances.
+
+ Args:
+ num_threads: The total number of threads using this gate.
+ sleep: Used for dependency injection.
+ """
+ self.__enabled_count = 1
+ self.__lock = threading.Lock()
+ self.__thread_semaphore = threading.Semaphore(self.__enabled_count)
+ self.__num_threads = num_threads
+ self.__backoff_time = 0
+ self.__sleep = sleep
+
+ def num_threads(self):
+ return self.__num_threads
+
+ def EnableThread(self):
+ """Enable one more worker thread."""
+ self.__lock.acquire()
+ try:
+ self.__enabled_count += 1
+ finally:
+ self.__lock.release()
+ self.__thread_semaphore.release()
+
+ def EnableAllThreads(self):
+ """Enable all worker threads."""
+ for unused_idx in xrange(self.__num_threads - self.__enabled_count):
+ self.EnableThread()
+
+ def StartWork(self):
+ """Starts a critical section in which the number of workers is limited.
+
+ Starts a critical section which allows self.__enabled_count
+ simultaneously operating threads. The critical section is ended by
+ calling self.FinishWork().
+ """
+ self.__thread_semaphore.acquire()
+ if self.__backoff_time > 0.0:
+ if not threading.currentThread().exit_flag:
+ logger.info('Backing off due to errors: %.1f seconds',
+ self.__backoff_time)
+ self.__sleep(self.__backoff_time)
+
+ def FinishWork(self, instruction=None):
+ """Ends a critical section started with self.StartWork()."""
+ if not instruction or instruction == ThreadGate.HOLD:
+ self.__thread_semaphore.release()
+
+ elif instruction == ThreadGate.INCREASE:
+ if self.__backoff_time > 0.0:
+ logger.info('Resetting backoff to 0.0')
+ self.__backoff_time = 0.0
+ do_enable = False
+ self.__lock.acquire()
+ try:
+ if self.__num_threads > self.__enabled_count:
+ do_enable = True
+ self.__enabled_count += 1
+ finally:
+ self.__lock.release()
+ if do_enable:
+ logger.debug('Increasing active thread count to %d',
+ self.__enabled_count)
+ self.__thread_semaphore.release()
+ self.__thread_semaphore.release()
+
+ elif instruction == ThreadGate.DECREASE:
+ do_disable = False
+ self.__lock.acquire()
+ try:
+ if self.__enabled_count > 1:
+ do_disable = True
+ self.__enabled_count -= 1
+ else:
+ if self.__backoff_time == 0.0:
+ self.__backoff_time = INITIAL_BACKOFF
+ else:
+ self.__backoff_time *= BACKOFF_FACTOR
+ finally:
+ self.__lock.release()
+ if do_disable:
+ logger.debug('Decreasing the number of active threads to %d',
+ self.__enabled_count)
+ else:
+ self.__thread_semaphore.release()
+
+
+class WorkItem(object):
+ """Holds a unit of work."""
+
+ SUCCESS = 'success'
+ RETRY = 'retry'
+ FAILURE = 'failure'
+
+ def __init__(self, name):
+ self.__name = name
+
+ def PerformWork(self, thread_pool):
+ """Perform the work of this work item and report the results.
+
+ Args:
+ thread_pool: The AdaptiveThreadPool instance associated with this
+ thread.
+
+ Returns:
+ A tuple (status, instruction) of the work status and an instruction
+ for the ThreadGate.
+ """
+ raise NotImplementedError
+
+ def __str__(self):
+ return self.__name