app/soc/cron/job.py
changeset 2211 f7497180d037
child 2213 c0f52da7a808
equal deleted inserted replaced
2210:1095b52ed667 2211:f7497180d037
       
     1 #!/usr/bin/python2.5
       
     2 #
       
     3 # Copyright 2009 the Melange authors.
       
     4 #
       
     5 # Licensed under the Apache License, Version 2.0 (the "License");
       
     6 # you may not use this file except in compliance with the License.
       
     7 # You may obtain a copy of the License at
       
     8 #
       
     9 #   http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 # Unless required by applicable law or agreed to in writing, software
       
    12 # distributed under the License is distributed on an "AS IS" BASIS,
       
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 # See the License for the specific language governing permissions and
       
    15 # limitations under the License.
       
    16 
       
    17 """Cron jobs.
       
    18 """
       
    19 
       
    20 __authors__ = [
       
    21     '"Sverre Rabbelier" <sverre@rabbelier.nl>',
       
    22   ]
       
    23 
       
    24 
       
    25 import logging
       
    26 
       
    27 from google.appengine.ext import db
       
    28 from google.appengine.runtime import DeadlineExceededError
       
    29 
       
    30 from soc.models.job import Job
       
    31 
       
    32 
       
    33 class Handler(object):
       
    34   """A handler that dispatches a cron job.
       
    35 
       
    36   The tasks that are mapped into tasks will be called when a worker
       
    37   has claimed the job. However, there is no guarantee as to how long
       
    38   the task will be allowed to run. If an Exception is raised the task
       
    39   is automatically rescheduled for execution.
       
    40   """
       
    41 
       
    42   def __init__(self):
       
    43     """Constructs a new Handler with all known jobs set.
       
    44     """
       
    45 
       
    46     self.tasks = {}
       
    47     self.tasks['sendAcceptanceEmail'] = logging.info
       
    48 
       
    49   def claimJob(self, job_key):
       
    50     """A transaction to claim a job.
       
    51     """
       
    52 
       
    53     job = Job.get_by_id(job_key)
       
    54 
       
    55     if job.status != 'waiting':
       
    56       raise db.Rollback()
       
    57 
       
    58     job.status = 'started'
       
    59 
       
    60     if job.put():
       
    61       return job
       
    62     else:
       
    63       return None
       
    64 
       
    65   def freeJob(self, job_key):
       
    66     """A transaction to free a job.
       
    67     """
       
    68 
       
    69     job = Job.get_by_id(job_key)
       
    70 
       
    71     job.status = 'waiting'
       
    72 
       
    73     return job.put()
       
    74 
       
    75   def failJob(self, job_key):
       
    76     """A transaction to fail a job.
       
    77     """
       
    78 
       
    79     job = Job.get_by_id(job_key)
       
    80 
       
    81     job.errors += 1
       
    82 
       
    83     if job.errors > 5:
       
    84       job.status = 'aborted'
       
    85     else:
       
    86       job.status = 'waiting'
       
    87 
       
    88     job_id = job.key().id()
       
    89     logging.warning("job %d now failed %d time(s)" % (job_id, job.errors))
       
    90 
       
    91     return job.put()
       
    92 
       
    93   def finishJob(self, job_key):
       
    94     """A transaction to finish a job.
       
    95     """
       
    96 
       
    97     job = Job.get_by_id(job_key)
       
    98     job.status = 'finished'
       
    99 
       
   100     return job.put()
       
   101 
       
   102   def abortJob(self, job_key):
       
   103     """A transaction to abort a job.
       
   104     """
       
   105 
       
   106     job = Job.get_by_id(job_key)
       
   107     job.status = 'aborted'
       
   108 
       
   109     return job.put()
       
   110 
       
   111   def handle(self, job_key):
       
   112     """Handle one job.
       
   113     """
       
   114 
       
   115     try:
       
   116       job = db.run_in_transaction(self.claimJob, job_key)
       
   117 
       
   118       if not job:
       
   119         # someone already claimed the job
       
   120         return True
       
   121 
       
   122       if job.task_name not in self.tasks:
       
   123         logging.error("Unknown job %s" % job.task_name)
       
   124         db.run_in_transaction(self.abortJob, job_key)
       
   125         return True
       
   126 
       
   127       task = self.tasks[job.task_name]
       
   128 
       
   129       # excecute the actual job
       
   130       task(job)
       
   131 
       
   132       db.run_in_transaction(self.finishJob, job_key)
       
   133       return True
       
   134     except DeadlineExceededError, exception:
       
   135       db.run_in_transaction(self.freeJob, job_key)
       
   136       return False
       
   137     except Exception, exception:
       
   138       logging.exception(exception)
       
   139       db.run_in_transaction(self.failJob, job_key)
       
   140       return True
       
   141 
       
   142 
       
   143 handler = Handler()  # module-level Handler instance, created when this module is imported