|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """Provides thread-pool-like functionality for workers accessing App Engine. |
|
19 |
|
The pool adapts to slow or timed-out requests by reducing the number of
active workers, and increases the number again when request latency drops.
|
22 """ |
|
23 |
|
24 |
|
25 |
|
26 import logging |
|
27 import Queue |
|
28 import sys |
|
29 import threading |
|
30 import time |
|
31 import traceback |
|
32 |
|
33 from google.appengine.tools.requeue import ReQueue |
|
34 |
|
# Module-level logger shared by the worker threads, the pool and the gate.
logger = logging.getLogger('google.appengine.tools.adaptive_thread_pool')

# Sentinel placed on the work queue by AdaptiveThreadPool.Shutdown(); a
# WorkerThread that dequeues it stops processing and exits.
_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'

# Backoff, in seconds, that ThreadGate applies after the first failure reported
# while only one thread is enabled.
INITIAL_BACKOFF = 1.0

# Multiplier ThreadGate applies to the current backoff on each further failure
# reported while only one thread is enabled.
BACKOFF_FACTOR = 2.0
|
42 |
|
43 |
|
class Error(Exception):
  """Root of the exception hierarchy defined by this module."""
|
46 |
|
47 |
|
class WorkItemError(Error):
  """Raised when processing a WorkItem fails."""
|
50 |
|
51 |
|
class RetryException(Error):
  """Non-fatal error signalling that the current work item should be retried."""
|
54 |
|
55 |
|
def InterruptibleSleep(sleep_time):
  """Puts the current thread to sleep, polling its exit_flag while sleeping.

  The sleep is performed in slices of at most 0.25 seconds.  After each slice
  the current thread's exit_flag attribute is checked and, if it is set, the
  function returns early.  A thread that has no exit_flag attribute is treated
  as never having been asked to exit, so this function can also be called from
  threads that are not WorkerThreads.

  Args:
    sleep_time: Time to sleep, in seconds.  Non-positive values return
      immediately.
  """
  slept = 0.0
  # Tolerance so that float rounding cannot schedule one more, vanishingly
  # short, slice at the end.
  epsilon = .0001
  thread = threading.currentThread()
  while slept < sleep_time - epsilon:
    remaining = sleep_time - slept
    this_sleep_time = min(remaining, 0.25)
    time.sleep(this_sleep_time)
    # The requested slice length is accumulated, not the measured elapsed time.
    slept += this_sleep_time
    if getattr(thread, 'exit_flag', False):
      return
|
72 |
|
73 |
|
class WorkerThread(threading.Thread):
  """A WorkerThread to execute WorkItems.

  Each worker repeatedly passes through the ThreadGate, takes one item from
  the pool's requeue, runs it and reports the outcome to both the queue and
  the gate.  The first fatal error is recorded on the thread and ends it.

  Attributes:
    exit_flag: A boolean indicating whether this thread should stop
      its work and exit.
  """

  def __init__(self, thread_pool, thread_gate, name=None):
    """Initialize a WorkerThread instance.

    Args:
      thread_pool: An AdaptiveThreadPool instance.
      thread_gate: A ThreadGate instance.
      name: A name for this WorkerThread.  Defaults to 'Anonymous_' followed
        by the class name.
    """
    if not name:
      name = 'Anonymous_' + self.__class__.__name__
    # The name is handed to threading.Thread so that getName(), the log
    # prefixes and str() all agree, and stay in sync if the thread is renamed.
    threading.Thread.__init__(self, name=name)

    self.setDaemon(True)

    self.exit_flag = False
    self.__error = None
    self.__traceback = None
    self.__thread_pool = thread_pool
    self.__work_queue = thread_pool.requeue
    self.__thread_gate = thread_gate

  def run(self):
    """Perform the work of the thread."""
    logger.debug('[%s] %s: started', self.getName(), self.__class__.__name__)

    try:
      self.WorkOnItems()
    except:
      # Deliberately catches everything: whatever ended the work loop is
      # recorded for CheckError() instead of being re-raised.
      self.SetError()

    logger.debug('[%s] %s: exiting', self.getName(), self.__class__.__name__)

  def SetError(self):
    """Sets the error and traceback information for this thread.

    Only the first error is kept; later calls leave it untouched.

    This must be called from an exception handler.
    """
    if not self.__error:
      exc_info = sys.exc_info()
      self.__error = exc_info[1]
      self.__traceback = exc_info[2]
      logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)

  def WorkOnItems(self):
    """Perform the work of a WorkerThread.

    Loops until exit_flag is set or the exit sentinel is dequeued.  Every
    iteration is bracketed by ThreadGate.StartWork() and
    ThreadGate.FinishWork().

    Raises:
      Error: If an item that should be retried cannot be put back on the queue.
      WorkItemError: If an item reports failure without supplying an error.
      Exception: Whatever an item's PerformWork() raised (other than
        RetryException), or the error stored on a failed item.
    """
    while not self.exit_flag:
      item = None
      self.__thread_gate.StartWork()
      try:
        # Pessimistic defaults: anything that escapes the block below counts
        # as a failure and asks the gate to reduce the active thread count.
        status, instruction = WorkItem.FAILURE, ThreadGate.DECREASE
        try:
          if self.exit_flag:
            instruction = ThreadGate.HOLD
            break

          try:
            item = self.__work_queue.get(block=True, timeout=1.0)
          except Queue.Empty:
            # Nothing queued; loop again so that exit_flag is re-checked.
            instruction = ThreadGate.HOLD
            continue
          if item == _THREAD_SHOULD_EXIT or self.exit_flag:
            # Reported as a success so that the dequeued entry is acknowledged
            # with task_done() in the finally clause below.
            status, instruction = WorkItem.SUCCESS, ThreadGate.HOLD
            break

          logger.debug('[%s] Got work item %s', self.getName(), item)

          status, instruction = item.PerformWork(self.__thread_pool)
        except RetryException:
          status, instruction = WorkItem.RETRY, ThreadGate.HOLD
        except:
          self.SetError()
          raise

      finally:
        try:
          # Compare against None rather than relying on truthiness, so an item
          # that happens to evaluate as false is still accounted for.
          if item is not None:
            if status == WorkItem.SUCCESS:
              self.__work_queue.task_done()
            elif status == WorkItem.RETRY:
              try:
                self.__work_queue.reput(item, block=False)
              except Queue.Full:
                logger.error('[%s] Failed to reput work item.', self.getName())
                raise Error('Failed to reput work item')
            else:
              # If an error is already recorded, the exception that set it is
              # still propagating; otherwise the item reported FAILURE itself.
              if not self.__error:
                # getattr: an item is not required to define error/traceback.
                item_error = getattr(item, 'error', None)
                if item_error:
                  self.__error = item_error
                  self.__traceback = getattr(item, 'traceback', None)
                else:
                  self.__error = WorkItemError(
                      'Fatal error while processing %s' % item)
                raise self.__error

        finally:
          # Always hand the outcome back to the gate, even when raising.
          self.__thread_gate.FinishWork(instruction=instruction)

  def CheckError(self):
    """If an error is present, then log it."""
    if self.__error:
      logger.error('Error in %s: %s', self.getName(), self.__error)
      if self.__traceback:
        logger.debug('%s', ''.join(traceback.format_exception(
            self.__error.__class__,
            self.__error,
            self.__traceback)))

  def __str__(self):
    """Returns the current name of this thread."""
    return self.getName()
|
193 |
|
194 |
|
class AdaptiveThreadPool(object):
  """A pool of worker threads that processes WorkItems from a shared queue.

  Attributes:
    requeue: The requeue instance which holds work items for this
      thread pool.
  """

  def __init__(self,
               num_threads,
               queue_size=None,
               base_thread_name=None,
               worker_thread_factory=WorkerThread,
               queue_factory=Queue.Queue):
    """Initialize an AdaptiveThreadPool.

    An adaptive thread pool executes WorkItems using a number of
    WorkerThreads.  WorkItems represent items of work that may
    succeed, soft fail, or hard fail.  In addition, a completed work
    item can signal this AdaptiveThreadPool to enable more or fewer
    threads.  Initially one thread is active.  Soft failures are
    requeued to be retried.  Hard failures cause this
    AdaptiveThreadPool to shut down entirely.  See the WorkItem class
    for more details.

    All worker threads are created and started here.

    Args:
      num_threads: The number of threads to use.
      queue_size: The size of the work item queue to use.  Defaults to
        num_threads.
      base_thread_name: A string from which worker thread names are derived.
        Defaults to the class name of the created worker.
      worker_thread_factory: A factory which produces WorkerThreads.
      queue_factory: Used for dependency injection.
    """
    if queue_size is None:
      queue_size = num_threads
    # The queue must exist before any worker is built: the factory is given
    # this pool and may read its requeue attribute right away.
    self.requeue = ReQueue(queue_size, queue_factory=queue_factory)
    self.__thread_gate = ThreadGate(num_threads)
    self.__num_threads = num_threads
    self.__threads = []
    for index in xrange(num_threads):
      worker = worker_thread_factory(self, self.__thread_gate)
      prefix = base_thread_name or worker.__class__.__name__
      worker.name = '%s-%d' % (prefix, index)
      self.__threads.append(worker)
      worker.start()

  def num_threads(self):
    """Return the number of threads in this thread pool."""
    return self.__num_threads

  def Threads(self):
    """Yields the registered threads."""
    for worker in self.__threads:
      yield worker

  def SubmitItem(self, item, block=True, timeout=0.0):
    """Submit a WorkItem to the AdaptiveThreadPool.

    Args:
      item: A WorkItem instance.
      block: Whether to block on submitting if the submit queue is full.
      timeout: Time to wait for room in the queue if block is True, 0.0 to
        block indefinitely.

    Raises:
      Queue.Full if the submit queue is full.
    """
    self.requeue.put(item, block=block, timeout=timeout)

  def QueuedItemCount(self):
    """Returns the number of items currently in the queue."""
    return self.requeue.qsize()

  def Shutdown(self):
    """Shutdown the thread pool.

    Tasks may remain unexecuted in the submit queue.
    """
    # Discard whatever is still queued, acknowledging each discarded entry.
    while not self.requeue.empty():
      try:
        self.requeue.get_nowait()
      except Queue.Empty:
        # Another consumer emptied the queue first; re-check the condition.
        continue
      self.requeue.task_done()
    # Flag every worker and queue one exit sentinel per worker.
    for worker in self.__threads:
      worker.exit_flag = True
      self.requeue.put(_THREAD_SHOULD_EXIT)
    # Let every worker through the gate so it can observe the exit request.
    self.__thread_gate.EnableAllThreads()

  def Wait(self):
    """Wait until all work items have been completed."""
    self.requeue.join()

  def JoinThreads(self):
    """Wait for all threads to exit."""
    for worker in self.__threads:
      message = 'Waiting for %s to exit' % str(worker)
      logger.debug(message)
      worker.join()

  def CheckErrors(self):
    """Output logs for any errors that occurred in the worker threads."""
    for worker in self.__threads:
      worker.CheckError()
|
300 |
|
301 |
|
class ThreadGate(object):
  """Manage the number of active worker threads.

  The ThreadGate limits the number of threads that are simultaneously
  active in order to implement adaptive rate control.

  Initially the ThreadGate allows only one thread to be active.  For
  each successful work item, another thread is activated and for each
  failed item, the number of active threads is reduced by one.  When only
  one thread is active, failures will cause exponential backoff.

  For example, a ThreadGate instance, thread_gate can be used in a number
  of threads as so:

  # Block until this thread is enabled for work.
  thread_gate.StartWork()
  try:
    status = DoSomeWorkInvolvingLimitedSharedResources()
    succeeded = IsStatusGood(status)
    badly_failed = IsStatusVeryBad(status)
  finally:
    if succeeded:
      # Succeeded, add more simultaneously enabled threads to the task.
      thread_gate.FinishWork(instruction=ThreadGate.INCREASE)
    elif badly_failed:
      # Failed, or succeeded but with high resource load, reduce number of
      # workers.
      thread_gate.FinishWork(instruction=ThreadGate.DECREASE)
    else:
      # We succeeded, but don't want to add more workers to the task.
      thread_gate.FinishWork(instruction=ThreadGate.HOLD)

  the thread_gate will enable and disable/backoff threads in response to
  resource load conditions.

  StartWork can block indefinitely.  FinishWork, while not
  lock-free, should never block absent a demonic scheduler.
  """

  INCREASE = 'increase'
  HOLD = 'hold'
  DECREASE = 'decrease'

  def __init__(self,
               num_threads,
               sleep=InterruptibleSleep):
    """Constructor for ThreadGate instances.

    Args:
      num_threads: The total number of threads using this gate.
      sleep: Used for dependency injection.  Called with the backoff time in
        seconds.
    """
    # Number of threads currently allowed to work at once; guarded by __lock.
    self.__enabled_count = 1
    self.__lock = threading.Lock()
    # One permit per enabled thread.
    self.__thread_semaphore = threading.Semaphore(self.__enabled_count)
    self.__num_threads = num_threads
    self.__backoff_time = 0
    self.__sleep = sleep

  def num_threads(self):
    """Returns the total number of threads using this gate."""
    return self.__num_threads

  def EnableThread(self):
    """Enable one more worker thread."""
    self.__lock.acquire()
    try:
      self.__enabled_count += 1
    finally:
      self.__lock.release()
    self.__thread_semaphore.release()

  def EnableAllThreads(self):
    """Enable all worker threads."""
    # NOTE(review): __enabled_count is read here without holding __lock, so a
    # concurrent FinishWork(INCREASE) could push the count past __num_threads.
    for unused_idx in xrange(self.__num_threads - self.__enabled_count):
      self.EnableThread()

  def StartWork(self):
    """Starts a critical section in which the number of workers is limited.

    Starts a critical section which allows self.__enabled_count
    simultaneously operating threads.  The critical section is ended by
    calling self.FinishWork().

    If a backoff is in effect, the calling thread sleeps for the backoff time
    after entering, unless it has already been asked to exit.
    """
    self.__thread_semaphore.acquire()
    if self.__backoff_time > 0.0:
      # getattr: callers need not be WorkerThreads, so exit_flag may be absent.
      if not getattr(threading.currentThread(), 'exit_flag', False):
        logger.info('Backing off due to errors: %.1f seconds',
                    self.__backoff_time)
        self.__sleep(self.__backoff_time)

  def FinishWork(self, instruction=None):
    """Ends a critical section started with self.StartWork().

    Args:
      instruction: One of ThreadGate.INCREASE, ThreadGate.HOLD or
        ThreadGate.DECREASE.  None and any unrecognised value are handled
        like HOLD, so that the caller's permit is never leaked.
    """
    if instruction == ThreadGate.INCREASE:
      if self.__backoff_time > 0.0:
        logger.info('Resetting backoff to 0.0')
        self.__backoff_time = 0.0
      do_enable = False
      self.__lock.acquire()
      try:
        if self.__num_threads > self.__enabled_count:
          do_enable = True
          self.__enabled_count += 1
      finally:
        self.__lock.release()
      if do_enable:
        logger.debug('Increasing active thread count to %d',
                     self.__enabled_count)
        # Extra permit that lets one more thread in.
        self.__thread_semaphore.release()
      # The caller's own permit.
      self.__thread_semaphore.release()

    elif instruction == ThreadGate.DECREASE:
      do_disable = False
      self.__lock.acquire()
      try:
        if self.__enabled_count > 1:
          do_disable = True
          self.__enabled_count -= 1
        else:
          # Already down to one thread: slow that thread down instead.
          if self.__backoff_time == 0.0:
            self.__backoff_time = INITIAL_BACKOFF
          else:
            self.__backoff_time *= BACKOFF_FACTOR
      finally:
        self.__lock.release()
      if do_disable:
        # The caller's permit is deliberately not returned; that is what
        # lowers the number of threads able to enter.
        logger.debug('Decreasing the number of active threads to %d',
                     self.__enabled_count)
      else:
        self.__thread_semaphore.release()

    else:
      if instruction and instruction != ThreadGate.HOLD:
        # Doing nothing here would leak the permit taken in StartWork() and
        # could stall every thread waiting on the gate.
        logger.warning('Unknown ThreadGate instruction %r; treating it as %r',
                       instruction, ThreadGate.HOLD)
      self.__thread_semaphore.release()
|
434 |
|
435 |
|
class WorkItem(object):
  """Holds a unit of work.

  Subclasses implement PerformWork().  SUCCESS, RETRY and FAILURE are the
  status values PerformWork() reports.

  Attributes:
    error: The error that made this item fail, or None.  WorkerThread reads
      it when an item reports FAILURE.
    traceback: The traceback belonging to error, or None.
  """

  SUCCESS = 'success'
  RETRY = 'retry'
  FAILURE = 'failure'

  # Class-level defaults, so that an item which never records failure details
  # can still be inspected.  Subclasses and instances may override them.
  error = None
  traceback = None

  def __init__(self, name):
    """Initialize a WorkItem.

    Args:
      name: The name of this work item, as returned by str().
    """
    self.__name = name

  def PerformWork(self, thread_pool):
    """Perform the work of this work item and report the results.

    Args:
      thread_pool: The AdaptiveThreadPool instance associated with this
        thread.

    Returns:
      A tuple (status, instruction) of the work status and an instruction
      for the ThreadGate.

    Raises:
      NotImplementedError: Always; subclasses must override this method.
    """
    raise NotImplementedError

  def __str__(self):
    """Returns the name given at construction."""
    return self.__name