thirdparty/google_appengine/google/appengine/tools/adaptive_thread_pool.py
changeset 2864 2e0b0af889be
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/tools/adaptive_thread_pool.py	Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,460 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Provides thread-pool-like functionality for workers accessing App Engine.
+
+The pool adapts to slow or timing out requests by reducing the number of
+active workers, or increasing the number when requests latency reduces.
+"""
+
+
+
+import logging
+import Queue
+import sys
+import threading
+import time
+import traceback
+
+from google.appengine.tools.requeue import ReQueue
+
+logger = logging.getLogger('google.appengine.tools.adaptive_thread_pool')
+
+_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'
+
+INITIAL_BACKOFF = 1.0
+
+BACKOFF_FACTOR = 2.0
+
+
+class Error(Exception):
+  """Base-class for exceptions in this module."""
+
+
+class WorkItemError(Error):
+  """Error while processing a WorkItem."""
+
+
+class RetryException(Error):
+  """A non-fatal exception that indicates that a work item should be retried."""
+
+
+def InterruptibleSleep(sleep_time):
+  """Puts thread to sleep, checking this threads exit_flag four times a second.
+
+  Args:
+    sleep_time: Time to sleep.
+  """
+  slept = 0.0
+  epsilon = .0001
+  thread = threading.currentThread()
+  while slept < sleep_time - epsilon:
+    remaining = sleep_time - slept
+    this_sleep_time = min(remaining, 0.25)
+    time.sleep(this_sleep_time)
+    slept += this_sleep_time
+    if thread.exit_flag:
+      return
+
+
+class WorkerThread(threading.Thread):
+  """A WorkerThread to execute WorkItems.
+
+  Attributes:
+    exit_flag: A boolean indicating whether this thread should stop
+      its work and exit.
+  """
+
+  def __init__(self, thread_pool, thread_gate, name=None):
+    """Initialize a WorkerThread instance.
+
+    Args:
+      thread_pool: An AdaptiveThreadPool instance.
+      thread_gate: A ThreadGate instance.
+      name: A name for this WorkerThread.
+    """
+    threading.Thread.__init__(self)
+
+    self.setDaemon(True)
+
+    self.exit_flag = False
+    self.__error = None
+    self.__traceback = None
+    self.__thread_pool = thread_pool
+    self.__work_queue = thread_pool.requeue
+    self.__thread_gate = thread_gate
+    if not name:
+      self.__name = 'Anonymous_' + self.__class__.__name__
+    else:
+      self.__name = name
+
+  def run(self):
+    """Perform the work of the thread."""
+    logger.debug('[%s] %s: started', self.getName(), self.__class__.__name__)
+
+    try:
+      self.WorkOnItems()
+    except:
+      self.SetError()
+
+    logger.debug('[%s] %s: exiting', self.getName(), self.__class__.__name__)
+
+  def SetError(self):
+    """Sets the error and traceback information for this thread.
+
+    This must be called from an exception handler.
+    """
+    if not self.__error:
+      exc_info = sys.exc_info()
+      self.__error = exc_info[1]
+      self.__traceback = exc_info[2]
+      logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
+
+  def WorkOnItems(self):
+    """Perform the work of a WorkerThread."""
+    while not self.exit_flag:
+      item = None
+      self.__thread_gate.StartWork()
+      try:
+        status, instruction = WorkItem.FAILURE, ThreadGate.DECREASE
+        try:
+          if self.exit_flag:
+            instruction = ThreadGate.HOLD
+            break
+
+          try:
+            item = self.__work_queue.get(block=True, timeout=1.0)
+          except Queue.Empty:
+            instruction = ThreadGate.HOLD
+            continue
+          if item == _THREAD_SHOULD_EXIT or self.exit_flag:
+            status, instruction = WorkItem.SUCCESS, ThreadGate.HOLD
+            break
+
+          logger.debug('[%s] Got work item %s', self.getName(), item)
+
+          status, instruction = item.PerformWork(self.__thread_pool)
+        except RetryException:
+          status, instruction = WorkItem.RETRY, ThreadGate.HOLD
+        except:
+          self.SetError()
+          raise
+
+      finally:
+        try:
+          if item:
+            if status == WorkItem.SUCCESS:
+              self.__work_queue.task_done()
+            elif status == WorkItem.RETRY:
+              try:
+                self.__work_queue.reput(item, block=False)
+              except Queue.Full:
+                logger.error('[%s] Failed to reput work item.', self.getName())
+                raise Error('Failed to reput work item')
+            else:
+              if not self.__error:
+                if item.error:
+                  self.__error = item.error
+                  self.__traceback = item.traceback
+                else:
+                  self.__error = WorkItemError(
+                      'Fatal error while processing %s' % item)
+                raise self.__error
+
+        finally:
+          self.__thread_gate.FinishWork(instruction=instruction)
+
+  def CheckError(self):
+    """If an error is present, then log it."""
+    if self.__error:
+      logger.error('Error in %s: %s', self.getName(), self.__error)
+      if self.__traceback:
+        logger.debug('%s', ''.join(traceback.format_exception(
+            self.__error.__class__,
+            self.__error,
+            self.__traceback)))
+
+  def __str__(self):
+    return self.__name
+
+
+class AdaptiveThreadPool(object):
+  """A thread pool which processes WorkItems from a queue.
+
+  Attributes:
+    requeue: The requeue instance which holds work items for this
+      thread pool.
+  """
+
+  def __init__(self,
+               num_threads,
+               queue_size=None,
+               base_thread_name=None,
+               worker_thread_factory=WorkerThread,
+               queue_factory=Queue.Queue):
+    """Initialize an AdaptiveThreadPool.
+
+    An adaptive thread pool executes WorkItems using a number of
+    WorkerThreads.  WorkItems represent items of work that may
+    succeed, soft fail, or hard fail. In addition, a completed work
+    item can signal this AdaptiveThreadPool to enable more or fewer
+    threads.  Initially one thread is active.  Soft failures are
+    reqeueud to be retried.  Hard failures cause this
+    AdaptiveThreadPool to shut down entirely.  See the WorkItem class
+    for more details.
+
+    Args:
+      num_threads: The number of threads to use.
+      queue_size: The size of the work item queue to use.
+      base_thread_name: A string from which worker thread names are derived.
+      worker_thread_factory: A factory which procudes WorkerThreads.
+      queue_factory: Used for dependency injection.
+    """
+    if queue_size is None:
+      queue_size = num_threads
+    self.requeue = ReQueue(queue_size, queue_factory=queue_factory)
+    self.__thread_gate = ThreadGate(num_threads)
+    self.__num_threads = num_threads
+    self.__threads = []
+    for i in xrange(num_threads):
+      thread = worker_thread_factory(self, self.__thread_gate)
+      if base_thread_name:
+        base = base_thread_name
+      else:
+        base = thread.__class__.__name__
+      thread.name = '%s-%d' % (base, i)
+      self.__threads.append(thread)
+      thread.start()
+
+  def num_threads(self):
+    """Return the number of threads in this thread pool."""
+    return self.__num_threads
+
+  def Threads(self):
+    """Yields the registered threads."""
+    for thread in self.__threads:
+      yield thread
+
+  def SubmitItem(self, item, block=True, timeout=0.0):
+    """Submit a WorkItem to the AdaptiveThreadPool.
+
+    Args:
+      item: A WorkItem instance.
+      block: Whether to block on submitting if the submit queue is full.
+      timeout: Time wait for room in the queue if block is True, 0.0 to
+        block indefinitely.
+
+    Raises:
+      Queue.Full if the submit queue is full.
+    """
+    self.requeue.put(item, block=block, timeout=timeout)
+
+  def QueuedItemCount(self):
+    """Returns the number of items currently in the queue."""
+    return self.requeue.qsize()
+
+  def Shutdown(self):
+    """Shutdown the thread pool.
+
+    Tasks may remain unexecuted in the submit queue.
+    """
+    while not self.requeue.empty():
+      try:
+        unused_item = self.requeue.get_nowait()
+        self.requeue.task_done()
+      except Queue.Empty:
+        pass
+    for thread in self.__threads:
+      thread.exit_flag = True
+      self.requeue.put(_THREAD_SHOULD_EXIT)
+    self.__thread_gate.EnableAllThreads()
+
+  def Wait(self):
+    """Wait until all work items have been completed."""
+    self.requeue.join()
+
+  def JoinThreads(self):
+    """Wait for all threads to exit."""
+    for thread in self.__threads:
+      logger.debug('Waiting for %s to exit' % str(thread))
+      thread.join()
+
+  def CheckErrors(self):
+    """Output logs for any errors that occurred in the worker threads."""
+    for thread in self.__threads:
+      thread.CheckError()
+
+
+class ThreadGate(object):
+  """Manage the number of active worker threads.
+
+  The ThreadGate limits the number of threads that are simultaneously
+  active in order to implement adaptive rate control.
+
+  Initially the ThreadGate allows only one thread to be active.  For
+  each successful work item, another thread is activated and for each
+  failed item, the number of active threads is reduced by one.  When only
+  one thread is active, failures will cause exponential backoff.
+
+  For example, a ThreadGate instance, thread_gate can be used in a number
+  of threads as so:
+
+  # Block until this thread is enabled for work.
+  thread_gate.StartWork()
+  try:
+    status = DoSomeWorkInvolvingLimitedSharedResources()
+    suceeded = IsStatusGood(status)
+    badly_failed = IsStatusVeryBad(status)
+  finally:
+    if suceeded:
+      # Suceeded, add more simultaneously enabled threads to the task.
+      thread_gate.FinishWork(instruction=ThreadGate.INCREASE)
+    elif badly_failed:
+      # Failed, or succeeded but with high resource load, reduce number of
+      # workers.
+      thread_gate.FinishWork(instruction=ThreadGate.DECREASE)
+    else:
+      # We succeeded, but don't want to add more workers to the task.
+      thread_gate.FinishWork(instruction=ThreadGate.HOLD)
+
+  the thread_gate will enable and disable/backoff threads in response to
+  resource load conditions.
+
+  StartWork can block indefinitely. FinishWork, while not
+  lock-free, should never block absent a demonic scheduler.
+  """
+
+  INCREASE = 'increase'
+  HOLD = 'hold'
+  DECREASE = 'decrease'
+
+  def __init__(self,
+               num_threads,
+               sleep=InterruptibleSleep):
+    """Constructor for ThreadGate instances.
+
+    Args:
+      num_threads: The total number of threads using this gate.
+      sleep: Used for dependency injection.
+    """
+    self.__enabled_count = 1
+    self.__lock = threading.Lock()
+    self.__thread_semaphore = threading.Semaphore(self.__enabled_count)
+    self.__num_threads = num_threads
+    self.__backoff_time = 0
+    self.__sleep = sleep
+
+  def num_threads(self):
+    return self.__num_threads
+
+  def EnableThread(self):
+    """Enable one more worker thread."""
+    self.__lock.acquire()
+    try:
+      self.__enabled_count += 1
+    finally:
+      self.__lock.release()
+    self.__thread_semaphore.release()
+
+  def EnableAllThreads(self):
+    """Enable all worker threads."""
+    for unused_idx in xrange(self.__num_threads - self.__enabled_count):
+      self.EnableThread()
+
+  def StartWork(self):
+    """Starts a critical section in which the number of workers is limited.
+
+    Starts a critical section which allows self.__enabled_count
+    simultaneously operating threads. The critical section is ended by
+    calling self.FinishWork().
+    """
+    self.__thread_semaphore.acquire()
+    if self.__backoff_time > 0.0:
+      if not threading.currentThread().exit_flag:
+        logger.info('Backing off due to errors: %.1f seconds',
+                    self.__backoff_time)
+        self.__sleep(self.__backoff_time)
+
+  def FinishWork(self, instruction=None):
+    """Ends a critical section started with self.StartWork()."""
+    if not instruction or instruction == ThreadGate.HOLD:
+      self.__thread_semaphore.release()
+
+    elif instruction == ThreadGate.INCREASE:
+      if self.__backoff_time > 0.0:
+        logger.info('Resetting backoff to 0.0')
+        self.__backoff_time = 0.0
+      do_enable = False
+      self.__lock.acquire()
+      try:
+        if self.__num_threads > self.__enabled_count:
+          do_enable = True
+          self.__enabled_count += 1
+      finally:
+        self.__lock.release()
+      if do_enable:
+        logger.debug('Increasing active thread count to %d',
+                     self.__enabled_count)
+        self.__thread_semaphore.release()
+      self.__thread_semaphore.release()
+
+    elif instruction == ThreadGate.DECREASE:
+      do_disable = False
+      self.__lock.acquire()
+      try:
+        if self.__enabled_count > 1:
+          do_disable = True
+          self.__enabled_count -= 1
+        else:
+          if self.__backoff_time == 0.0:
+            self.__backoff_time = INITIAL_BACKOFF
+          else:
+            self.__backoff_time *= BACKOFF_FACTOR
+      finally:
+        self.__lock.release()
+        if do_disable:
+          logger.debug('Decreasing the number of active threads to %d',
+                       self.__enabled_count)
+        else:
+          self.__thread_semaphore.release()
+
+
+class WorkItem(object):
+  """Holds a unit of work."""
+
+  SUCCESS = 'success'
+  RETRY = 'retry'
+  FAILURE = 'failure'
+
+  def __init__(self, name):
+    self.__name = name
+
+  def PerformWork(self, thread_pool):
+    """Perform the work of this work item and report the results.
+
+    Args:
+      thread_pool: The AdaptiveThreadPool instance associated with this
+        thread.
+
+    Returns:
+      A tuple (status, instruction) of the work status and an instruction
+      for the ThreadGate.
+    """
+    raise NotImplementedError
+
+  def __str__(self):
+    return self.__name