thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py
changeset 2864 2e0b0af889be
parent 2413 d0b7dac5325c
child 3031 7678f72140e6
equal deleted inserted replaced
2862:27971a13089f 2864:2e0b0af889be
    41 
    41 
    42 DEFAULT_RATE = '5.00/s'
    42 DEFAULT_RATE = '5.00/s'
    43 
    43 
    44 DEFAULT_BUCKET_SIZE = 5
    44 DEFAULT_BUCKET_SIZE = 5
    45 
    45 
       
    46 MAX_ETA_DELTA_DAYS = 30
       
    47 
    46 
    48 
    47 def _ParseQueueYaml(unused_self, root_path):
    49 def _ParseQueueYaml(unused_self, root_path):
    48   """Load the queue.yaml file and parse it."""
    50   """Loads the queue.yaml file and parses it.
       
    51 
       
    52   Args:
       
    53     unused_self: Allows this function to be bound to a class member. Not used.
       
    54     root_path: Directory containing queue.yaml. Not used.
       
    55 
       
    56   Returns:
       
    57     None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
       
    58     populaeted from the queue.yaml.
       
    59   """
    49   if root_path is None:
    60   if root_path is None:
    50     return None
    61     return None
    51   for queueyaml in ('queue.yaml', 'queue.yml'):
    62   for queueyaml in ('queue.yaml', 'queue.yml'):
    52     try:
    63     try:
    53       fh = open(os.path.join(root_path, queueyaml), 'r')
    64       fh = open(os.path.join(root_path, queueyaml), 'r')
    59     finally:
    70     finally:
    60       fh.close()
    71       fh.close()
    61   return None
    72   return None
    62 
    73 
    63 
    74 
    64 def _CompareEta(a, b):
    75 def _CompareTasksByEta(a, b):
    65   """Python sort comparator for task ETAs."""
    76   """Python sort comparator for tasks by estimated time of arrival (ETA).
       
    77 
       
    78   Args:
       
    79     a: A taskqueue_service_pb.TaskQueueAddRequest.
       
    80     b: A taskqueue_service_pb.TaskQueueAddRequest.
       
    81 
       
    82   Returns:
       
    83     Standard 1/0/-1 comparison result.
       
    84   """
    66   if a.eta_usec() > b.eta_usec():
    85   if a.eta_usec() > b.eta_usec():
    67     return 1
    86     return 1
    68   if a.eta_usec() < b.eta_usec():
    87   if a.eta_usec() < b.eta_usec():
    69     return -1
    88     return -1
    70   return 0
    89   return 0
   104       root_path: Root path to the directory of the application which may contain
   123       root_path: Root path to the directory of the application which may contain
   105         a queue.yaml file. If None, then it's assumed no queue.yaml file is
   124         a queue.yaml file. If None, then it's assumed no queue.yaml file is
   106         available.
   125         available.
   107     """
   126     """
   108     super(TaskQueueServiceStub, self).__init__(service_name)
   127     super(TaskQueueServiceStub, self).__init__(service_name)
   109     self.taskqueues = {}
   128     self._taskqueues = {}
   110     self.next_task_id = 1
   129     self._next_task_id = 1
   111     self.root_path = root_path
   130     self._root_path = root_path
   112 
   131 
   113   def _Dynamic_Add(self, request, unused_response):
   132   def _Dynamic_Add(self, request, response):
   114     if not self._ValidQueue(request.queue_name()):
   133     """Local implementation of the Add RPC in TaskQueueService.
       
   134 
       
   135     Must adhere to the '_Dynamic_' naming convention for stubbing to work.
       
   136     See taskqueue_service.proto for a full description of the RPC.
       
   137 
       
   138     Args:
       
   139       request: A taskqueue_service_pb.TaskQueueAddRequest.
       
   140       response: A taskqueue_service_pb.TaskQueueAddResponse.
       
   141     """
       
   142     if request.eta_usec() < 0:
       
   143       raise apiproxy_errors.ApplicationError(
       
   144           taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA)
       
   145 
       
   146     eta = datetime.datetime.utcfromtimestamp(request.eta_usec() / 1e6)
       
   147     max_eta = (datetime.datetime.utcnow() +
       
   148                datetime.timedelta(days=MAX_ETA_DELTA_DAYS))
       
   149     if eta > max_eta:
       
   150       raise apiproxy_errors.ApplicationError(
       
   151           taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA)
       
   152 
       
   153     if not self._IsValidQueue(request.queue_name()):
   115       raise apiproxy_errors.ApplicationError(
   154       raise apiproxy_errors.ApplicationError(
   116           taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
   155           taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
   117       return
       
   118 
   156 
   119     if not request.task_name():
   157     if not request.task_name():
   120       request.set_task_name('task%d' % self.next_task_id)
   158       request.set_task_name('task%d' % self._next_task_id)
   121       self.next_task_id += 1
   159       response.set_chosen_task_name(request.task_name())
   122 
   160       self._next_task_id += 1
   123     tasks = self.taskqueues.setdefault(request.queue_name(), [])
   161 
       
   162     tasks = self._taskqueues.setdefault(request.queue_name(), [])
       
   163     for task in tasks:
       
   164       if task.task_name() == request.task_name():
       
   165         raise apiproxy_errors.ApplicationError(
       
   166             taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
   124     tasks.append(request)
   167     tasks.append(request)
   125     tasks.sort(_CompareEta)
   168     tasks.sort(_CompareTasksByEta)
   126     return
   169 
   127 
   170   def _IsValidQueue(self, queue_name):
   128   def _ValidQueue(self, queue_name):
   171     """Determines whether a queue is valid, i.e. tasks can be added to it.
       
   172 
       
   173     Valid queues are the 'default' queue, plus any queues in the queue.yaml
       
   174     file.
       
   175 
       
   176     Args:
       
   177       queue_name: the name of the queue to validate.
       
   178 
       
   179     Returns:
       
   180       True iff queue is valid.
       
   181     """
   129     if queue_name == 'default':
   182     if queue_name == 'default':
   130       return True
   183       return True
   131     queue_info = self.queue_yaml_parser(self.root_path)
   184     queue_info = self.queue_yaml_parser(self._root_path)
   132     if queue_info and queue_info.queue:
   185     if queue_info and queue_info.queue:
   133       for entry in queue_info.queue:
   186       for entry in queue_info.queue:
   134         if entry.name == queue_name:
   187         if entry.name == queue_name:
   135           return True
   188           return True
   136     return False
   189     return False
   138   def GetQueues(self):
   191   def GetQueues(self):
   139     """Gets all the applications's queues.
   192     """Gets all the applications's queues.
   140 
   193 
   141     Returns:
   194     Returns:
   142       A list of dictionaries, where each dictionary contains one queue's
   195       A list of dictionaries, where each dictionary contains one queue's
   143       attributes.
   196       attributes. E.g.:
       
   197         [{'name': 'some-queue',
       
   198           'max_rate': '1/s',
       
   199           'bucket_size': 5,
       
   200           'oldest_task': '2009/02/02 05:37:42',
       
   201           'eta_delta': '0:00:06.342511 ago',
       
   202           'tasks_in_queue': 12}, ...]
   144     """
   203     """
   145     queues = []
   204     queues = []
   146     queue_info = self.queue_yaml_parser(self.root_path)
   205     queue_info = self.queue_yaml_parser(self._root_path)
   147     has_default = False
   206     has_default = False
   148     if queue_info and queue_info.queue:
   207     if queue_info and queue_info.queue:
   149       for entry in queue_info.queue:
   208       for entry in queue_info.queue:
   150         if entry.name == 'default':
   209         if entry.name == 'default':
   151           has_default = True
   210           has_default = True
   156         if entry.bucket_size:
   215         if entry.bucket_size:
   157           queue['bucket_size'] = entry.bucket_size
   216           queue['bucket_size'] = entry.bucket_size
   158         else:
   217         else:
   159           queue['bucket_size'] = DEFAULT_BUCKET_SIZE
   218           queue['bucket_size'] = DEFAULT_BUCKET_SIZE
   160 
   219 
   161         tasks = self.taskqueues.setdefault(entry.name, [])
   220         tasks = self._taskqueues.setdefault(entry.name, [])
   162         if tasks:
   221         if tasks:
   163           queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
   222           queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
   164           queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
   223           queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
   165         else:
   224         else:
   166           queue['oldest_task'] = ''
   225           queue['oldest_task'] = ''
   171       queues.append(queue)
   230       queues.append(queue)
   172       queue['name'] = 'default'
   231       queue['name'] = 'default'
   173       queue['max_rate'] = DEFAULT_RATE
   232       queue['max_rate'] = DEFAULT_RATE
   174       queue['bucket_size'] = DEFAULT_BUCKET_SIZE
   233       queue['bucket_size'] = DEFAULT_BUCKET_SIZE
   175 
   234 
   176       tasks = self.taskqueues.get('default', [])
   235       tasks = self._taskqueues.get('default', [])
   177       if tasks:
   236       if tasks:
   178         queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
   237         queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
   179         queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
   238         queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
   180       else:
   239       else:
   181         queue['oldest_task'] = ''
   240         queue['oldest_task'] = ''
   188     Args:
   247     Args:
   189       queue_name: Queue's name to return tasks for.
   248       queue_name: Queue's name to return tasks for.
   190 
   249 
   191     Returns:
   250     Returns:
   192       A list of dictionaries, where each dictionary contains one task's
   251       A list of dictionaries, where each dictionary contains one task's
   193       attributes.
   252       attributes. E.g.
   194     """
   253         [{'name': 'task-123',
   195     tasks = self.taskqueues.get(queue_name, [])
   254           'url': '/update',
       
   255           'method': 'GET',
       
   256           'eta': '2009/02/02 05:37:42',
       
   257           'eta_delta': '0:00:06.342511 ago',
       
   258           'body': '',
       
   259           'headers': {'X-AppEngine-QueueName': 'update-queue',
       
   260                       'X-AppEngine-TaskName': 'task-123',
       
   261                       'X-AppEngine-TaskRetryCount': '0',
       
   262                       'X-AppEngine-Development-Payload': '1',
       
   263                       'Content-Length': 0,
       
   264                       'Content-Type': 'application/octet-streamn'}, ...]
       
   265 
       
   266     Raises:
       
   267       ValueError: A task request contains an unknown HTTP method type.
       
   268     """
       
   269     tasks = self._taskqueues.get(queue_name, [])
   196     result_tasks = []
   270     result_tasks = []
   197     for task_request in tasks:
   271     for task_request in tasks:
   198       task = {}
   272       task = {}
   199       result_tasks.append(task)
   273       result_tasks.append(task)
   200       task['name'] = task_request.task_name()
   274       task['name'] = task_request.task_name()
   201       task['url'] = task_request.url()
   275       task['url'] = task_request.url()
   202       method = task_request.method()
   276       method = task_request.method()
   203       if (method == taskqueue_service_pb.TaskQueueAddRequest.GET):
   277       if method == taskqueue_service_pb.TaskQueueAddRequest.GET:
   204         task['method'] = 'GET'
   278         task['method'] = 'GET'
   205       elif (method == taskqueue_service_pb.TaskQueueAddRequest.POST):
   279       elif method == taskqueue_service_pb.TaskQueueAddRequest.POST:
   206         task['method'] = 'POST'
   280         task['method'] = 'POST'
   207       elif (method == taskqueue_service_pb.TaskQueueAddRequest.HEAD):
   281       elif method == taskqueue_service_pb.TaskQueueAddRequest.HEAD:
   208         task['method'] = 'HEAD'
   282         task['method'] = 'HEAD'
   209       elif (method == taskqueue_service_pb.TaskQueueAddRequest.PUT):
   283       elif method == taskqueue_service_pb.TaskQueueAddRequest.PUT:
   210         task['method'] = 'PUT'
   284         task['method'] = 'PUT'
   211       elif (method == taskqueue_service_pb.TaskQueueAddRequest.DELETE):
   285       elif method == taskqueue_service_pb.TaskQueueAddRequest.DELETE:
   212         task['method'] = 'DELETE'
   286         task['method'] = 'DELETE'
       
   287       else:
       
   288         raise ValueError('Unexpected method: %d' % method)
   213 
   289 
   214       task['eta'] = _FormatEta(task_request.eta_usec())
   290       task['eta'] = _FormatEta(task_request.eta_usec())
   215       task['eta_delta'] = _EtaDelta(task_request.eta_usec())
   291       task['eta_delta'] = _EtaDelta(task_request.eta_usec())
   216       task['body'] = base64.b64encode(task_request.body())
   292       task['body'] = base64.b64encode(task_request.body())
   217       headers = urlfetch._CaselessDict()
   293       headers = urlfetch._CaselessDict()
   234 
   310 
   235     Args:
   311     Args:
   236       queue_name: the name of the queue to delete the task from.
   312       queue_name: the name of the queue to delete the task from.
   237       task_name: the name of the task to delete.
   313       task_name: the name of the task to delete.
   238     """
   314     """
   239     tasks = self.taskqueues.get(queue_name, [])
   315     tasks = self._taskqueues.get(queue_name, [])
   240     for task in tasks:
   316     for task in tasks:
   241       if task.task_name() == task_name:
   317       if task.task_name() == task_name:
   242         tasks.remove(task)
   318         tasks.remove(task)
   243         return
   319         return
   244 
   320 
   246     """Removes all tasks from a queue.
   322     """Removes all tasks from a queue.
   247 
   323 
   248     Args:
   324     Args:
   249       queue_name: the name of the queue to remove tasks from.
   325       queue_name: the name of the queue to remove tasks from.
   250     """
   326     """
   251     self.taskqueues[queue_name] = []
   327     self._taskqueues[queue_name] = []