127 super(TaskQueueServiceStub, self).__init__(service_name) |
129 super(TaskQueueServiceStub, self).__init__(service_name) |
128 self._taskqueues = {} |
130 self._taskqueues = {} |
129 self._next_task_id = 1 |
131 self._next_task_id = 1 |
130 self._root_path = root_path |
132 self._root_path = root_path |
131 |
133 |
|
134 self._app_queues = {} |
|
135 |
132 def _Dynamic_Add(self, request, response): |
136 def _Dynamic_Add(self, request, response): |
133 """Local implementation of the Add RPC in TaskQueueService. |
137 """Local implementation of the Add RPC in TaskQueueService. |
134 |
138 |
135 Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
139 Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
136 See taskqueue_service.proto for a full description of the RPC. |
140 See taskqueue_service.proto for a full description of the RPC. |
198 'max_rate': '1/s', |
202 'max_rate': '1/s', |
199 'bucket_size': 5, |
203 'bucket_size': 5, |
200 'oldest_task': '2009/02/02 05:37:42', |
204 'oldest_task': '2009/02/02 05:37:42', |
201 'eta_delta': '0:00:06.342511 ago', |
205 'eta_delta': '0:00:06.342511 ago', |
202 'tasks_in_queue': 12}, ...] |
206 'tasks_in_queue': 12}, ...] |
|
207 The list of queues always includes the default queue. |
203 """ |
208 """ |
204 queues = [] |
209 queues = [] |
205 queue_info = self.queue_yaml_parser(self._root_path) |
210 queue_info = self.queue_yaml_parser(self._root_path) |
206 has_default = False |
211 has_default = False |
207 if queue_info and queue_info.queue: |
212 if queue_info and queue_info.queue: |
323 |
328 |
324 Args: |
329 Args: |
325 queue_name: the name of the queue to remove tasks from. |
330 queue_name: the name of the queue to remove tasks from. |
326 """ |
331 """ |
327 self._taskqueues[queue_name] = [] |
332 self._taskqueues[queue_name] = [] |
|
333 |
|
334 def _Dynamic_UpdateQueue(self, request, unused_response): |
|
335 """Local implementation of the UpdateQueue RPC in TaskQueueService. |
|
336 |
|
337 Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
|
338 See taskqueue_service.proto for a full description of the RPC. |
|
339 |
|
340 Args: |
|
341 request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest. |
|
342 unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse. |
|
343 Not used. |
|
344 """ |
|
345 queues = self._app_queues.setdefault(request.app_id(), {}) |
|
346 defensive_copy = taskqueue_service_pb.TaskQueueUpdateQueueRequest() |
|
347 defensive_copy.CopyFrom(request) |
|
348 queues[request.queue_name()] = defensive_copy |
|
349 |
|
350 def _Dynamic_FetchQueues(self, request, response): |
|
351 """Local implementation of the FetchQueues RPC in TaskQueueService. |
|
352 |
|
353 Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
|
354 See taskqueue_service.proto for a full description of the RPC. |
|
355 |
|
356 Args: |
|
357 request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest. |
|
358 response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse. |
|
359 """ |
|
360 queues = self._app_queues.get(request.app_id(), {}) |
|
361 for unused_key, queue in sorted(queues.items()[:request.max_rows()]): |
|
362 response_queue = response.add_queue() |
|
363 response_queue.set_queue_name(queue.queue_name()) |
|
364 response_queue.set_bucket_refill_per_second( |
|
365 queue.bucket_refill_per_second()) |
|
366 response_queue.set_bucket_capacity(queue.bucket_capacity()) |
|
367 response_queue.set_user_specified_rate(queue.user_specified_rate()) |
|
368 |
|
369 def _Dynamic_FetchQueueStats(self, request, response): |
|
370 """Local 'random' implementation of the TaskQueueService.FetchQueueStats. |
|
371 |
|
372 This implementation just populates the stats with random numbers. |
|
373 Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
|
374 See taskqueue_service.proto for a full description of the RPC. |
|
375 |
|
376 Args: |
|
377 request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest. |
|
378 response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse. |
|
379 """ |
|
380 for _ in request.queue_name_list(): |
|
381 stats = response.add_queuestats() |
|
382 stats.set_num_tasks(random.randint(0, request.max_num_tasks())) |
|
383 if stats.num_tasks() == 0: |
|
384 stats.set_oldest_eta_usec(-1) |
|
385 else: |
|
386 now = datetime.datetime.utcnow() |
|
387 now_sec = time.mktime(now.timetuple()) |
|
388 stats.set_oldest_eta_usec(now_sec * 1e6 + random.randint(-1e6, 1e6)) |
|
389 |
|
390 if random.randint(0, 9) > 0: |
|
391 scanner_info = stats.mutable_scanner_info() |
|
392 scanner_info.set_executed_last_minute(random.randint(0, 10)) |
|
393 scanner_info.set_executed_last_hour(scanner_info.executed_last_minute() |
|
394 + random.randint(0, 100)) |
|
395 scanner_info.set_sampling_duration_seconds(random.random() * 10000.0) |
|
396 return |