thirdparty/google_appengine/google/appengine/api/labs/taskqueue/taskqueue_stub.py
changeset 3031 7678f72140e6
parent 2864 2e0b0af889be
equal deleted inserted replaced
3030:09cae668b536 3031:7678f72140e6
    28 
    28 
    29 
    29 
    30 import base64
    30 import base64
    31 import datetime
    31 import datetime
    32 import os
    32 import os
       
    33 import random
       
    34 import time
    33 
    35 
    34 import taskqueue_service_pb
    36 import taskqueue_service_pb
    35 
    37 
    36 from google.appengine.api import apiproxy_stub
    38 from google.appengine.api import apiproxy_stub
    37 from google.appengine.api import queueinfo
    39 from google.appengine.api import queueinfo
   127     super(TaskQueueServiceStub, self).__init__(service_name)
   129     super(TaskQueueServiceStub, self).__init__(service_name)
   128     self._taskqueues = {}
   130     self._taskqueues = {}
   129     self._next_task_id = 1
   131     self._next_task_id = 1
   130     self._root_path = root_path
   132     self._root_path = root_path
   131 
   133 
       
   134     self._app_queues = {}
       
   135 
   132   def _Dynamic_Add(self, request, response):
   136   def _Dynamic_Add(self, request, response):
   133     """Local implementation of the Add RPC in TaskQueueService.
   137     """Local implementation of the Add RPC in TaskQueueService.
   134 
   138 
   135     Must adhere to the '_Dynamic_' naming convention for stubbing to work.
   139     Must adhere to the '_Dynamic_' naming convention for stubbing to work.
   136     See taskqueue_service.proto for a full description of the RPC.
   140     See taskqueue_service.proto for a full description of the RPC.
   198           'max_rate': '1/s',
   202           'max_rate': '1/s',
   199           'bucket_size': 5,
   203           'bucket_size': 5,
   200           'oldest_task': '2009/02/02 05:37:42',
   204           'oldest_task': '2009/02/02 05:37:42',
   201           'eta_delta': '0:00:06.342511 ago',
   205           'eta_delta': '0:00:06.342511 ago',
   202           'tasks_in_queue': 12}, ...]
   206           'tasks_in_queue': 12}, ...]
       
   207       The list of queues always includes the default queue.
   203     """
   208     """
   204     queues = []
   209     queues = []
   205     queue_info = self.queue_yaml_parser(self._root_path)
   210     queue_info = self.queue_yaml_parser(self._root_path)
   206     has_default = False
   211     has_default = False
   207     if queue_info and queue_info.queue:
   212     if queue_info and queue_info.queue:
   323 
   328 
   324     Args:
   329     Args:
   325       queue_name: the name of the queue to remove tasks from.
   330       queue_name: the name of the queue to remove tasks from.
   326     """
   331     """
   327     self._taskqueues[queue_name] = []
   332     self._taskqueues[queue_name] = []
       
   333 
       
   334   def _Dynamic_UpdateQueue(self, request, unused_response):
       
   335     """Local implementation of the UpdateQueue RPC in TaskQueueService.
       
   336 
       
   337     Must adhere to the '_Dynamic_' naming convention for stubbing to work.
       
   338     See taskqueue_service.proto for a full description of the RPC.
       
   339 
       
   340     Args:
       
   341       request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
       
   342       unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.
       
   343                        Not used.
       
   344     """
       
   345     queues = self._app_queues.setdefault(request.app_id(), {})
       
   346     defensive_copy = taskqueue_service_pb.TaskQueueUpdateQueueRequest()
       
   347     defensive_copy.CopyFrom(request)
       
   348     queues[request.queue_name()] = defensive_copy
       
   349 
       
   350   def _Dynamic_FetchQueues(self, request, response):
       
   351     """Local implementation of the FetchQueues RPC in TaskQueueService.
       
   352 
       
   353     Must adhere to the '_Dynamic_' naming convention for stubbing to work.
       
   354     See taskqueue_service.proto for a full description of the RPC.
       
   355 
       
   356     Args:
       
   357       request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
       
   358       response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
       
   359     """
       
   360     queues = self._app_queues.get(request.app_id(), {})
       
   361     for unused_key, queue in sorted(queues.items()[:request.max_rows()]):
       
   362       response_queue = response.add_queue()
       
   363       response_queue.set_queue_name(queue.queue_name())
       
   364       response_queue.set_bucket_refill_per_second(
       
   365           queue.bucket_refill_per_second())
       
   366       response_queue.set_bucket_capacity(queue.bucket_capacity())
       
   367       response_queue.set_user_specified_rate(queue.user_specified_rate())
       
   368 
       
   369   def _Dynamic_FetchQueueStats(self, request, response):
       
   370     """Local 'random' implementation of the TaskQueueService.FetchQueueStats.
       
   371 
       
   372     This implementation just populates the stats with random numbers.
       
   373     Must adhere to the '_Dynamic_' naming convention for stubbing to work.
       
   374     See taskqueue_service.proto for a full description of the RPC.
       
   375 
       
   376     Args:
       
   377       request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest.
       
   378       response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse.
       
   379     """
       
   380     for _ in request.queue_name_list():
       
   381       stats = response.add_queuestats()
       
   382       stats.set_num_tasks(random.randint(0, request.max_num_tasks()))
       
   383       if stats.num_tasks() == 0:
       
   384         stats.set_oldest_eta_usec(-1)
       
   385       else:
       
   386         now = datetime.datetime.utcnow()
       
   387         now_sec = time.mktime(now.timetuple())
       
   388         stats.set_oldest_eta_usec(now_sec * 1e6 + random.randint(-1e6, 1e6))
       
   389 
       
   390       if random.randint(0, 9) > 0:
       
   391         scanner_info = stats.mutable_scanner_info()
       
   392         scanner_info.set_executed_last_minute(random.randint(0, 10))
       
   393         scanner_info.set_executed_last_hour(scanner_info.executed_last_minute()
       
   394                                             + random.randint(0, 100))
       
   395         scanner_info.set_sampling_duration_seconds(random.random() * 10000.0)
       
   396     return