|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """A module that handles deferred execution of callables via the task queue. |
|
19 |
|
20 Tasks consist of a callable and arguments to pass to it. The callable and its |
|
21 arguments are serialized and put on the task queue, which deserializes and |
|
22 executes them. The following callables can be used as tasks: |
|
23 |
|
24 1) Functions defined in the top level of a module |
|
25 2) Classes defined in the top level of a module |
|
26 3) Instances of classes in (2) that implement __call__ |
|
27 4) Instance methods of objects of classes in (2) |
|
28 5) Class methods of classes in (2) |
|
29 6) Built-in functions |
|
30 7) Built-in methods |
|
31 |
|
32 The following callables can NOT be used as tasks: |
|
33 1) Nested functions or closures |
|
34 2) Nested classes or objects of them |
|
35 3) Lambda functions |
|
36 4) Static methods |
|
37 |
|
38 The arguments to the callable, and the object (in the case of method or object |
|
39 calls) must all be pickleable. |
|
40 |
|
41 If you want your tasks to execute reliably, don't use mutable global variables; |
|
42 they are not serialized with the task and may not be the same when your task |
|
43 executes as they were when it was enqueued (in fact, they will almost certainly |
|
44 be different). |
|
45 |
|
46 If your app relies on manipulating the import path, make sure that the function |
|
47 you are deferring is defined in a module that can be found without import path |
|
manipulation. Alternatively, you can include deferred.TaskHandler in your own
webapp application instead of mapping this module's script in app.yaml as
detailed below.
|
50 |
|
51 When you create a deferred task using deferred.defer, the task is serialized, |
|
52 and an attempt is made to add it directly to the task queue. If the task is too |
|
53 big (larger than about 10 kilobytes when serialized), a datastore entry will be |
|
54 created for the task, and a new task will be enqueued, which will fetch the |
|
55 original task from the datastore and execute it. This is much less efficient |
|
56 than the direct execution model, so it's a good idea to minimize the size of |
|
57 your tasks when possible. |
|
58 |
|
59 In order for tasks to be processed, you need to set up the handler. Add the |
|
60 following to your app.yaml handlers section: |
|
61 |
|
62 handlers: |
|
63 - url: /_ah/queue/deferred |
|
64 script: $PYTHON_LIB/google/appengine/ext/deferred/__init__.py |
|
65 login: admin |
|
66 |
|
67 By default, the deferred module uses the URL above, and the default queue. |
|
68 |
|
69 Example usage: |
|
70 |
|
71 def do_something_later(key, amount): |
|
72 entity = MyModel.get(key) |
|
73 entity.total += amount |
|
74 entity.put() |
|
75 |
|
  # Use default URL and queue name, no task name, execute ASAP.
  deferred.defer(do_something_later, my_key, 20)

  # Providing non-default task queue arguments (note the leading underscores;
  # keyword arguments without one are passed to the deferred callable).
  deferred.defer(do_something_later, my_key, 20, _queue="foo", _countdown=60)
|
81 """ |
|
82 |
|
83 |
|
84 |
|
85 |
|
86 |
|
87 import logging |
|
88 import pickle |
|
89 import types |
|
90 |
|
91 from google.appengine.api.labs import taskqueue |
|
92 from google.appengine.ext import db |
|
93 from google.appengine.ext import webapp |
|
94 from google.appengine.ext.webapp.util import run_wsgi_app |
|
95 |
|
96 |
|
97 _TASKQUEUE_HEADERS = {"Content-Type": "application/octet-stream"} |
|
98 _DEFAULT_URL = "/_ah/queue/deferred" |
|
99 _DEFAULT_QUEUE = "default" |
|
100 |
|
101 |
|
class Error(Exception):
  """Root of the exception hierarchy defined by the deferred module.

  Catch this to handle any of the module's own exception types at once.
  """
|
104 |
|
105 |
|
class PermanentTaskFailure(Error):
  """Raised when a task has failed in a way that retrying can never fix.

  Raised, for example, when a task payload cannot be deserialized. The task
  handler logs this exception rather than propagating it.
  """
|
108 |
|
109 |
|
110 def run(data): |
|
111 """Unpickles and executes a task. |
|
112 |
|
113 Args: |
|
114 data: A pickled tuple of (function, args, kwargs) to execute. |
|
115 Returns: |
|
116 The return value of the function invocation. |
|
117 """ |
|
118 try: |
|
119 func, args, kwds = pickle.loads(data) |
|
120 except Exception, e: |
|
121 raise PermanentTaskFailure(e) |
|
122 else: |
|
123 return func(*args, **kwds) |
|
124 |
|
125 |
|
class _DeferredTaskEntity(db.Model):
  """Datastore record that holds a serialized deferred task.

  defer() falls back to storing the payload here when the task queue rejects
  it with TaskTooLargeError. run_from_datastore() later loads the record,
  runs the task and deletes the record.
  """
  # Pickled (callable, args, kwargs) tuple produced by serialize().
  data = db.BlobProperty(required=True)
|
133 |
|
134 |
|
def run_from_datastore(key):
  """Retrieves a task from the datastore, executes it, and returns its result.

  Args:
    key: Key of the _DeferredTaskEntity storing the task (defer() passes the
      key's string form).
  Returns:
    The return value of the function invocation.
  Raises:
    PermanentTaskFailure: The entity does not exist, or its payload cannot be
      deserialized (in which case the entity is deleted). Any other exception
      raised by the task propagates, and the entity is left in place so a
      later attempt can still find it.
  """
  entity = _DeferredTaskEntity.get(key)
  if not entity:
    # Nothing to run, e.g. an earlier attempt already ran and deleted it.
    raise PermanentTaskFailure("Deferred task entity not found: %s" % (key,))
  try:
    ret = run(entity.data)
  except PermanentTaskFailure:
    # The stored payload is unusable; drop it so it does not linger.
    entity.delete()
    raise
  # Success: the stored payload is no longer needed.
  entity.delete()
  # Previously the result was computed but silently discarded.
  return ret
|
152 |
|
153 |
|
def invoke_member(obj, membername, *args, **kwargs):
  """Looks up an attribute on an object and calls it with the given arguments.

  Used as the picklable stand-in for bound methods: (obj, membername) can be
  pickled where the bound method object itself may not be.

  Args:
    obj: The object that owns the member.
    membername: Name of the attribute to fetch from obj.
    args: Positional arguments forwarded to the member.
    kwargs: Keyword arguments forwarded to the member.
  Returns:
    Whatever the member call returns.
  """
  member = getattr(obj, membername)
  result = member(*args, **kwargs)
  return result
|
166 |
|
167 |
|
168 def _curry_callable(obj, *args, **kwargs): |
|
169 """Takes a callable and arguments and returns a task queue tuple. |
|
170 |
|
171 The returned tuple consists of (callable, args, kwargs), and can be pickled |
|
172 and unpickled safely. |
|
173 |
|
174 Args: |
|
175 obj: The callable to curry. See the module docstring for restrictions. |
|
176 args: Positional arguments to call the callable with. |
|
177 kwargs: Keyword arguments to call the callable with. |
|
178 Returns: |
|
179 A tuple consisting of (callable, args, kwargs) that can be evaluated by |
|
180 run() with equivalent effect of executing the function directly. |
|
181 Raises: |
|
182 ValueError: If the passed in object is not of a valid callable type. |
|
183 """ |
|
184 if isinstance(obj, types.MethodType): |
|
185 return (invoke_member, (obj.im_self, obj.im_func.__name__) + args, kwargs) |
|
186 elif isinstance(obj, types.BuiltinMethodType): |
|
187 if not obj.__self__: |
|
188 return (obj, args, kwargs) |
|
189 else: |
|
190 return (invoke_member, (obj.__self__, obj.__name__) + args, kwargs) |
|
191 elif isinstance(obj, types.ObjectType) and hasattr(obj, "__call__"): |
|
192 return (obj, args, kwargs) |
|
193 elif isinstance(obj, (types.FunctionType, types.BuiltinFunctionType, |
|
194 types.ClassType, types.UnboundMethodType)): |
|
195 return (obj, args, kwargs) |
|
196 else: |
|
197 raise ValueError("obj must be callable") |
|
198 |
|
199 |
|
def serialize(obj, *args, **kwargs):
  """Pickles a callable and its arguments into a deferred task payload.

  Args:
    obj: The callable to serialize. See module docstring for restrictions.
    args: Positional arguments to call the callable with.
    kwargs: Keyword arguments to call the callable with.
  Returns:
    A byte string, pickled with pickle.HIGHEST_PROTOCOL, that run() can
    execute.
  Raises:
    ValueError: obj is not callable (raised by _curry_callable).
  """
  task_tuple = _curry_callable(obj, *args, **kwargs)
  protocol = pickle.HIGHEST_PROTOCOL
  return pickle.dumps(task_tuple, protocol)
|
212 |
|
213 |
|
def defer(obj, *args, **kwargs):
  """Enqueues a callable so that it runs later via the task queue.

  Keyword arguments named _countdown, _eta, _name, _url or _queue are
  consumed here. Every other argument is forwarded to the callable. Unless
  _url is given, the task is posted to /_ah/queue/deferred. Pass _url=None to
  use the queue's default URL instead. A handler must be installed on
  whichever URL is used (see the module docstring for details).

  If the serialized task is too large for the task queue, it is stored as a
  _DeferredTaskEntity. A small task that runs run_from_datastore() on that
  entity is enqueued in its place.

  Args:
    obj: The callable to execute. See module docstring for restrictions.
    _countdown, _eta, _name, _url, _queue: Passed through to the task queue -
      see the task queue documentation for details.
    args: Positional arguments to call the callable with.
    kwargs: Any other keyword arguments are passed through to the callable.
  """
  taskargs = {}
  for option in ("countdown", "eta", "name"):
    taskargs[option] = kwargs.pop("_" + option, None)
  taskargs["url"] = kwargs.pop("_url", _DEFAULT_URL)
  taskargs["headers"] = _TASKQUEUE_HEADERS
  queue = kwargs.pop("_queue", _DEFAULT_QUEUE)

  payload = serialize(obj, *args, **kwargs)
  try:
    taskqueue.Task(payload=payload, **taskargs).add(queue)
  except taskqueue.TaskTooLargeError:
    # Too big to travel inline: park the payload in the datastore and enqueue
    # a small task that fetches and runs it.
    entity_key = _DeferredTaskEntity(data=payload).put()
    payload = serialize(run_from_datastore, str(entity_key))
    taskqueue.Task(payload=payload, **taskargs).add(queue)
|
243 |
|
244 |
|
class TaskHandler(webapp.RequestHandler):
  """A webapp handler class that processes deferred invocations.

  The request body is expected to be a task payload produced by serialize().
  """

  def post(self):
    """Executes the deferred task carried in the request body.

    PermanentTaskFailure is logged and swallowed so the request completes
    normally. Any other exception propagates out of the handler.
    """
    # Log any X-AppEngine-* request headers to help trace which task this
    # request carries.
    headers = ["%s:%s" % (k, v) for k, v in self.request.headers.items()
               if k.lower().startswith("x-appengine-")]
    logging.info(", ".join(headers))

    # SECURITY NOTE(review): run() unpickles the raw request body, which can
    # execute arbitrary code. This relies on the handler URL being protected
    # (the module docstring's app.yaml snippet uses "login: admin"). Confirm
    # that the mapping is protected wherever this handler is mounted.
    try:
      run(self.request.body)
    except PermanentTaskFailure:
      # Retrying cannot help, so log and return normally instead of raising.
      logging.exception("Permanent failure attempting to execute task")
|
257 |
|
258 |
|
# WSGI application that routes every path (".*") to TaskHandler, so this
# script works at whatever URL app.yaml maps it to.
application = webapp.WSGIApplication([(".*", TaskHandler)])


def main():
  """Serves the deferred-task WSGI application via run_wsgi_app."""
  run_wsgi_app(application)


if __name__ == "__main__":
  main()