41 |
41 |
42 DEFAULT_RATE = '5.00/s' |
42 DEFAULT_RATE = '5.00/s' |
43 |
43 |
44 DEFAULT_BUCKET_SIZE = 5 |
44 DEFAULT_BUCKET_SIZE = 5 |
45 |
45 |
|
46 MAX_ETA_DELTA_DAYS = 30 |
|
47 |
46 |
48 |
47 def _ParseQueueYaml(unused_self, root_path): |
49 def _ParseQueueYaml(unused_self, root_path): |
48 """Load the queue.yaml file and parse it.""" |
50 """Loads the queue.yaml file and parses it. |
|
51 |
|
52 Args: |
|
53 unused_self: Allows this function to be bound to a class member. Not used. |
|
54 root_path: Directory containing queue.yaml or queue.yml, or None. |
|
55 |
|
56 Returns: |
|
57 None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object |
|
58 populated from the queue.yaml. |
|
59 """ |
49 if root_path is None: |
60 if root_path is None: |
50 return None |
61 return None |
51 for queueyaml in ('queue.yaml', 'queue.yml'): |
62 for queueyaml in ('queue.yaml', 'queue.yml'): |
52 try: |
63 try: |
53 fh = open(os.path.join(root_path, queueyaml), 'r') |
64 fh = open(os.path.join(root_path, queueyaml), 'r') |
104 root_path: Root path to the directory of the application which may contain |
123 root_path: Root path to the directory of the application which may contain |
105 a queue.yaml file. If None, then it's assumed no queue.yaml file is |
124 a queue.yaml file. If None, then it's assumed no queue.yaml file is |
106 available. |
125 available. |
107 """ |
126 """ |
108 super(TaskQueueServiceStub, self).__init__(service_name) |
127 super(TaskQueueServiceStub, self).__init__(service_name) |
109 self.taskqueues = {} |
128 self._taskqueues = {} |
110 self.next_task_id = 1 |
129 self._next_task_id = 1 |
111 self.root_path = root_path |
130 self._root_path = root_path |
112 |
131 |
113 def _Dynamic_Add(self, request, unused_response): |
132 def _Dynamic_Add(self, request, response): |
114 if not self._ValidQueue(request.queue_name()): |
133 """Local implementation of the Add RPC in TaskQueueService. |
|
134 |
|
135 Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
|
136 See taskqueue_service.proto for a full description of the RPC. |
|
137 |
|
138 Args: |
|
139 request: A taskqueue_service_pb.TaskQueueAddRequest. |
|
140 response: A taskqueue_service_pb.TaskQueueAddResponse. |
|
141 """ |
|
142 if request.eta_usec() < 0: |
|
143 raise apiproxy_errors.ApplicationError( |
|
144 taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA) |
|
145 |
|
146 eta = datetime.datetime.utcfromtimestamp(request.eta_usec() / 1e6) |
|
147 max_eta = (datetime.datetime.utcnow() + |
|
148 datetime.timedelta(days=MAX_ETA_DELTA_DAYS)) |
|
149 if eta > max_eta: |
|
150 raise apiproxy_errors.ApplicationError( |
|
151 taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA) |
|
152 |
|
153 if not self._IsValidQueue(request.queue_name()): |
115 raise apiproxy_errors.ApplicationError( |
154 raise apiproxy_errors.ApplicationError( |
116 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
155 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
117 return |
|
118 |
156 |
119 if not request.task_name(): |
157 if not request.task_name(): |
120 request.set_task_name('task%d' % self.next_task_id) |
158 request.set_task_name('task%d' % self._next_task_id) |
121 self.next_task_id += 1 |
159 response.set_chosen_task_name(request.task_name()) |
122 |
160 self._next_task_id += 1 |
123 tasks = self.taskqueues.setdefault(request.queue_name(), []) |
161 |
|
162 tasks = self._taskqueues.setdefault(request.queue_name(), []) |
|
163 for task in tasks: |
|
164 if task.task_name() == request.task_name(): |
|
165 raise apiproxy_errors.ApplicationError( |
|
166 taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS) |
124 tasks.append(request) |
167 tasks.append(request) |
125 tasks.sort(_CompareEta) |
168 tasks.sort(_CompareTasksByEta) |
126 return |
169 |
127 |
170 def _IsValidQueue(self, queue_name): |
128 def _ValidQueue(self, queue_name): |
171 """Determines whether a queue is valid, i.e. tasks can be added to it. |
|
172 |
|
173 Valid queues are the 'default' queue, plus any queues in the queue.yaml |
|
174 file. |
|
175 |
|
176 Args: |
|
177 queue_name: the name of the queue to validate. |
|
178 |
|
179 Returns: |
|
180 True iff queue is valid. |
|
181 """ |
129 if queue_name == 'default': |
182 if queue_name == 'default': |
130 return True |
183 return True |
131 queue_info = self.queue_yaml_parser(self.root_path) |
184 queue_info = self.queue_yaml_parser(self._root_path) |
132 if queue_info and queue_info.queue: |
185 if queue_info and queue_info.queue: |
133 for entry in queue_info.queue: |
186 for entry in queue_info.queue: |
134 if entry.name == queue_name: |
187 if entry.name == queue_name: |
135 return True |
188 return True |
136 return False |
189 return False |
138 def GetQueues(self): |
191 def GetQueues(self): |
139 """Gets all the application's queues. |
192 """Gets all the application's queues. |
140 |
193 |
141 Returns: |
194 Returns: |
142 A list of dictionaries, where each dictionary contains one queue's |
195 A list of dictionaries, where each dictionary contains one queue's |
143 attributes. |
196 attributes. E.g.: |
|
197 [{'name': 'some-queue', |
|
198 'max_rate': '1/s', |
|
199 'bucket_size': 5, |
|
200 'oldest_task': '2009/02/02 05:37:42', |
|
201 'eta_delta': '0:00:06.342511 ago', |
|
202 'tasks_in_queue': 12}, ...] |
144 """ |
203 """ |
145 queues = [] |
204 queues = [] |
146 queue_info = self.queue_yaml_parser(self.root_path) |
205 queue_info = self.queue_yaml_parser(self._root_path) |
147 has_default = False |
206 has_default = False |
148 if queue_info and queue_info.queue: |
207 if queue_info and queue_info.queue: |
149 for entry in queue_info.queue: |
208 for entry in queue_info.queue: |
150 if entry.name == 'default': |
209 if entry.name == 'default': |
151 has_default = True |
210 has_default = True |
156 if entry.bucket_size: |
215 if entry.bucket_size: |
157 queue['bucket_size'] = entry.bucket_size |
216 queue['bucket_size'] = entry.bucket_size |
158 else: |
217 else: |
159 queue['bucket_size'] = DEFAULT_BUCKET_SIZE |
218 queue['bucket_size'] = DEFAULT_BUCKET_SIZE |
160 |
219 |
161 tasks = self.taskqueues.setdefault(entry.name, []) |
220 tasks = self._taskqueues.setdefault(entry.name, []) |
162 if tasks: |
221 if tasks: |
163 queue['oldest_task'] = _FormatEta(tasks[0].eta_usec()) |
222 queue['oldest_task'] = _FormatEta(tasks[0].eta_usec()) |
164 queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec()) |
223 queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec()) |
165 else: |
224 else: |
166 queue['oldest_task'] = '' |
225 queue['oldest_task'] = '' |
171 queues.append(queue) |
230 queues.append(queue) |
172 queue['name'] = 'default' |
231 queue['name'] = 'default' |
173 queue['max_rate'] = DEFAULT_RATE |
232 queue['max_rate'] = DEFAULT_RATE |
174 queue['bucket_size'] = DEFAULT_BUCKET_SIZE |
233 queue['bucket_size'] = DEFAULT_BUCKET_SIZE |
175 |
234 |
176 tasks = self.taskqueues.get('default', []) |
235 tasks = self._taskqueues.get('default', []) |
177 if tasks: |
236 if tasks: |
178 queue['oldest_task'] = _FormatEta(tasks[0].eta_usec()) |
237 queue['oldest_task'] = _FormatEta(tasks[0].eta_usec()) |
179 queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec()) |
238 queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec()) |
180 else: |
239 else: |
181 queue['oldest_task'] = '' |
240 queue['oldest_task'] = '' |
188 Args: |
247 Args: |
189 queue_name: Queue's name to return tasks for. |
248 queue_name: Queue's name to return tasks for. |
190 |
249 |
191 Returns: |
250 Returns: |
192 A list of dictionaries, where each dictionary contains one task's |
251 A list of dictionaries, where each dictionary contains one task's |
193 attributes. |
252 attributes. E.g.: |
194 """ |
253 [{'name': 'task-123', |
195 tasks = self.taskqueues.get(queue_name, []) |
254 'url': '/update', |
|
255 'method': 'GET', |
|
256 'eta': '2009/02/02 05:37:42', |
|
257 'eta_delta': '0:00:06.342511 ago', |
|
258 'body': '', |
|
259 'headers': {'X-AppEngine-QueueName': 'update-queue', |
|
260 'X-AppEngine-TaskName': 'task-123', |
|
261 'X-AppEngine-TaskRetryCount': '0', |
|
262 'X-AppEngine-Development-Payload': '1', |
|
263 'Content-Length': 0, |
|
264 'Content-Type': 'application/octet-stream'}}, ...] |
|
265 |
|
266 Raises: |
|
267 ValueError: A task request contains an unknown HTTP method type. |
|
268 """ |
|
269 tasks = self._taskqueues.get(queue_name, []) |
196 result_tasks = [] |
270 result_tasks = [] |
197 for task_request in tasks: |
271 for task_request in tasks: |
198 task = {} |
272 task = {} |
199 result_tasks.append(task) |
273 result_tasks.append(task) |
200 task['name'] = task_request.task_name() |
274 task['name'] = task_request.task_name() |
201 task['url'] = task_request.url() |
275 task['url'] = task_request.url() |
202 method = task_request.method() |
276 method = task_request.method() |
203 if (method == taskqueue_service_pb.TaskQueueAddRequest.GET): |
277 if method == taskqueue_service_pb.TaskQueueAddRequest.GET: |
204 task['method'] = 'GET' |
278 task['method'] = 'GET' |
205 elif (method == taskqueue_service_pb.TaskQueueAddRequest.POST): |
279 elif method == taskqueue_service_pb.TaskQueueAddRequest.POST: |
206 task['method'] = 'POST' |
280 task['method'] = 'POST' |
207 elif (method == taskqueue_service_pb.TaskQueueAddRequest.HEAD): |
281 elif method == taskqueue_service_pb.TaskQueueAddRequest.HEAD: |
208 task['method'] = 'HEAD' |
282 task['method'] = 'HEAD' |
209 elif (method == taskqueue_service_pb.TaskQueueAddRequest.PUT): |
283 elif method == taskqueue_service_pb.TaskQueueAddRequest.PUT: |
210 task['method'] = 'PUT' |
284 task['method'] = 'PUT' |
211 elif (method == taskqueue_service_pb.TaskQueueAddRequest.DELETE): |
285 elif method == taskqueue_service_pb.TaskQueueAddRequest.DELETE: |
212 task['method'] = 'DELETE' |
286 task['method'] = 'DELETE' |
|
287 else: |
|
288 raise ValueError('Unexpected method: %d' % method) |
213 |
289 |
214 task['eta'] = _FormatEta(task_request.eta_usec()) |
290 task['eta'] = _FormatEta(task_request.eta_usec()) |
215 task['eta_delta'] = _EtaDelta(task_request.eta_usec()) |
291 task['eta_delta'] = _EtaDelta(task_request.eta_usec()) |
216 task['body'] = base64.b64encode(task_request.body()) |
292 task['body'] = base64.b64encode(task_request.body()) |
217 headers = urlfetch._CaselessDict() |
293 headers = urlfetch._CaselessDict() |