thirdparty/google_appengine/google/appengine/tools/requeue.py
changeset 2864 2e0b0af889be
equal deleted inserted replaced
2862:27971a13089f 2864:2e0b0af889be
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2007 Google Inc.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #     http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 #
       
    17 
       
     18 """A thread-safe queue in which removed objects can be put back at the front."""
       
    19 
       
    20 
       
    21 import logging
       
    22 import Queue
       
    23 import threading
       
    24 import time
       
    25 
       
# Module-level logger, named after this module's dotted path.
logger = logging.getLogger('google.appengine.tools.requeue')
       
    27 
       
    28 
       
    29 class ReQueue(object):
       
    30   """A special thread-safe queue.
       
    31 
       
    32   A ReQueue allows unfinished work items to be returned with a call to
       
    33   reput().  When an item is reput, task_done() should *not* be called
       
    34   in addition, getting an item that has been reput does not increase
       
    35   the number of outstanding tasks.
       
    36 
       
    37   This class shares an interface with Queue.Queue and provides the
       
    38   additional reput method.
       
    39   """
       
    40 
       
    41   def __init__(self,
       
    42                queue_capacity,
       
    43                requeue_capacity=None,
       
    44                queue_factory=Queue.Queue,
       
    45                get_time=time.time):
       
    46     """Initialize a ReQueue instance.
       
    47 
       
    48     Args:
       
    49       queue_capacity: The number of items that can be put in the ReQueue.
       
    50       requeue_capacity: The numer of items that can be reput in the ReQueue.
       
    51       queue_factory: Used for dependency injection.
       
    52       get_time: Used for dependency injection.
       
    53     """
       
    54     if requeue_capacity is None:
       
    55       requeue_capacity = queue_capacity
       
    56 
       
    57     self.get_time = get_time
       
    58     self.queue = queue_factory(queue_capacity)
       
    59     self.requeue = queue_factory(requeue_capacity)
       
    60     self.lock = threading.Lock()
       
    61     self.put_cond = threading.Condition(self.lock)
       
    62     self.get_cond = threading.Condition(self.lock)
       
    63 
       
    64   def _DoWithTimeout(self,
       
    65                      action,
       
    66                      exc,
       
    67                      wait_cond,
       
    68                      done_cond,
       
    69                      lock,
       
    70                      timeout=None,
       
    71                      block=True):
       
    72     """Performs the given action with a timeout.
       
    73 
       
    74     The action must be non-blocking, and raise an instance of exc on a
       
    75     recoverable failure.  If the action fails with an instance of exc,
       
    76     we wait on wait_cond before trying again.  Failure after the
       
    77     timeout is reached is propagated as an exception.  Success is
       
    78     signalled by notifying on done_cond and returning the result of
       
    79     the action.  If action raises any exception besides an instance of
       
    80     exc, it is immediately propagated.
       
    81 
       
    82     Args:
       
    83       action: A callable that performs a non-blocking action.
       
    84       exc: An exception type that is thrown by the action to indicate
       
    85         a recoverable error.
       
    86       wait_cond: A condition variable which should be waited on when
       
    87         action throws exc.
       
    88       done_cond: A condition variable to signal if the action returns.
       
    89       lock: The lock used by wait_cond and done_cond.
       
    90       timeout: A non-negative float indicating the maximum time to wait.
       
    91       block: Whether to block if the action cannot complete immediately.
       
    92 
       
    93     Returns:
       
    94       The result of the action, if it is successful.
       
    95 
       
    96     Raises:
       
    97       ValueError: If the timeout argument is negative.
       
    98     """
       
    99     if timeout is not None and timeout < 0.0:
       
   100       raise ValueError('\'timeout\' must not be a negative  number')
       
   101     if not block:
       
   102       timeout = 0.0
       
   103     result = None
       
   104     success = False
       
   105     start_time = self.get_time()
       
   106     lock.acquire()
       
   107     try:
       
   108       while not success:
       
   109         try:
       
   110           result = action()
       
   111           success = True
       
   112         except Exception, e:
       
   113           if not isinstance(e, exc):
       
   114             raise e
       
   115           if timeout is not None:
       
   116             elapsed_time = self.get_time() - start_time
       
   117             timeout -= elapsed_time
       
   118             if timeout <= 0.0:
       
   119               raise e
       
   120           wait_cond.wait(timeout)
       
   121     finally:
       
   122       if success:
       
   123         done_cond.notify()
       
   124       lock.release()
       
   125     return result
       
   126 
       
   127   def put(self, item, block=True, timeout=None):
       
   128     """Put an item into the requeue.
       
   129 
       
   130     Args:
       
   131       item: An item to add to the requeue.
       
   132       block: Whether to block if the requeue is full.
       
   133       timeout: Maximum on how long to wait until the queue is non-full.
       
   134 
       
   135     Raises:
       
   136       Queue.Full if the queue is full and the timeout expires.
       
   137     """
       
   138     def PutAction():
       
   139       self.queue.put(item, block=False)
       
   140     self._DoWithTimeout(PutAction,
       
   141                         Queue.Full,
       
   142                         self.get_cond,
       
   143                         self.put_cond,
       
   144                         self.lock,
       
   145                         timeout=timeout,
       
   146                         block=block)
       
   147 
       
   148   def reput(self, item, block=True, timeout=None):
       
   149     """Re-put an item back into the requeue.
       
   150 
       
   151     Re-putting an item does not increase the number of outstanding
       
   152     tasks, so the reput item should be uniquely associated with an
       
   153     item that was previously removed from the requeue and for which
       
   154     TaskDone has not been called.
       
   155 
       
   156     Args:
       
   157       item: An item to add to the requeue.
       
   158       block: Whether to block if the requeue is full.
       
   159       timeout: Maximum on how long to wait until the queue is non-full.
       
   160 
       
   161     Raises:
       
   162       Queue.Full is the queue is full and the timeout expires.
       
   163     """
       
   164     def ReputAction():
       
   165       self.requeue.put(item, block=False)
       
   166     self._DoWithTimeout(ReputAction,
       
   167                         Queue.Full,
       
   168                         self.get_cond,
       
   169                         self.put_cond,
       
   170                         self.lock,
       
   171                         timeout=timeout,
       
   172                         block=block)
       
   173 
       
   174   def get(self, block=True, timeout=None):
       
   175     """Get an item from the requeue.
       
   176 
       
   177     Args:
       
   178       block: Whether to block if the requeue is empty.
       
   179       timeout: Maximum on how long to wait until the requeue is non-empty.
       
   180 
       
   181     Returns:
       
   182       An item from the requeue.
       
   183 
       
   184     Raises:
       
   185       Queue.Empty if the queue is empty and the timeout expires.
       
   186     """
       
   187     def GetAction():
       
   188       try:
       
   189         result = self.requeue.get(block=False)
       
   190         self.requeue.task_done()
       
   191       except Queue.Empty:
       
   192         result = self.queue.get(block=False)
       
   193       return result
       
   194     return self._DoWithTimeout(GetAction,
       
   195                                Queue.Empty,
       
   196                                self.put_cond,
       
   197                                self.get_cond,
       
   198                                self.lock,
       
   199                                timeout=timeout,
       
   200                                block=block)
       
   201 
       
   202   def join(self):
       
   203     """Blocks until all of the items in the requeue have been processed."""
       
   204     self.queue.join()
       
   205 
       
   206   def task_done(self):
       
   207     """Indicate that a previously enqueued item has been fully processed."""
       
   208     self.queue.task_done()
       
   209 
       
   210   def empty(self):
       
   211     """Returns true if the requeue is empty."""
       
   212     return self.queue.empty() and self.requeue.empty()
       
   213 
       
   214   def get_nowait(self):
       
   215     """Try to get an item from the queue without blocking."""
       
   216     return self.get(block=False)
       
   217 
       
   218   def qsize(self):
       
   219     return self.queue.qsize() + self.requeue.qsize()