--- a/thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py Sat Sep 05 14:04:24 2009 +0200
+++ b/thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py Sun Sep 06 23:31:53 2009 +0200
@@ -43,9 +43,20 @@
DEFAULT_BUCKET_SIZE = 5
+MAX_ETA_DELTA_DAYS = 30
+
def _ParseQueueYaml(unused_self, root_path):
- """Load the queue.yaml file and parse it."""
+ """Loads the queue.yaml file and parses it.
+
+ Args:
+ unused_self: Allows this function to be bound to a class member. Not used.
+ root_path: Directory containing queue.yaml. Not used.
+
+ Returns:
+ None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
+ populaeted from the queue.yaml.
+ """
if root_path is None:
return None
for queueyaml in ('queue.yaml', 'queue.yml'):
@@ -61,8 +72,16 @@
return None
-def _CompareEta(a, b):
- """Python sort comparator for task ETAs."""
+def _CompareTasksByEta(a, b):
+ """Python sort comparator for tasks by estimated time of arrival (ETA).
+
+ Args:
+ a: A taskqueue_service_pb.TaskQueueAddRequest.
+ b: A taskqueue_service_pb.TaskQueueAddRequest.
+
+ Returns:
+ Standard 1/0/-1 comparison result.
+ """
if a.eta_usec() > b.eta_usec():
return 1
if a.eta_usec() < b.eta_usec():
@@ -106,29 +125,63 @@
available.
"""
super(TaskQueueServiceStub, self).__init__(service_name)
- self.taskqueues = {}
- self.next_task_id = 1
- self.root_path = root_path
+ self._taskqueues = {}
+ self._next_task_id = 1
+ self._root_path = root_path
+
+ def _Dynamic_Add(self, request, response):
+ """Local implementation of the Add RPC in TaskQueueService.
+
+ Must adhere to the '_Dynamic_' naming convention for stubbing to work.
+ See taskqueue_service.proto for a full description of the RPC.
- def _Dynamic_Add(self, request, unused_response):
- if not self._ValidQueue(request.queue_name()):
+ Args:
+ request: A taskqueue_service_pb.TaskQueueAddRequest.
+ response: A taskqueue_service_pb.TaskQueueAddResponse.
+ """
+ if request.eta_usec() < 0:
+ raise apiproxy_errors.ApplicationError(
+ taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA)
+
+ eta = datetime.datetime.utcfromtimestamp(request.eta_usec() / 1e6)
+ max_eta = (datetime.datetime.utcnow() +
+ datetime.timedelta(days=MAX_ETA_DELTA_DAYS))
+ if eta > max_eta:
+ raise apiproxy_errors.ApplicationError(
+ taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA)
+
+ if not self._IsValidQueue(request.queue_name()):
raise apiproxy_errors.ApplicationError(
taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
- return
if not request.task_name():
- request.set_task_name('task%d' % self.next_task_id)
- self.next_task_id += 1
+ request.set_task_name('task%d' % self._next_task_id)
+ response.set_chosen_task_name(request.task_name())
+ self._next_task_id += 1
- tasks = self.taskqueues.setdefault(request.queue_name(), [])
+ tasks = self._taskqueues.setdefault(request.queue_name(), [])
+ for task in tasks:
+ if task.task_name() == request.task_name():
+ raise apiproxy_errors.ApplicationError(
+ taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
tasks.append(request)
- tasks.sort(_CompareEta)
- return
+ tasks.sort(_CompareTasksByEta)
+
+ def _IsValidQueue(self, queue_name):
+ """Determines whether a queue is valid, i.e. tasks can be added to it.
- def _ValidQueue(self, queue_name):
+ Valid queues are the 'default' queue, plus any queues in the queue.yaml
+ file.
+
+ Args:
+ queue_name: the name of the queue to validate.
+
+ Returns:
+ True iff queue is valid.
+ """
if queue_name == 'default':
return True
- queue_info = self.queue_yaml_parser(self.root_path)
+ queue_info = self.queue_yaml_parser(self._root_path)
if queue_info and queue_info.queue:
for entry in queue_info.queue:
if entry.name == queue_name:
@@ -140,10 +193,16 @@
Returns:
A list of dictionaries, where each dictionary contains one queue's
- attributes.
+ attributes. E.g.:
+ [{'name': 'some-queue',
+ 'max_rate': '1/s',
+ 'bucket_size': 5,
+ 'oldest_task': '2009/02/02 05:37:42',
+ 'eta_delta': '0:00:06.342511 ago',
+ 'tasks_in_queue': 12}, ...]
"""
queues = []
- queue_info = self.queue_yaml_parser(self.root_path)
+ queue_info = self.queue_yaml_parser(self._root_path)
has_default = False
if queue_info and queue_info.queue:
for entry in queue_info.queue:
@@ -158,7 +217,7 @@
else:
queue['bucket_size'] = DEFAULT_BUCKET_SIZE
- tasks = self.taskqueues.setdefault(entry.name, [])
+ tasks = self._taskqueues.setdefault(entry.name, [])
if tasks:
queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
@@ -173,7 +232,7 @@
queue['max_rate'] = DEFAULT_RATE
queue['bucket_size'] = DEFAULT_BUCKET_SIZE
- tasks = self.taskqueues.get('default', [])
+ tasks = self._taskqueues.get('default', [])
if tasks:
queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
@@ -190,9 +249,24 @@
Returns:
A list of dictionaries, where each dictionary contains one task's
- attributes.
+ attributes. E.g.
+ [{'name': 'task-123',
+ 'url': '/update',
+ 'method': 'GET',
+ 'eta': '2009/02/02 05:37:42',
+ 'eta_delta': '0:00:06.342511 ago',
+ 'body': '',
+ 'headers': {'X-AppEngine-QueueName': 'update-queue',
+ 'X-AppEngine-TaskName': 'task-123',
+ 'X-AppEngine-TaskRetryCount': '0',
+ 'X-AppEngine-Development-Payload': '1',
+ 'Content-Length': 0,
+ 'Content-Type': 'application/octet-streamn'}, ...]
+
+ Raises:
+ ValueError: A task request contains an unknown HTTP method type.
"""
- tasks = self.taskqueues.get(queue_name, [])
+ tasks = self._taskqueues.get(queue_name, [])
result_tasks = []
for task_request in tasks:
task = {}
@@ -200,16 +274,18 @@
task['name'] = task_request.task_name()
task['url'] = task_request.url()
method = task_request.method()
- if (method == taskqueue_service_pb.TaskQueueAddRequest.GET):
+ if method == taskqueue_service_pb.TaskQueueAddRequest.GET:
task['method'] = 'GET'
- elif (method == taskqueue_service_pb.TaskQueueAddRequest.POST):
+ elif method == taskqueue_service_pb.TaskQueueAddRequest.POST:
task['method'] = 'POST'
- elif (method == taskqueue_service_pb.TaskQueueAddRequest.HEAD):
+ elif method == taskqueue_service_pb.TaskQueueAddRequest.HEAD:
task['method'] = 'HEAD'
- elif (method == taskqueue_service_pb.TaskQueueAddRequest.PUT):
+ elif method == taskqueue_service_pb.TaskQueueAddRequest.PUT:
task['method'] = 'PUT'
- elif (method == taskqueue_service_pb.TaskQueueAddRequest.DELETE):
+ elif method == taskqueue_service_pb.TaskQueueAddRequest.DELETE:
task['method'] = 'DELETE'
+ else:
+ raise ValueError('Unexpected method: %d' % method)
task['eta'] = _FormatEta(task_request.eta_usec())
task['eta_delta'] = _EtaDelta(task_request.eta_usec())
@@ -236,7 +312,7 @@
queue_name: the name of the queue to delete the task from.
task_name: the name of the task to delete.
"""
- tasks = self.taskqueues.get(queue_name, [])
+ tasks = self._taskqueues.get(queue_name, [])
for task in tasks:
if task.task_name() == task_name:
tasks.remove(task)
@@ -248,4 +324,4 @@
Args:
queue_name: the name of the queue to remove tasks from.
"""
- self.taskqueues[queue_name] = []
+ self._taskqueues[queue_name] = []