--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/tools/requeue.py Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A thread-safe queue in which removed objects put back to the front."""
+
+
+import logging
+import Queue
+import threading
+import time
+
+logger = logging.getLogger('google.appengine.tools.requeue')
+
+
+class ReQueue(object):
+ """A special thread-safe queue.
+
+ A ReQueue allows unfinished work items to be returned with a call to
+ reput(). When an item is reput, task_done() should *not* be called
+ in addition, getting an item that has been reput does not increase
+ the number of outstanding tasks.
+
+ This class shares an interface with Queue.Queue and provides the
+ additional reput method.
+ """
+
+ def __init__(self,
+ queue_capacity,
+ requeue_capacity=None,
+ queue_factory=Queue.Queue,
+ get_time=time.time):
+ """Initialize a ReQueue instance.
+
+ Args:
+ queue_capacity: The number of items that can be put in the ReQueue.
+ requeue_capacity: The numer of items that can be reput in the ReQueue.
+ queue_factory: Used for dependency injection.
+ get_time: Used for dependency injection.
+ """
+ if requeue_capacity is None:
+ requeue_capacity = queue_capacity
+
+ self.get_time = get_time
+ self.queue = queue_factory(queue_capacity)
+ self.requeue = queue_factory(requeue_capacity)
+ self.lock = threading.Lock()
+ self.put_cond = threading.Condition(self.lock)
+ self.get_cond = threading.Condition(self.lock)
+
+ def _DoWithTimeout(self,
+ action,
+ exc,
+ wait_cond,
+ done_cond,
+ lock,
+ timeout=None,
+ block=True):
+ """Performs the given action with a timeout.
+
+ The action must be non-blocking, and raise an instance of exc on a
+ recoverable failure. If the action fails with an instance of exc,
+ we wait on wait_cond before trying again. Failure after the
+ timeout is reached is propagated as an exception. Success is
+ signalled by notifying on done_cond and returning the result of
+ the action. If action raises any exception besides an instance of
+ exc, it is immediately propagated.
+
+ Args:
+ action: A callable that performs a non-blocking action.
+ exc: An exception type that is thrown by the action to indicate
+ a recoverable error.
+ wait_cond: A condition variable which should be waited on when
+ action throws exc.
+ done_cond: A condition variable to signal if the action returns.
+ lock: The lock used by wait_cond and done_cond.
+ timeout: A non-negative float indicating the maximum time to wait.
+ block: Whether to block if the action cannot complete immediately.
+
+ Returns:
+ The result of the action, if it is successful.
+
+ Raises:
+ ValueError: If the timeout argument is negative.
+ """
+ if timeout is not None and timeout < 0.0:
+ raise ValueError('\'timeout\' must not be a negative number')
+ if not block:
+ timeout = 0.0
+ result = None
+ success = False
+ start_time = self.get_time()
+ lock.acquire()
+ try:
+ while not success:
+ try:
+ result = action()
+ success = True
+ except Exception, e:
+ if not isinstance(e, exc):
+ raise e
+ if timeout is not None:
+ elapsed_time = self.get_time() - start_time
+ timeout -= elapsed_time
+ if timeout <= 0.0:
+ raise e
+ wait_cond.wait(timeout)
+ finally:
+ if success:
+ done_cond.notify()
+ lock.release()
+ return result
+
+ def put(self, item, block=True, timeout=None):
+ """Put an item into the requeue.
+
+ Args:
+ item: An item to add to the requeue.
+ block: Whether to block if the requeue is full.
+ timeout: Maximum on how long to wait until the queue is non-full.
+
+ Raises:
+ Queue.Full if the queue is full and the timeout expires.
+ """
+ def PutAction():
+ self.queue.put(item, block=False)
+ self._DoWithTimeout(PutAction,
+ Queue.Full,
+ self.get_cond,
+ self.put_cond,
+ self.lock,
+ timeout=timeout,
+ block=block)
+
+ def reput(self, item, block=True, timeout=None):
+ """Re-put an item back into the requeue.
+
+ Re-putting an item does not increase the number of outstanding
+ tasks, so the reput item should be uniquely associated with an
+ item that was previously removed from the requeue and for which
+ TaskDone has not been called.
+
+ Args:
+ item: An item to add to the requeue.
+ block: Whether to block if the requeue is full.
+ timeout: Maximum on how long to wait until the queue is non-full.
+
+ Raises:
+ Queue.Full is the queue is full and the timeout expires.
+ """
+ def ReputAction():
+ self.requeue.put(item, block=False)
+ self._DoWithTimeout(ReputAction,
+ Queue.Full,
+ self.get_cond,
+ self.put_cond,
+ self.lock,
+ timeout=timeout,
+ block=block)
+
+ def get(self, block=True, timeout=None):
+ """Get an item from the requeue.
+
+ Args:
+ block: Whether to block if the requeue is empty.
+ timeout: Maximum on how long to wait until the requeue is non-empty.
+
+ Returns:
+ An item from the requeue.
+
+ Raises:
+ Queue.Empty if the queue is empty and the timeout expires.
+ """
+ def GetAction():
+ try:
+ result = self.requeue.get(block=False)
+ self.requeue.task_done()
+ except Queue.Empty:
+ result = self.queue.get(block=False)
+ return result
+ return self._DoWithTimeout(GetAction,
+ Queue.Empty,
+ self.put_cond,
+ self.get_cond,
+ self.lock,
+ timeout=timeout,
+ block=block)
+
+ def join(self):
+ """Blocks until all of the items in the requeue have been processed."""
+ self.queue.join()
+
+ def task_done(self):
+ """Indicate that a previously enqueued item has been fully processed."""
+ self.queue.task_done()
+
+ def empty(self):
+ """Returns true if the requeue is empty."""
+ return self.queue.empty() and self.requeue.empty()
+
+ def get_nowait(self):
+ """Try to get an item from the queue without blocking."""
+ return self.get(block=False)
+
+ def qsize(self):
+ return self.queue.qsize() + self.requeue.qsize()