thirdparty/google_appengine/google/appengine/tools/adaptive_thread_pool.py
changeset 2864 2e0b0af889be
equal deleted inserted replaced
2862:27971a13089f 2864:2e0b0af889be
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2007 Google Inc.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #     http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 #
       
    17 
       
    18 """Provides thread-pool-like functionality for workers accessing App Engine.
       
    19 
       
    20 The pool adapts to slow or timing out requests by reducing the number of
       
     21 active workers, or increasing the number when request latency drops.
       
    22 """
       
    23 
       
    24 
       
    25 
       
    26 import logging
       
    27 import Queue
       
    28 import sys
       
    29 import threading
       
    30 import time
       
    31 import traceback
       
    32 
       
    33 from google.appengine.tools.requeue import ReQueue
       
    34 
       
    35 logger = logging.getLogger('google.appengine.tools.adaptive_thread_pool')
       
    36 
       
    37 _THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'
       
    38 
       
    39 INITIAL_BACKOFF = 1.0
       
    40 
       
    41 BACKOFF_FACTOR = 2.0
       
    42 
       
    43 
       
    44 class Error(Exception):
       
    45   """Base-class for exceptions in this module."""
       
    46 
       
    47 
       
    48 class WorkItemError(Error):
       
    49   """Error while processing a WorkItem."""
       
    50 
       
    51 
       
    52 class RetryException(Error):
       
    53   """A non-fatal exception that indicates that a work item should be retried."""
       
    54 
       
    55 
       
    56 def InterruptibleSleep(sleep_time):
       
    57   """Puts thread to sleep, checking this threads exit_flag four times a second.
       
    58 
       
    59   Args:
       
    60     sleep_time: Time to sleep.
       
    61   """
       
    62   slept = 0.0
       
    63   epsilon = .0001
       
    64   thread = threading.currentThread()
       
    65   while slept < sleep_time - epsilon:
       
    66     remaining = sleep_time - slept
       
    67     this_sleep_time = min(remaining, 0.25)
       
    68     time.sleep(this_sleep_time)
       
    69     slept += this_sleep_time
       
    70     if thread.exit_flag:
       
    71       return
       
    72 
       
    73 
       
    74 class WorkerThread(threading.Thread):
       
    75   """A WorkerThread to execute WorkItems.
       
    76 
       
    77   Attributes:
       
    78     exit_flag: A boolean indicating whether this thread should stop
       
    79       its work and exit.
       
    80   """
       
    81 
       
    82   def __init__(self, thread_pool, thread_gate, name=None):
       
    83     """Initialize a WorkerThread instance.
       
    84 
       
    85     Args:
       
    86       thread_pool: An AdaptiveThreadPool instance.
       
    87       thread_gate: A ThreadGate instance.
       
    88       name: A name for this WorkerThread.
       
    89     """
       
    90     threading.Thread.__init__(self)
       
    91 
       
    92     self.setDaemon(True)
       
    93 
       
    94     self.exit_flag = False
       
    95     self.__error = None
       
    96     self.__traceback = None
       
    97     self.__thread_pool = thread_pool
       
    98     self.__work_queue = thread_pool.requeue
       
    99     self.__thread_gate = thread_gate
       
   100     if not name:
       
   101       self.__name = 'Anonymous_' + self.__class__.__name__
       
   102     else:
       
   103       self.__name = name
       
   104 
       
   105   def run(self):
       
   106     """Perform the work of the thread."""
       
   107     logger.debug('[%s] %s: started', self.getName(), self.__class__.__name__)
       
   108 
       
   109     try:
       
   110       self.WorkOnItems()
       
   111     except:
       
   112       self.SetError()
       
   113 
       
   114     logger.debug('[%s] %s: exiting', self.getName(), self.__class__.__name__)
       
   115 
       
   116   def SetError(self):
       
   117     """Sets the error and traceback information for this thread.
       
   118 
       
   119     This must be called from an exception handler.
       
   120     """
       
   121     if not self.__error:
       
   122       exc_info = sys.exc_info()
       
   123       self.__error = exc_info[1]
       
   124       self.__traceback = exc_info[2]
       
   125       logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
       
   126 
       
   127   def WorkOnItems(self):
       
   128     """Perform the work of a WorkerThread."""
       
   129     while not self.exit_flag:
       
   130       item = None
       
   131       self.__thread_gate.StartWork()
       
   132       try:
       
   133         status, instruction = WorkItem.FAILURE, ThreadGate.DECREASE
       
   134         try:
       
   135           if self.exit_flag:
       
   136             instruction = ThreadGate.HOLD
       
   137             break
       
   138 
       
   139           try:
       
   140             item = self.__work_queue.get(block=True, timeout=1.0)
       
   141           except Queue.Empty:
       
   142             instruction = ThreadGate.HOLD
       
   143             continue
       
   144           if item == _THREAD_SHOULD_EXIT or self.exit_flag:
       
   145             status, instruction = WorkItem.SUCCESS, ThreadGate.HOLD
       
   146             break
       
   147 
       
   148           logger.debug('[%s] Got work item %s', self.getName(), item)
       
   149 
       
   150           status, instruction = item.PerformWork(self.__thread_pool)
       
   151         except RetryException:
       
   152           status, instruction = WorkItem.RETRY, ThreadGate.HOLD
       
   153         except:
       
   154           self.SetError()
       
   155           raise
       
   156 
       
   157       finally:
       
   158         try:
       
   159           if item:
       
   160             if status == WorkItem.SUCCESS:
       
   161               self.__work_queue.task_done()
       
   162             elif status == WorkItem.RETRY:
       
   163               try:
       
   164                 self.__work_queue.reput(item, block=False)
       
   165               except Queue.Full:
       
   166                 logger.error('[%s] Failed to reput work item.', self.getName())
       
   167                 raise Error('Failed to reput work item')
       
   168             else:
       
   169               if not self.__error:
       
   170                 if item.error:
       
   171                   self.__error = item.error
       
   172                   self.__traceback = item.traceback
       
   173                 else:
       
   174                   self.__error = WorkItemError(
       
   175                       'Fatal error while processing %s' % item)
       
   176                 raise self.__error
       
   177 
       
   178         finally:
       
   179           self.__thread_gate.FinishWork(instruction=instruction)
       
   180 
       
   181   def CheckError(self):
       
   182     """If an error is present, then log it."""
       
   183     if self.__error:
       
   184       logger.error('Error in %s: %s', self.getName(), self.__error)
       
   185       if self.__traceback:
       
   186         logger.debug('%s', ''.join(traceback.format_exception(
       
   187             self.__error.__class__,
       
   188             self.__error,
       
   189             self.__traceback)))
       
   190 
       
   191   def __str__(self):
       
   192     return self.__name
       
   193 
       
   194 
       
   195 class AdaptiveThreadPool(object):
       
   196   """A thread pool which processes WorkItems from a queue.
       
   197 
       
   198   Attributes:
       
   199     requeue: The requeue instance which holds work items for this
       
   200       thread pool.
       
   201   """
       
   202 
       
   203   def __init__(self,
       
   204                num_threads,
       
   205                queue_size=None,
       
   206                base_thread_name=None,
       
   207                worker_thread_factory=WorkerThread,
       
   208                queue_factory=Queue.Queue):
       
   209     """Initialize an AdaptiveThreadPool.
       
   210 
       
   211     An adaptive thread pool executes WorkItems using a number of
       
   212     WorkerThreads.  WorkItems represent items of work that may
       
   213     succeed, soft fail, or hard fail. In addition, a completed work
       
   214     item can signal this AdaptiveThreadPool to enable more or fewer
       
   215     threads.  Initially one thread is active.  Soft failures are
       
   216     reqeueud to be retried.  Hard failures cause this
       
   217     AdaptiveThreadPool to shut down entirely.  See the WorkItem class
       
   218     for more details.
       
   219 
       
   220     Args:
       
   221       num_threads: The number of threads to use.
       
   222       queue_size: The size of the work item queue to use.
       
   223       base_thread_name: A string from which worker thread names are derived.
       
   224       worker_thread_factory: A factory which procudes WorkerThreads.
       
   225       queue_factory: Used for dependency injection.
       
   226     """
       
   227     if queue_size is None:
       
   228       queue_size = num_threads
       
   229     self.requeue = ReQueue(queue_size, queue_factory=queue_factory)
       
   230     self.__thread_gate = ThreadGate(num_threads)
       
   231     self.__num_threads = num_threads
       
   232     self.__threads = []
       
   233     for i in xrange(num_threads):
       
   234       thread = worker_thread_factory(self, self.__thread_gate)
       
   235       if base_thread_name:
       
   236         base = base_thread_name
       
   237       else:
       
   238         base = thread.__class__.__name__
       
   239       thread.name = '%s-%d' % (base, i)
       
   240       self.__threads.append(thread)
       
   241       thread.start()
       
   242 
       
   243   def num_threads(self):
       
   244     """Return the number of threads in this thread pool."""
       
   245     return self.__num_threads
       
   246 
       
   247   def Threads(self):
       
   248     """Yields the registered threads."""
       
   249     for thread in self.__threads:
       
   250       yield thread
       
   251 
       
   252   def SubmitItem(self, item, block=True, timeout=0.0):
       
   253     """Submit a WorkItem to the AdaptiveThreadPool.
       
   254 
       
   255     Args:
       
   256       item: A WorkItem instance.
       
   257       block: Whether to block on submitting if the submit queue is full.
       
   258       timeout: Time wait for room in the queue if block is True, 0.0 to
       
   259         block indefinitely.
       
   260 
       
   261     Raises:
       
   262       Queue.Full if the submit queue is full.
       
   263     """
       
   264     self.requeue.put(item, block=block, timeout=timeout)
       
   265 
       
   266   def QueuedItemCount(self):
       
   267     """Returns the number of items currently in the queue."""
       
   268     return self.requeue.qsize()
       
   269 
       
   270   def Shutdown(self):
       
   271     """Shutdown the thread pool.
       
   272 
       
   273     Tasks may remain unexecuted in the submit queue.
       
   274     """
       
   275     while not self.requeue.empty():
       
   276       try:
       
   277         unused_item = self.requeue.get_nowait()
       
   278         self.requeue.task_done()
       
   279       except Queue.Empty:
       
   280         pass
       
   281     for thread in self.__threads:
       
   282       thread.exit_flag = True
       
   283       self.requeue.put(_THREAD_SHOULD_EXIT)
       
   284     self.__thread_gate.EnableAllThreads()
       
   285 
       
   286   def Wait(self):
       
   287     """Wait until all work items have been completed."""
       
   288     self.requeue.join()
       
   289 
       
   290   def JoinThreads(self):
       
   291     """Wait for all threads to exit."""
       
   292     for thread in self.__threads:
       
   293       logger.debug('Waiting for %s to exit' % str(thread))
       
   294       thread.join()
       
   295 
       
   296   def CheckErrors(self):
       
   297     """Output logs for any errors that occurred in the worker threads."""
       
   298     for thread in self.__threads:
       
   299       thread.CheckError()
       
   300 
       
   301 
       
   302 class ThreadGate(object):
       
   303   """Manage the number of active worker threads.
       
   304 
       
   305   The ThreadGate limits the number of threads that are simultaneously
       
   306   active in order to implement adaptive rate control.
       
   307 
       
   308   Initially the ThreadGate allows only one thread to be active.  For
       
   309   each successful work item, another thread is activated and for each
       
   310   failed item, the number of active threads is reduced by one.  When only
       
   311   one thread is active, failures will cause exponential backoff.
       
   312 
       
   313   For example, a ThreadGate instance, thread_gate can be used in a number
       
   314   of threads as so:
       
   315 
       
   316   # Block until this thread is enabled for work.
       
   317   thread_gate.StartWork()
       
   318   try:
       
   319     status = DoSomeWorkInvolvingLimitedSharedResources()
       
   320     suceeded = IsStatusGood(status)
       
   321     badly_failed = IsStatusVeryBad(status)
       
   322   finally:
       
   323     if suceeded:
       
   324       # Suceeded, add more simultaneously enabled threads to the task.
       
   325       thread_gate.FinishWork(instruction=ThreadGate.INCREASE)
       
   326     elif badly_failed:
       
   327       # Failed, or succeeded but with high resource load, reduce number of
       
   328       # workers.
       
   329       thread_gate.FinishWork(instruction=ThreadGate.DECREASE)
       
   330     else:
       
   331       # We succeeded, but don't want to add more workers to the task.
       
   332       thread_gate.FinishWork(instruction=ThreadGate.HOLD)
       
   333 
       
   334   the thread_gate will enable and disable/backoff threads in response to
       
   335   resource load conditions.
       
   336 
       
   337   StartWork can block indefinitely. FinishWork, while not
       
   338   lock-free, should never block absent a demonic scheduler.
       
   339   """
       
   340 
       
   341   INCREASE = 'increase'
       
   342   HOLD = 'hold'
       
   343   DECREASE = 'decrease'
       
   344 
       
   345   def __init__(self,
       
   346                num_threads,
       
   347                sleep=InterruptibleSleep):
       
   348     """Constructor for ThreadGate instances.
       
   349 
       
   350     Args:
       
   351       num_threads: The total number of threads using this gate.
       
   352       sleep: Used for dependency injection.
       
   353     """
       
   354     self.__enabled_count = 1
       
   355     self.__lock = threading.Lock()
       
   356     self.__thread_semaphore = threading.Semaphore(self.__enabled_count)
       
   357     self.__num_threads = num_threads
       
   358     self.__backoff_time = 0
       
   359     self.__sleep = sleep
       
   360 
       
   361   def num_threads(self):
       
   362     return self.__num_threads
       
   363 
       
   364   def EnableThread(self):
       
   365     """Enable one more worker thread."""
       
   366     self.__lock.acquire()
       
   367     try:
       
   368       self.__enabled_count += 1
       
   369     finally:
       
   370       self.__lock.release()
       
   371     self.__thread_semaphore.release()
       
   372 
       
   373   def EnableAllThreads(self):
       
   374     """Enable all worker threads."""
       
   375     for unused_idx in xrange(self.__num_threads - self.__enabled_count):
       
   376       self.EnableThread()
       
   377 
       
   378   def StartWork(self):
       
   379     """Starts a critical section in which the number of workers is limited.
       
   380 
       
   381     Starts a critical section which allows self.__enabled_count
       
   382     simultaneously operating threads. The critical section is ended by
       
   383     calling self.FinishWork().
       
   384     """
       
   385     self.__thread_semaphore.acquire()
       
   386     if self.__backoff_time > 0.0:
       
   387       if not threading.currentThread().exit_flag:
       
   388         logger.info('Backing off due to errors: %.1f seconds',
       
   389                     self.__backoff_time)
       
   390         self.__sleep(self.__backoff_time)
       
   391 
       
   392   def FinishWork(self, instruction=None):
       
   393     """Ends a critical section started with self.StartWork()."""
       
   394     if not instruction or instruction == ThreadGate.HOLD:
       
   395       self.__thread_semaphore.release()
       
   396 
       
   397     elif instruction == ThreadGate.INCREASE:
       
   398       if self.__backoff_time > 0.0:
       
   399         logger.info('Resetting backoff to 0.0')
       
   400         self.__backoff_time = 0.0
       
   401       do_enable = False
       
   402       self.__lock.acquire()
       
   403       try:
       
   404         if self.__num_threads > self.__enabled_count:
       
   405           do_enable = True
       
   406           self.__enabled_count += 1
       
   407       finally:
       
   408         self.__lock.release()
       
   409       if do_enable:
       
   410         logger.debug('Increasing active thread count to %d',
       
   411                      self.__enabled_count)
       
   412         self.__thread_semaphore.release()
       
   413       self.__thread_semaphore.release()
       
   414 
       
   415     elif instruction == ThreadGate.DECREASE:
       
   416       do_disable = False
       
   417       self.__lock.acquire()
       
   418       try:
       
   419         if self.__enabled_count > 1:
       
   420           do_disable = True
       
   421           self.__enabled_count -= 1
       
   422         else:
       
   423           if self.__backoff_time == 0.0:
       
   424             self.__backoff_time = INITIAL_BACKOFF
       
   425           else:
       
   426             self.__backoff_time *= BACKOFF_FACTOR
       
   427       finally:
       
   428         self.__lock.release()
       
   429         if do_disable:
       
   430           logger.debug('Decreasing the number of active threads to %d',
       
   431                        self.__enabled_count)
       
   432         else:
       
   433           self.__thread_semaphore.release()
       
   434 
       
   435 
       
   436 class WorkItem(object):
       
   437   """Holds a unit of work."""
       
   438 
       
   439   SUCCESS = 'success'
       
   440   RETRY = 'retry'
       
   441   FAILURE = 'failure'
       
   442 
       
   443   def __init__(self, name):
       
   444     self.__name = name
       
   445 
       
   446   def PerformWork(self, thread_pool):
       
   447     """Perform the work of this work item and report the results.
       
   448 
       
   449     Args:
       
   450       thread_pool: The AdaptiveThreadPool instance associated with this
       
   451         thread.
       
   452 
       
   453     Returns:
       
   454       A tuple (status, instruction) of the work status and an instruction
       
   455       for the ThreadGate.
       
   456     """
       
   457     raise NotImplementedError
       
   458 
       
   459   def __str__(self):
       
   460     return self.__name