thirdparty/google_appengine/google/appengine/tools/requeue.py
changeset 2864 2e0b0af889be
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/tools/requeue.py	Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A thread-safe queue in which removed objects put back to the front."""
+
+
+import logging
+import Queue
+import threading
+import time
+
+logger = logging.getLogger('google.appengine.tools.requeue')
+
+
+class ReQueue(object):
+  """A special thread-safe queue.
+
+  A ReQueue allows unfinished work items to be returned with a call to
+  reput().  When an item is reput, task_done() should *not* be called
+  in addition, getting an item that has been reput does not increase
+  the number of outstanding tasks.
+
+  This class shares an interface with Queue.Queue and provides the
+  additional reput method.
+  """
+
+  def __init__(self,
+               queue_capacity,
+               requeue_capacity=None,
+               queue_factory=Queue.Queue,
+               get_time=time.time):
+    """Initialize a ReQueue instance.
+
+    Args:
+      queue_capacity: The number of items that can be put in the ReQueue.
+      requeue_capacity: The numer of items that can be reput in the ReQueue.
+      queue_factory: Used for dependency injection.
+      get_time: Used for dependency injection.
+    """
+    if requeue_capacity is None:
+      requeue_capacity = queue_capacity
+
+    self.get_time = get_time
+    self.queue = queue_factory(queue_capacity)
+    self.requeue = queue_factory(requeue_capacity)
+    self.lock = threading.Lock()
+    self.put_cond = threading.Condition(self.lock)
+    self.get_cond = threading.Condition(self.lock)
+
+  def _DoWithTimeout(self,
+                     action,
+                     exc,
+                     wait_cond,
+                     done_cond,
+                     lock,
+                     timeout=None,
+                     block=True):
+    """Performs the given action with a timeout.
+
+    The action must be non-blocking, and raise an instance of exc on a
+    recoverable failure.  If the action fails with an instance of exc,
+    we wait on wait_cond before trying again.  Failure after the
+    timeout is reached is propagated as an exception.  Success is
+    signalled by notifying on done_cond and returning the result of
+    the action.  If action raises any exception besides an instance of
+    exc, it is immediately propagated.
+
+    Args:
+      action: A callable that performs a non-blocking action.
+      exc: An exception type that is thrown by the action to indicate
+        a recoverable error.
+      wait_cond: A condition variable which should be waited on when
+        action throws exc.
+      done_cond: A condition variable to signal if the action returns.
+      lock: The lock used by wait_cond and done_cond.
+      timeout: A non-negative float indicating the maximum time to wait.
+      block: Whether to block if the action cannot complete immediately.
+
+    Returns:
+      The result of the action, if it is successful.
+
+    Raises:
+      ValueError: If the timeout argument is negative.
+    """
+    if timeout is not None and timeout < 0.0:
+      raise ValueError('\'timeout\' must not be a negative  number')
+    if not block:
+      timeout = 0.0
+    result = None
+    success = False
+    start_time = self.get_time()
+    lock.acquire()
+    try:
+      while not success:
+        try:
+          result = action()
+          success = True
+        except Exception, e:
+          if not isinstance(e, exc):
+            raise e
+          if timeout is not None:
+            elapsed_time = self.get_time() - start_time
+            timeout -= elapsed_time
+            if timeout <= 0.0:
+              raise e
+          wait_cond.wait(timeout)
+    finally:
+      if success:
+        done_cond.notify()
+      lock.release()
+    return result
+
+  def put(self, item, block=True, timeout=None):
+    """Put an item into the requeue.
+
+    Args:
+      item: An item to add to the requeue.
+      block: Whether to block if the requeue is full.
+      timeout: Maximum on how long to wait until the queue is non-full.
+
+    Raises:
+      Queue.Full if the queue is full and the timeout expires.
+    """
+    def PutAction():
+      self.queue.put(item, block=False)
+    self._DoWithTimeout(PutAction,
+                        Queue.Full,
+                        self.get_cond,
+                        self.put_cond,
+                        self.lock,
+                        timeout=timeout,
+                        block=block)
+
+  def reput(self, item, block=True, timeout=None):
+    """Re-put an item back into the requeue.
+
+    Re-putting an item does not increase the number of outstanding
+    tasks, so the reput item should be uniquely associated with an
+    item that was previously removed from the requeue and for which
+    TaskDone has not been called.
+
+    Args:
+      item: An item to add to the requeue.
+      block: Whether to block if the requeue is full.
+      timeout: Maximum on how long to wait until the queue is non-full.
+
+    Raises:
+      Queue.Full is the queue is full and the timeout expires.
+    """
+    def ReputAction():
+      self.requeue.put(item, block=False)
+    self._DoWithTimeout(ReputAction,
+                        Queue.Full,
+                        self.get_cond,
+                        self.put_cond,
+                        self.lock,
+                        timeout=timeout,
+                        block=block)
+
+  def get(self, block=True, timeout=None):
+    """Get an item from the requeue.
+
+    Args:
+      block: Whether to block if the requeue is empty.
+      timeout: Maximum on how long to wait until the requeue is non-empty.
+
+    Returns:
+      An item from the requeue.
+
+    Raises:
+      Queue.Empty if the queue is empty and the timeout expires.
+    """
+    def GetAction():
+      try:
+        result = self.requeue.get(block=False)
+        self.requeue.task_done()
+      except Queue.Empty:
+        result = self.queue.get(block=False)
+      return result
+    return self._DoWithTimeout(GetAction,
+                               Queue.Empty,
+                               self.put_cond,
+                               self.get_cond,
+                               self.lock,
+                               timeout=timeout,
+                               block=block)
+
+  def join(self):
+    """Blocks until all of the items in the requeue have been processed."""
+    self.queue.join()
+
+  def task_done(self):
+    """Indicate that a previously enqueued item has been fully processed."""
+    self.queue.task_done()
+
+  def empty(self):
+    """Returns true if the requeue is empty."""
+    return self.queue.empty() and self.requeue.empty()
+
+  def get_nowait(self):
+    """Try to get an item from the queue without blocking."""
+    return self.get(block=False)
+
+  def qsize(self):
+    return self.queue.qsize() + self.requeue.qsize()