thirdparty/google_appengine/google/appengine/ext/deferred/deferred.py
changeset 2864 2e0b0af889be
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thirdparty/google_appengine/google/appengine/ext/deferred/deferred.py	Sun Sep 06 23:31:53 2009 +0200
@@ -0,0 +1,267 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A module that handles deferred execution of callables via the task queue.
+
+Tasks consist of a callable and arguments to pass to it. The callable and its
+arguments are serialized and put on the task queue, which deserializes and
+executes them. The following callables can be used as tasks:
+
+1) Functions defined in the top level of a module
+2) Classes defined in the top level of a module
+3) Instances of classes in (2) that implement __call__
+4) Instance methods of objects of classes in (2)
+5) Class methods of classes in (2)
+6) Built-in functions
+7) Built-in methods
+
+The following callables can NOT be used as tasks:
+1) Nested functions or closures
+2) Nested classes or objects of them
+3) Lambda functions
+4) Static methods
+
+The arguments to the callable, and the object (in the case of method or object
+calls) must all be pickleable.
+
+If you want your tasks to execute reliably, don't use mutable global variables;
+they are not serialized with the task and may not be the same when your task
+executes as they were when it was enqueued (in fact, they will almost certainly
+be different).
+
+If your app relies on manipulating the import path, make sure that the function
+you are deferring is defined in a module that can be found without import path
+manipulation. Alternately, you can include deferred.TaskHandler in your own
+webapp application instead of using the easy-install method detailed below.
+
+When you create a deferred task using deferred.defer, the task is serialized,
+and an attempt is made to add it directly to the task queue. If the task is too
+big (larger than about 10 kilobytes when serialized), a datastore entry will be
+created for the task, and a new task will be enqueued, which will fetch the
+original task from the datastore and execute it. This is much less efficient
+than the direct execution model, so it's a good idea to minimize the size of
+your tasks when possible.
+
+In order for tasks to be processed, you need to set up the handler. Add the
+following to your app.yaml handlers section:
+
+handlers:
+- url: /_ah/queue/deferred
+  script: $PYTHON_LIB/google/appengine/ext/deferred/__init__.py
+  login: admin
+
+By default, the deferred module uses the URL above, and the default queue.
+
+Example usage:
+
+  def do_something_later(key, amount):
+    entity = MyModel.get(key)
+    entity.total += amount
+    entity.put()
+
+  # Use default URL and queue name, no task name, execute ASAP.
+  deferred.defer(do_something_later, 20)
+
+  # Providing non-default task queue arguments
+  deferred.defer(do_something_later, 20, _queue="foo", countdown=60)
+"""
+
+
+
+
+
+import logging
+import pickle
+import types
+
+from google.appengine.api.labs import taskqueue
+from google.appengine.ext import db
+from google.appengine.ext import webapp
+from google.appengine.ext.webapp.util import run_wsgi_app
+
+
+_TASKQUEUE_HEADERS = {"Content-Type": "application/octet-stream"}
+_DEFAULT_URL = "/_ah/queue/deferred"
+_DEFAULT_QUEUE = "default"
+
+
+class Error(Exception):
+  """Base class for exceptions in this module."""
+
+
+class PermanentTaskFailure(Error):
+  """Indicates that a task failed, and will never succeed."""
+
+
+def run(data):
+  """Unpickles and executes a task.
+
+  Args:
+    data: A pickled tuple of (function, args, kwargs) to execute.
+  Returns:
+    The return value of the function invocation.
+  """
+  try:
+    func, args, kwds = pickle.loads(data)
+  except Exception, e:
+    raise PermanentTaskFailure(e)
+  else:
+    return func(*args, **kwds)
+
+
+class _DeferredTaskEntity(db.Model):
+  """Datastore representation of a deferred task.
+
+  This is used in cases when the deferred task is too big to be included as
+  payload with the task queue entry.
+  """
+  data = db.BlobProperty(required=True)
+
+
+def run_from_datastore(key):
+  """Retrieves a task from the datastore and executes it.
+
+  Args:
+    key: The datastore key of a _DeferredTaskEntity storing the task.
+  Returns:
+    The return value of the function invocation.
+  """
+  entity = _DeferredTaskEntity.get(key)
+  if not entity:
+    raise PermanentTaskFailure()
+  try:
+    ret = run(entity.data)
+    entity.delete()
+  except PermanentTaskFailure:
+    entity.delete()
+    raise
+
+
+def invoke_member(obj, membername, *args, **kwargs):
+  """Retrieves a member of an object, then calls it with the provided arguments.
+
+  Args:
+    obj: The object to operate on.
+    membername: The name of the member to retrieve from ojb.
+    args: Positional arguments to pass to the method.
+    kwargs: Keyword arguments to pass to the method.
+  Returns:
+    The return value of the method invocation.
+  """
+  return getattr(obj, membername)(*args, **kwargs)
+
+
+def _curry_callable(obj, *args, **kwargs):
+  """Takes a callable and arguments and returns a task queue tuple.
+
+  The returned tuple consists of (callable, args, kwargs), and can be pickled
+  and unpickled safely.
+
+  Args:
+    obj: The callable to curry. See the module docstring for restrictions.
+    args: Positional arguments to call the callable with.
+    kwargs: Keyword arguments to call the callable with.
+  Returns:
+    A tuple consisting of (callable, args, kwargs) that can be evaluated by
+    run() with equivalent effect of executing the function directly.
+  Raises:
+    ValueError: If the passed in object is not of a valid callable type.
+  """
+  if isinstance(obj, types.MethodType):
+    return (invoke_member, (obj.im_self, obj.im_func.__name__) + args, kwargs)
+  elif isinstance(obj, types.BuiltinMethodType):
+    if not obj.__self__:
+      return (obj, args, kwargs)
+    else:
+      return (invoke_member, (obj.__self__, obj.__name__) + args, kwargs)
+  elif isinstance(obj, types.ObjectType) and hasattr(obj, "__call__"):
+    return (obj, args, kwargs)
+  elif isinstance(obj, (types.FunctionType, types.BuiltinFunctionType,
+                        types.ClassType, types.UnboundMethodType)):
+    return (obj, args, kwargs)
+  else:
+    raise ValueError("obj must be callable")
+
+
+def serialize(obj, *args, **kwargs):
+  """Serializes a callable into a format recognized by the deferred executor.
+
+  Args:
+    obj: The callable to serialize. See module docstring for restrictions.
+    args: Positional arguments to call the callable with.
+    kwargs: Keyword arguments to call the callable with.
+  Returns:
+    A serialized representation of the callable.
+  """
+  curried = _curry_callable(obj, *args, **kwargs)
+  return pickle.dumps(curried, protocol=pickle.HIGHEST_PROTOCOL)
+
+
+def defer(obj, *args, **kwargs):
+  """Defers a callable for execution later.
+
+  The default deferred URL of /_ah/queue/deferred will be used unless an
+  alternate URL is explicitly specified. If you want to use the default URL for
+  a queue, specify _url=None. If you specify a different URL, you will need to
+  install the handler on that URL (see the module docstring for details).
+
+  Args:
+    obj: The callable to execute. See module docstring for restrictions.
+    _countdown, _eta, _name, _url, _queue: Passed through to the task queue -
+      see the task queue documentation for details.
+    args: Positional arguments to call the callable with.
+    kwargs: Any other keyword arguments are passed through to the callable.
+  """
+  taskargs = dict((x, kwargs.pop(("_%s" % x), None))
+                  for x in ("countdown", "eta", "name"))
+  taskargs["url"] = kwargs.pop("_url", _DEFAULT_URL)
+  taskargs["headers"] = _TASKQUEUE_HEADERS
+  queue = kwargs.pop("_queue", _DEFAULT_QUEUE)
+  pickled = serialize(obj, *args, **kwargs)
+  try:
+    task = taskqueue.Task(payload=pickled, **taskargs)
+    task.add(queue)
+  except taskqueue.TaskTooLargeError:
+    key = _DeferredTaskEntity(data=pickled).put()
+    pickled = serialize(run_from_datastore, str(key))
+    task = taskqueue.Task(payload=pickled, **taskargs)
+    task.add(queue)
+
+
+class TaskHandler(webapp.RequestHandler):
+  """A webapp handler class that processes deferred invocations."""
+
+  def post(self):
+    headers = ["%s:%s" % (k, v) for k, v in self.request.headers.items()
+               if k.lower().startswith("x-appengine-")]
+    logging.info(", ".join(headers))
+
+    try:
+      run(self.request.body)
+    except PermanentTaskFailure, e:
+      logging.exception("Permanent failure attempting to execute task")
+
+
+application = webapp.WSGIApplication([(".*", TaskHandler)])
+
+
+def main():
+  run_wsgi_app(application)
+
+
+if __name__ == "__main__":
+  main()