diff -r 27971a13089f -r 2e0b0af889be thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py --- a/thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py Sat Sep 05 14:04:24 2009 +0200 +++ b/thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py Sun Sep 06 23:31:53 2009 +0200 @@ -43,9 +43,20 @@ DEFAULT_BUCKET_SIZE = 5 +MAX_ETA_DELTA_DAYS = 30 + def _ParseQueueYaml(unused_self, root_path): - """Load the queue.yaml file and parse it.""" + """Loads the queue.yaml file and parses it. + + Args: + unused_self: Allows this function to be bound to a class member. Not used. + root_path: Directory containing queue.yaml. Not used. + + Returns: + None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object + populaeted from the queue.yaml. + """ if root_path is None: return None for queueyaml in ('queue.yaml', 'queue.yml'): @@ -61,8 +72,16 @@ return None -def _CompareEta(a, b): - """Python sort comparator for task ETAs.""" +def _CompareTasksByEta(a, b): + """Python sort comparator for tasks by estimated time of arrival (ETA). + + Args: + a: A taskqueue_service_pb.TaskQueueAddRequest. + b: A taskqueue_service_pb.TaskQueueAddRequest. + + Returns: + Standard 1/0/-1 comparison result. + """ if a.eta_usec() > b.eta_usec(): return 1 if a.eta_usec() < b.eta_usec(): @@ -106,29 +125,63 @@ available. """ super(TaskQueueServiceStub, self).__init__(service_name) - self.taskqueues = {} - self.next_task_id = 1 - self.root_path = root_path + self._taskqueues = {} + self._next_task_id = 1 + self._root_path = root_path + + def _Dynamic_Add(self, request, response): + """Local implementation of the Add RPC in TaskQueueService. + + Must adhere to the '_Dynamic_' naming convention for stubbing to work. + See taskqueue_service.proto for a full description of the RPC. - def _Dynamic_Add(self, request, unused_response): - if not self._ValidQueue(request.queue_name()): + Args: + request: A taskqueue_service_pb.TaskQueueAddRequest. + response: A taskqueue_service_pb.TaskQueueAddResponse. + """ + if request.eta_usec() < 0: + raise apiproxy_errors.ApplicationError( + taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA) + + eta = datetime.datetime.utcfromtimestamp(request.eta_usec() / 1e6) + max_eta = (datetime.datetime.utcnow() + + datetime.timedelta(days=MAX_ETA_DELTA_DAYS)) + if eta > max_eta: + raise apiproxy_errors.ApplicationError( + taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA) + + if not self._IsValidQueue(request.queue_name()): raise apiproxy_errors.ApplicationError( taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) - return if not request.task_name(): - request.set_task_name('task%d' % self.next_task_id) - self.next_task_id += 1 + request.set_task_name('task%d' % self._next_task_id) + response.set_chosen_task_name(request.task_name()) + self._next_task_id += 1 - tasks = self.taskqueues.setdefault(request.queue_name(), []) + tasks = self._taskqueues.setdefault(request.queue_name(), []) + for task in tasks: + if task.task_name() == request.task_name(): + raise apiproxy_errors.ApplicationError( + taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS) tasks.append(request) - tasks.sort(_CompareEta) - return + tasks.sort(_CompareTasksByEta) + + def _IsValidQueue(self, queue_name): + """Determines whether a queue is valid, i.e. tasks can be added to it. - def _ValidQueue(self, queue_name): + Valid queues are the 'default' queue, plus any queues in the queue.yaml + file. + + Args: + queue_name: the name of the queue to validate. + + Returns: + True iff queue is valid. + """ if queue_name == 'default': return True - queue_info = self.queue_yaml_parser(self.root_path) + queue_info = self.queue_yaml_parser(self._root_path) if queue_info and queue_info.queue: for entry in queue_info.queue: if entry.name == queue_name: @@ -140,10 +193,16 @@ Returns: A list of dictionaries, where each dictionary contains one queue's - attributes. + attributes. E.g.: + [{'name': 'some-queue', + 'max_rate': '1/s', + 'bucket_size': 5, + 'oldest_task': '2009/02/02 05:37:42', + 'eta_delta': '0:00:06.342511 ago', + 'tasks_in_queue': 12}, ...] """ queues = [] - queue_info = self.queue_yaml_parser(self.root_path) + queue_info = self.queue_yaml_parser(self._root_path) has_default = False if queue_info and queue_info.queue: for entry in queue_info.queue: @@ -158,7 +217,7 @@ else: queue['bucket_size'] = DEFAULT_BUCKET_SIZE - tasks = self.taskqueues.setdefault(entry.name, []) + tasks = self._taskqueues.setdefault(entry.name, []) if tasks: queue['oldest_task'] = _FormatEta(tasks[0].eta_usec()) queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec()) @@ -173,7 +232,7 @@ queue['max_rate'] = DEFAULT_RATE queue['bucket_size'] = DEFAULT_BUCKET_SIZE - tasks = self.taskqueues.get('default', []) + tasks = self._taskqueues.get('default', []) if tasks: queue['oldest_task'] = _FormatEta(tasks[0].eta_usec()) queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec()) @@ -190,9 +249,24 @@ Returns: A list of dictionaries, where each dictionary contains one task's - attributes. + attributes. E.g. + [{'name': 'task-123', + 'url': '/update', + 'method': 'GET', + 'eta': '2009/02/02 05:37:42', + 'eta_delta': '0:00:06.342511 ago', + 'body': '', + 'headers': {'X-AppEngine-QueueName': 'update-queue', + 'X-AppEngine-TaskName': 'task-123', + 'X-AppEngine-TaskRetryCount': '0', + 'X-AppEngine-Development-Payload': '1', + 'Content-Length': 0, + 'Content-Type': 'application/octet-streamn'}, ...] + + Raises: + ValueError: A task request contains an unknown HTTP method type. """ - tasks = self.taskqueues.get(queue_name, []) + tasks = self._taskqueues.get(queue_name, []) result_tasks = [] for task_request in tasks: task = {} @@ -200,16 +274,18 @@ task['name'] = task_request.task_name() task['url'] = task_request.url() method = task_request.method() - if (method == taskqueue_service_pb.TaskQueueAddRequest.GET): + if method == taskqueue_service_pb.TaskQueueAddRequest.GET: task['method'] = 'GET' - elif (method == taskqueue_service_pb.TaskQueueAddRequest.POST): + elif method == taskqueue_service_pb.TaskQueueAddRequest.POST: task['method'] = 'POST' - elif (method == taskqueue_service_pb.TaskQueueAddRequest.HEAD): + elif method == taskqueue_service_pb.TaskQueueAddRequest.HEAD: task['method'] = 'HEAD' - elif (method == taskqueue_service_pb.TaskQueueAddRequest.PUT): + elif method == taskqueue_service_pb.TaskQueueAddRequest.PUT: task['method'] = 'PUT' - elif (method == taskqueue_service_pb.TaskQueueAddRequest.DELETE): + elif method == taskqueue_service_pb.TaskQueueAddRequest.DELETE: task['method'] = 'DELETE' + else: + raise ValueError('Unexpected method: %d' % method) task['eta'] = _FormatEta(task_request.eta_usec()) task['eta_delta'] = _EtaDelta(task_request.eta_usec()) @@ -236,7 +312,7 @@ queue_name: the name of the queue to delete the task from. task_name: the name of the task to delete. """ - tasks = self.taskqueues.get(queue_name, []) + tasks = self._taskqueues.get(queue_name, []) for task in tasks: if task.task_name() == task_name: tasks.remove(task) @@ -248,4 +324,4 @@ Args: queue_name: the name of the queue to remove tasks from. """ - self.taskqueues[queue_name] = [] + self._taskqueues[queue_name] = []