|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """A thread-safe queue in which removed objects put back to the front.""" |
|
19 |
|
20 |
|
21 import logging |
|
22 import Queue |
|
23 import threading |
|
24 import time |
|
25 |
|
26 logger = logging.getLogger('google.appengine.tools.requeue') |
|
27 |
|
28 |
|
29 class ReQueue(object): |
|
30 """A special thread-safe queue. |
|
31 |
|
32 A ReQueue allows unfinished work items to be returned with a call to |
|
33 reput(). When an item is reput, task_done() should *not* be called |
|
34 in addition, getting an item that has been reput does not increase |
|
35 the number of outstanding tasks. |
|
36 |
|
37 This class shares an interface with Queue.Queue and provides the |
|
38 additional reput method. |
|
39 """ |
|
40 |
|
41 def __init__(self, |
|
42 queue_capacity, |
|
43 requeue_capacity=None, |
|
44 queue_factory=Queue.Queue, |
|
45 get_time=time.time): |
|
46 """Initialize a ReQueue instance. |
|
47 |
|
48 Args: |
|
49 queue_capacity: The number of items that can be put in the ReQueue. |
|
50 requeue_capacity: The numer of items that can be reput in the ReQueue. |
|
51 queue_factory: Used for dependency injection. |
|
52 get_time: Used for dependency injection. |
|
53 """ |
|
54 if requeue_capacity is None: |
|
55 requeue_capacity = queue_capacity |
|
56 |
|
57 self.get_time = get_time |
|
58 self.queue = queue_factory(queue_capacity) |
|
59 self.requeue = queue_factory(requeue_capacity) |
|
60 self.lock = threading.Lock() |
|
61 self.put_cond = threading.Condition(self.lock) |
|
62 self.get_cond = threading.Condition(self.lock) |
|
63 |
|
64 def _DoWithTimeout(self, |
|
65 action, |
|
66 exc, |
|
67 wait_cond, |
|
68 done_cond, |
|
69 lock, |
|
70 timeout=None, |
|
71 block=True): |
|
72 """Performs the given action with a timeout. |
|
73 |
|
74 The action must be non-blocking, and raise an instance of exc on a |
|
75 recoverable failure. If the action fails with an instance of exc, |
|
76 we wait on wait_cond before trying again. Failure after the |
|
77 timeout is reached is propagated as an exception. Success is |
|
78 signalled by notifying on done_cond and returning the result of |
|
79 the action. If action raises any exception besides an instance of |
|
80 exc, it is immediately propagated. |
|
81 |
|
82 Args: |
|
83 action: A callable that performs a non-blocking action. |
|
84 exc: An exception type that is thrown by the action to indicate |
|
85 a recoverable error. |
|
86 wait_cond: A condition variable which should be waited on when |
|
87 action throws exc. |
|
88 done_cond: A condition variable to signal if the action returns. |
|
89 lock: The lock used by wait_cond and done_cond. |
|
90 timeout: A non-negative float indicating the maximum time to wait. |
|
91 block: Whether to block if the action cannot complete immediately. |
|
92 |
|
93 Returns: |
|
94 The result of the action, if it is successful. |
|
95 |
|
96 Raises: |
|
97 ValueError: If the timeout argument is negative. |
|
98 """ |
|
99 if timeout is not None and timeout < 0.0: |
|
100 raise ValueError('\'timeout\' must not be a negative number') |
|
101 if not block: |
|
102 timeout = 0.0 |
|
103 result = None |
|
104 success = False |
|
105 start_time = self.get_time() |
|
106 lock.acquire() |
|
107 try: |
|
108 while not success: |
|
109 try: |
|
110 result = action() |
|
111 success = True |
|
112 except Exception, e: |
|
113 if not isinstance(e, exc): |
|
114 raise e |
|
115 if timeout is not None: |
|
116 elapsed_time = self.get_time() - start_time |
|
117 timeout -= elapsed_time |
|
118 if timeout <= 0.0: |
|
119 raise e |
|
120 wait_cond.wait(timeout) |
|
121 finally: |
|
122 if success: |
|
123 done_cond.notify() |
|
124 lock.release() |
|
125 return result |
|
126 |
|
127 def put(self, item, block=True, timeout=None): |
|
128 """Put an item into the requeue. |
|
129 |
|
130 Args: |
|
131 item: An item to add to the requeue. |
|
132 block: Whether to block if the requeue is full. |
|
133 timeout: Maximum on how long to wait until the queue is non-full. |
|
134 |
|
135 Raises: |
|
136 Queue.Full if the queue is full and the timeout expires. |
|
137 """ |
|
138 def PutAction(): |
|
139 self.queue.put(item, block=False) |
|
140 self._DoWithTimeout(PutAction, |
|
141 Queue.Full, |
|
142 self.get_cond, |
|
143 self.put_cond, |
|
144 self.lock, |
|
145 timeout=timeout, |
|
146 block=block) |
|
147 |
|
148 def reput(self, item, block=True, timeout=None): |
|
149 """Re-put an item back into the requeue. |
|
150 |
|
151 Re-putting an item does not increase the number of outstanding |
|
152 tasks, so the reput item should be uniquely associated with an |
|
153 item that was previously removed from the requeue and for which |
|
154 TaskDone has not been called. |
|
155 |
|
156 Args: |
|
157 item: An item to add to the requeue. |
|
158 block: Whether to block if the requeue is full. |
|
159 timeout: Maximum on how long to wait until the queue is non-full. |
|
160 |
|
161 Raises: |
|
162 Queue.Full is the queue is full and the timeout expires. |
|
163 """ |
|
164 def ReputAction(): |
|
165 self.requeue.put(item, block=False) |
|
166 self._DoWithTimeout(ReputAction, |
|
167 Queue.Full, |
|
168 self.get_cond, |
|
169 self.put_cond, |
|
170 self.lock, |
|
171 timeout=timeout, |
|
172 block=block) |
|
173 |
|
174 def get(self, block=True, timeout=None): |
|
175 """Get an item from the requeue. |
|
176 |
|
177 Args: |
|
178 block: Whether to block if the requeue is empty. |
|
179 timeout: Maximum on how long to wait until the requeue is non-empty. |
|
180 |
|
181 Returns: |
|
182 An item from the requeue. |
|
183 |
|
184 Raises: |
|
185 Queue.Empty if the queue is empty and the timeout expires. |
|
186 """ |
|
187 def GetAction(): |
|
188 try: |
|
189 result = self.requeue.get(block=False) |
|
190 self.requeue.task_done() |
|
191 except Queue.Empty: |
|
192 result = self.queue.get(block=False) |
|
193 return result |
|
194 return self._DoWithTimeout(GetAction, |
|
195 Queue.Empty, |
|
196 self.put_cond, |
|
197 self.get_cond, |
|
198 self.lock, |
|
199 timeout=timeout, |
|
200 block=block) |
|
201 |
|
202 def join(self): |
|
203 """Blocks until all of the items in the requeue have been processed.""" |
|
204 self.queue.join() |
|
205 |
|
206 def task_done(self): |
|
207 """Indicate that a previously enqueued item has been fully processed.""" |
|
208 self.queue.task_done() |
|
209 |
|
210 def empty(self): |
|
211 """Returns true if the requeue is empty.""" |
|
212 return self.queue.empty() and self.requeue.empty() |
|
213 |
|
214 def get_nowait(self): |
|
215 """Try to get an item from the queue without blocking.""" |
|
216 return self.get(block=False) |
|
217 |
|
218 def qsize(self): |
|
219 return self.queue.qsize() + self.requeue.qsize() |