thirdparty/google_appengine/google/appengine/ext/deferred/deferred.py
changeset 2864 2e0b0af889be
equal deleted inserted replaced
2862:27971a13089f 2864:2e0b0af889be
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2007 Google Inc.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #     http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 #
       
    17 
       
    18 """A module that handles deferred execution of callables via the task queue.
       
    19 
       
    20 Tasks consist of a callable and arguments to pass to it. The callable and its
       
    21 arguments are serialized and put on the task queue, which deserializes and
       
    22 executes them. The following callables can be used as tasks:
       
    23 
       
    24 1) Functions defined in the top level of a module
       
    25 2) Classes defined in the top level of a module
       
    26 3) Instances of classes in (2) that implement __call__
       
    27 4) Instance methods of objects of classes in (2)
       
    28 5) Class methods of classes in (2)
       
    29 6) Built-in functions
       
    30 7) Built-in methods
       
    31 
       
    32 The following callables can NOT be used as tasks:
       
    33 1) Nested functions or closures
       
    34 2) Nested classes or objects of them
       
    35 3) Lambda functions
       
    36 4) Static methods
       
    37 
       
    38 The arguments to the callable, and the object (in the case of method or object
       
    39 calls) must all be pickleable.
       
    40 
       
    41 If you want your tasks to execute reliably, don't use mutable global variables;
       
    42 they are not serialized with the task and may not be the same when your task
       
    43 executes as they were when it was enqueued (in fact, they will almost certainly
       
    44 be different).
       
    45 
       
    46 If your app relies on manipulating the import path, make sure that the function
       
    47 you are deferring is defined in a module that can be found without import path
       
    48 manipulation. Alternately, you can include deferred.TaskHandler in your own
       
    49 webapp application instead of using the easy-install method detailed below.
       
    50 
       
    51 When you create a deferred task using deferred.defer, the task is serialized,
       
    52 and an attempt is made to add it directly to the task queue. If the task is too
       
    53 big (larger than about 10 kilobytes when serialized), a datastore entry will be
       
    54 created for the task, and a new task will be enqueued, which will fetch the
       
    55 original task from the datastore and execute it. This is much less efficient
       
    56 than the direct execution model, so it's a good idea to minimize the size of
       
    57 your tasks when possible.
       
    58 
       
    59 In order for tasks to be processed, you need to set up the handler. Add the
       
    60 following to your app.yaml handlers section:
       
    61 
       
    62 handlers:
       
    63 - url: /_ah/queue/deferred
       
    64   script: $PYTHON_LIB/google/appengine/ext/deferred/__init__.py
       
    65   login: admin
       
    66 
       
    67 By default, the deferred module uses the URL above, and the default queue.
       
    68 
       
    69 Example usage:
       
    70 
       
    71   def do_something_later(key, amount):
       
    72     entity = MyModel.get(key)
       
    73     entity.total += amount
       
    74     entity.put()
       
    75 
       
    76   # Use default URL and queue name, no task name, execute ASAP.
       
     77   deferred.defer(do_something_later, my_key, 20)
       
    78 
       
    79   # Providing non-default task queue arguments
       
     80   deferred.defer(do_something_later, my_key, 20, _queue="foo", _countdown=60)
       
    81 """
       
    82 
       
    83 
       
    84 
       
    85 
       
    86 
       
    87 import logging
       
    88 import pickle
       
    89 import types
       
    90 
       
    91 from google.appengine.api.labs import taskqueue
       
    92 from google.appengine.ext import db
       
    93 from google.appengine.ext import webapp
       
    94 from google.appengine.ext.webapp.util import run_wsgi_app
       
    95 
       
    96 
       
    97 _TASKQUEUE_HEADERS = {"Content-Type": "application/octet-stream"}
       
    98 _DEFAULT_URL = "/_ah/queue/deferred"
       
    99 _DEFAULT_QUEUE = "default"
       
   100 
       
   101 
       
   102 class Error(Exception):
       
   103   """Base class for exceptions in this module."""
       
   104 
       
   105 
       
   106 class PermanentTaskFailure(Error):
       
   107   """Indicates that a task failed, and will never succeed."""
       
   108 
       
   109 
       
   110 def run(data):
       
   111   """Unpickles and executes a task.
       
   112 
       
   113   Args:
       
   114     data: A pickled tuple of (function, args, kwargs) to execute.
       
   115   Returns:
       
   116     The return value of the function invocation.
       
   117   """
       
   118   try:
       
   119     func, args, kwds = pickle.loads(data)
       
   120   except Exception, e:
       
   121     raise PermanentTaskFailure(e)
       
   122   else:
       
   123     return func(*args, **kwds)
       
   124 
       
   125 
       
   126 class _DeferredTaskEntity(db.Model):
       
   127   """Datastore representation of a deferred task.
       
   128 
       
   129   This is used in cases when the deferred task is too big to be included as
       
   130   payload with the task queue entry.
       
   131   """
       
   132   data = db.BlobProperty(required=True)
       
   133 
       
   134 
       
   135 def run_from_datastore(key):
       
   136   """Retrieves a task from the datastore and executes it.
       
   137 
       
   138   Args:
       
   139     key: The datastore key of a _DeferredTaskEntity storing the task.
       
   140   Returns:
       
   141     The return value of the function invocation.
       
   142   """
       
   143   entity = _DeferredTaskEntity.get(key)
       
   144   if not entity:
       
   145     raise PermanentTaskFailure()
       
   146   try:
       
   147     ret = run(entity.data)
       
   148     entity.delete()
       
   149   except PermanentTaskFailure:
       
   150     entity.delete()
       
   151     raise
       
   152 
       
   153 
       
   154 def invoke_member(obj, membername, *args, **kwargs):
       
   155   """Retrieves a member of an object, then calls it with the provided arguments.
       
   156 
       
   157   Args:
       
   158     obj: The object to operate on.
       
   159     membername: The name of the member to retrieve from ojb.
       
   160     args: Positional arguments to pass to the method.
       
   161     kwargs: Keyword arguments to pass to the method.
       
   162   Returns:
       
   163     The return value of the method invocation.
       
   164   """
       
   165   return getattr(obj, membername)(*args, **kwargs)
       
   166 
       
   167 
       
   168 def _curry_callable(obj, *args, **kwargs):
       
   169   """Takes a callable and arguments and returns a task queue tuple.
       
   170 
       
   171   The returned tuple consists of (callable, args, kwargs), and can be pickled
       
   172   and unpickled safely.
       
   173 
       
   174   Args:
       
   175     obj: The callable to curry. See the module docstring for restrictions.
       
   176     args: Positional arguments to call the callable with.
       
   177     kwargs: Keyword arguments to call the callable with.
       
   178   Returns:
       
   179     A tuple consisting of (callable, args, kwargs) that can be evaluated by
       
   180     run() with equivalent effect of executing the function directly.
       
   181   Raises:
       
   182     ValueError: If the passed in object is not of a valid callable type.
       
   183   """
       
   184   if isinstance(obj, types.MethodType):
       
   185     return (invoke_member, (obj.im_self, obj.im_func.__name__) + args, kwargs)
       
   186   elif isinstance(obj, types.BuiltinMethodType):
       
   187     if not obj.__self__:
       
   188       return (obj, args, kwargs)
       
   189     else:
       
   190       return (invoke_member, (obj.__self__, obj.__name__) + args, kwargs)
       
   191   elif isinstance(obj, types.ObjectType) and hasattr(obj, "__call__"):
       
   192     return (obj, args, kwargs)
       
   193   elif isinstance(obj, (types.FunctionType, types.BuiltinFunctionType,
       
   194                         types.ClassType, types.UnboundMethodType)):
       
   195     return (obj, args, kwargs)
       
   196   else:
       
   197     raise ValueError("obj must be callable")
       
   198 
       
   199 
       
   200 def serialize(obj, *args, **kwargs):
       
   201   """Serializes a callable into a format recognized by the deferred executor.
       
   202 
       
   203   Args:
       
   204     obj: The callable to serialize. See module docstring for restrictions.
       
   205     args: Positional arguments to call the callable with.
       
   206     kwargs: Keyword arguments to call the callable with.
       
   207   Returns:
       
   208     A serialized representation of the callable.
       
   209   """
       
   210   curried = _curry_callable(obj, *args, **kwargs)
       
   211   return pickle.dumps(curried, protocol=pickle.HIGHEST_PROTOCOL)
       
   212 
       
   213 
       
   214 def defer(obj, *args, **kwargs):
       
   215   """Defers a callable for execution later.
       
   216 
       
   217   The default deferred URL of /_ah/queue/deferred will be used unless an
       
   218   alternate URL is explicitly specified. If you want to use the default URL for
       
   219   a queue, specify _url=None. If you specify a different URL, you will need to
       
   220   install the handler on that URL (see the module docstring for details).
       
   221 
       
   222   Args:
       
   223     obj: The callable to execute. See module docstring for restrictions.
       
   224     _countdown, _eta, _name, _url, _queue: Passed through to the task queue -
       
   225       see the task queue documentation for details.
       
   226     args: Positional arguments to call the callable with.
       
   227     kwargs: Any other keyword arguments are passed through to the callable.
       
   228   """
       
   229   taskargs = dict((x, kwargs.pop(("_%s" % x), None))
       
   230                   for x in ("countdown", "eta", "name"))
       
   231   taskargs["url"] = kwargs.pop("_url", _DEFAULT_URL)
       
   232   taskargs["headers"] = _TASKQUEUE_HEADERS
       
   233   queue = kwargs.pop("_queue", _DEFAULT_QUEUE)
       
   234   pickled = serialize(obj, *args, **kwargs)
       
   235   try:
       
   236     task = taskqueue.Task(payload=pickled, **taskargs)
       
   237     task.add(queue)
       
   238   except taskqueue.TaskTooLargeError:
       
   239     key = _DeferredTaskEntity(data=pickled).put()
       
   240     pickled = serialize(run_from_datastore, str(key))
       
   241     task = taskqueue.Task(payload=pickled, **taskargs)
       
   242     task.add(queue)
       
   243 
       
   244 
       
   245 class TaskHandler(webapp.RequestHandler):
       
   246   """A webapp handler class that processes deferred invocations."""
       
   247 
       
   248   def post(self):
       
   249     headers = ["%s:%s" % (k, v) for k, v in self.request.headers.items()
       
   250                if k.lower().startswith("x-appengine-")]
       
   251     logging.info(", ".join(headers))
       
   252 
       
   253     try:
       
   254       run(self.request.body)
       
   255     except PermanentTaskFailure, e:
       
   256       logging.exception("Permanent failure attempting to execute task")
       
   257 
       
   258 
       
   259 application = webapp.WSGIApplication([(".*", TaskHandler)])
       
   260 
       
   261 
       
   262 def main():
       
   263   run_wsgi_app(application)
       
   264 
       
   265 
       
# Start the handler application when this file is executed as a script.
if __name__ == "__main__":
  main()