thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py
changeset 2864 2e0b0af889be
parent 2413 d0b7dac5325c
child 3031 7678f72140e6
--- a/thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py	Sat Sep 05 14:04:24 2009 +0200
+++ b/thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py	Sun Sep 06 23:31:53 2009 +0200
@@ -43,9 +43,20 @@
 
 DEFAULT_BUCKET_SIZE = 5
 
+MAX_ETA_DELTA_DAYS = 30
+
 
 def _ParseQueueYaml(unused_self, root_path):
-  """Load the queue.yaml file and parse it."""
+  """Loads the queue.yaml file and parses it.
+
+  Args:
+    unused_self: Allows this function to be bound to a class member. Not used.
+    root_path: Directory containing queue.yaml. Not used.
+
+  Returns:
+    None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
+    populaeted from the queue.yaml.
+  """
   if root_path is None:
     return None
   for queueyaml in ('queue.yaml', 'queue.yml'):
@@ -61,8 +72,16 @@
   return None
 
 
-def _CompareEta(a, b):
-  """Python sort comparator for task ETAs."""
+def _CompareTasksByEta(a, b):
+  """Python sort comparator for tasks by estimated time of arrival (ETA).
+
+  Args:
+    a: A taskqueue_service_pb.TaskQueueAddRequest.
+    b: A taskqueue_service_pb.TaskQueueAddRequest.
+
+  Returns:
+    Standard 1/0/-1 comparison result.
+  """
   if a.eta_usec() > b.eta_usec():
     return 1
   if a.eta_usec() < b.eta_usec():
@@ -106,29 +125,63 @@
         available.
     """
     super(TaskQueueServiceStub, self).__init__(service_name)
-    self.taskqueues = {}
-    self.next_task_id = 1
-    self.root_path = root_path
+    self._taskqueues = {}
+    self._next_task_id = 1
+    self._root_path = root_path
+
+  def _Dynamic_Add(self, request, response):
+    """Local implementation of the Add RPC in TaskQueueService.
+
+    Must adhere to the '_Dynamic_' naming convention for stubbing to work.
+    See taskqueue_service.proto for a full description of the RPC.
 
-  def _Dynamic_Add(self, request, unused_response):
-    if not self._ValidQueue(request.queue_name()):
+    Args:
+      request: A taskqueue_service_pb.TaskQueueAddRequest.
+      response: A taskqueue_service_pb.TaskQueueAddResponse.
+    """
+    if request.eta_usec() < 0:
+      raise apiproxy_errors.ApplicationError(
+          taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA)
+
+    eta = datetime.datetime.utcfromtimestamp(request.eta_usec() / 1e6)
+    max_eta = (datetime.datetime.utcnow() +
+               datetime.timedelta(days=MAX_ETA_DELTA_DAYS))
+    if eta > max_eta:
+      raise apiproxy_errors.ApplicationError(
+          taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA)
+
+    if not self._IsValidQueue(request.queue_name()):
       raise apiproxy_errors.ApplicationError(
           taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
-      return
 
     if not request.task_name():
-      request.set_task_name('task%d' % self.next_task_id)
-      self.next_task_id += 1
+      request.set_task_name('task%d' % self._next_task_id)
+      response.set_chosen_task_name(request.task_name())
+      self._next_task_id += 1
 
-    tasks = self.taskqueues.setdefault(request.queue_name(), [])
+    tasks = self._taskqueues.setdefault(request.queue_name(), [])
+    for task in tasks:
+      if task.task_name() == request.task_name():
+        raise apiproxy_errors.ApplicationError(
+            taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
     tasks.append(request)
-    tasks.sort(_CompareEta)
-    return
+    tasks.sort(_CompareTasksByEta)
+
+  def _IsValidQueue(self, queue_name):
+    """Determines whether a queue is valid, i.e. tasks can be added to it.
 
-  def _ValidQueue(self, queue_name):
+    Valid queues are the 'default' queue, plus any queues in the queue.yaml
+    file.
+
+    Args:
+      queue_name: the name of the queue to validate.
+
+    Returns:
+      True iff queue is valid.
+    """
     if queue_name == 'default':
       return True
-    queue_info = self.queue_yaml_parser(self.root_path)
+    queue_info = self.queue_yaml_parser(self._root_path)
     if queue_info and queue_info.queue:
       for entry in queue_info.queue:
         if entry.name == queue_name:
@@ -140,10 +193,16 @@
 
     Returns:
       A list of dictionaries, where each dictionary contains one queue's
-      attributes.
+      attributes. E.g.:
+        [{'name': 'some-queue',
+          'max_rate': '1/s',
+          'bucket_size': 5,
+          'oldest_task': '2009/02/02 05:37:42',
+          'eta_delta': '0:00:06.342511 ago',
+          'tasks_in_queue': 12}, ...]
     """
     queues = []
-    queue_info = self.queue_yaml_parser(self.root_path)
+    queue_info = self.queue_yaml_parser(self._root_path)
     has_default = False
     if queue_info and queue_info.queue:
       for entry in queue_info.queue:
@@ -158,7 +217,7 @@
         else:
           queue['bucket_size'] = DEFAULT_BUCKET_SIZE
 
-        tasks = self.taskqueues.setdefault(entry.name, [])
+        tasks = self._taskqueues.setdefault(entry.name, [])
         if tasks:
           queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
           queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
@@ -173,7 +232,7 @@
       queue['max_rate'] = DEFAULT_RATE
       queue['bucket_size'] = DEFAULT_BUCKET_SIZE
 
-      tasks = self.taskqueues.get('default', [])
+      tasks = self._taskqueues.get('default', [])
       if tasks:
         queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
         queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
@@ -190,9 +249,24 @@
 
     Returns:
       A list of dictionaries, where each dictionary contains one task's
-      attributes.
+      attributes. E.g.
+        [{'name': 'task-123',
+          'url': '/update',
+          'method': 'GET',
+          'eta': '2009/02/02 05:37:42',
+          'eta_delta': '0:00:06.342511 ago',
+          'body': '',
+          'headers': {'X-AppEngine-QueueName': 'update-queue',
+                      'X-AppEngine-TaskName': 'task-123',
+                      'X-AppEngine-TaskRetryCount': '0',
+                      'X-AppEngine-Development-Payload': '1',
+                      'Content-Length': 0,
+                      'Content-Type': 'application/octet-streamn'}, ...]
+
+    Raises:
+      ValueError: A task request contains an unknown HTTP method type.
     """
-    tasks = self.taskqueues.get(queue_name, [])
+    tasks = self._taskqueues.get(queue_name, [])
     result_tasks = []
     for task_request in tasks:
       task = {}
@@ -200,16 +274,18 @@
       task['name'] = task_request.task_name()
       task['url'] = task_request.url()
       method = task_request.method()
-      if (method == taskqueue_service_pb.TaskQueueAddRequest.GET):
+      if method == taskqueue_service_pb.TaskQueueAddRequest.GET:
         task['method'] = 'GET'
-      elif (method == taskqueue_service_pb.TaskQueueAddRequest.POST):
+      elif method == taskqueue_service_pb.TaskQueueAddRequest.POST:
         task['method'] = 'POST'
-      elif (method == taskqueue_service_pb.TaskQueueAddRequest.HEAD):
+      elif method == taskqueue_service_pb.TaskQueueAddRequest.HEAD:
         task['method'] = 'HEAD'
-      elif (method == taskqueue_service_pb.TaskQueueAddRequest.PUT):
+      elif method == taskqueue_service_pb.TaskQueueAddRequest.PUT:
         task['method'] = 'PUT'
-      elif (method == taskqueue_service_pb.TaskQueueAddRequest.DELETE):
+      elif method == taskqueue_service_pb.TaskQueueAddRequest.DELETE:
         task['method'] = 'DELETE'
+      else:
+        raise ValueError('Unexpected method: %d' % method)
 
       task['eta'] = _FormatEta(task_request.eta_usec())
       task['eta_delta'] = _EtaDelta(task_request.eta_usec())
@@ -236,7 +312,7 @@
       queue_name: the name of the queue to delete the task from.
       task_name: the name of the task to delete.
     """
-    tasks = self.taskqueues.get(queue_name, [])
+    tasks = self._taskqueues.get(queue_name, [])
     for task in tasks:
       if task.task_name() == task_name:
         tasks.remove(task)
@@ -248,4 +324,4 @@
     Args:
       queue_name: the name of the queue to remove tasks from.
     """
-    self.taskqueues[queue_name] = []
+    self._taskqueues[queue_name] = []