thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 1278 a7766286a7be
       
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Imports CSV data over HTTP.

Usage:
  %(arg0)s [flags]

    --debug                 Show debugging information. (Optional)
    --app_id=<string>       Application ID of endpoint (Optional for
                            *.appspot.com)
    --auth_domain=<domain>  The auth domain to use for logging in and for
                            UserProperties. (Default: gmail.com)
    --bandwidth_limit=<int> The maximum number of bytes per second for the
                            aggregate transfer of data to the server. Bursts
                            may exceed this, but overall transfer rate is
                            restricted to this rate. (Default 250000)
    --batch_size=<int>      Number of Entity objects to include in each post to
                            the URL endpoint. The more data per row/Entity, the
                            smaller the batch size should be. (Default 10)
    --config_file=<path>    File containing Model and Loader definitions.
                            (Required)
    --db_filename=<path>    Specific progress database to write to, or to
                            resume from. If not supplied, then a new database
                            will be started, named:
                            bulkloader-progress-TIMESTAMP.
                            The special filename "skip" may be used to simply
                            skip reading/writing any progress information.
    --filename=<path>       Path to the CSV file to import. (Required)
    --http_limit=<int>      The maximum number of HTTP requests per second to
                            send to the server. (Default: 8)
    --kind=<string>         Name of the Entity object kind to put in the
                            datastore. (Required)
    --num_threads=<int>     Number of threads to use for uploading entities
                            (Default 10)
    --rps_limit=<int>       The maximum number of records per second to
                            transfer to the server. (Default: 20)
    --url=<string>          URL endpoint to post to for importing data.
                            (Required)

The exit status will be 0 on success, non-zero on import failure.

Works with the remote_api mix-in library for google.appengine.ext.remote_api.
Please look there for documentation about how to set up the server side.

Example:

%(arg0)s --url=http://app.appspot.com/remote_api --kind=Model \
 --filename=data.csv --config_file=loader_config.py

"""
       

import csv
import getopt
import getpass
import logging
import new
import os
import Queue
import signal
import sys
import threading
import time
import traceback
import urllib2
import urlparse

from google.appengine.ext import db
from google.appengine.ext.remote_api import remote_api_stub
from google.appengine.tools import appengine_rpc

try:
  import sqlite3
except ImportError:
  pass

UPLOADER_VERSION = '1'

DEFAULT_THREAD_COUNT = 10

DEFAULT_BATCH_SIZE = 10

DEFAULT_QUEUE_SIZE = DEFAULT_THREAD_COUNT * 10

_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'

STATE_READ = 0
STATE_SENDING = 1
STATE_SENT = 2
STATE_NOT_SENT = 3

MINIMUM_THROTTLE_SLEEP_DURATION = 0.001

DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'

INITIAL_BACKOFF = 1.0

BACKOFF_FACTOR = 2.0
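
# With INITIAL_BACKOFF at 1.0 and BACKOFF_FACTOR at 2.0, consecutive failures
# on the last remaining active thread back off for 1, 2, 4, 8, ... seconds
# until an upload succeeds (see ThreadGate.DecreaseWorkers below).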
       

DEFAULT_BANDWIDTH_LIMIT = 250000

DEFAULT_RPS_LIMIT = 20

DEFAULT_REQUEST_LIMIT = 8

BANDWIDTH_UP = 'http-bandwidth-up'
BANDWIDTH_DOWN = 'http-bandwidth-down'
REQUESTS = 'http-requests'
HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
HTTPS_REQUESTS = 'https-requests'
RECORDS = 'records'


def StateMessage(state):
  """Converts a numeric state identifier to a status message."""
  return ({
      STATE_READ: 'Batch read from file.',
      STATE_SENDING: 'Sending batch to server.',
      STATE_SENT: 'Batch successfully sent.',
      STATE_NOT_SENT: 'Error while sending batch.'
  }[state])


class Error(Exception):
  """Base class for exceptions in this module."""


class FatalServerError(Error):
  """An unrecoverable error occurred while trying to post data to the server."""


class ResumeError(Error):
  """Error while trying to resume a partial upload."""


class ConfigurationError(Error):
  """Error in configuration options."""


class AuthenticationError(Error):
  """Error while trying to authenticate with the server."""


def GetCSVGeneratorFactory(csv_filename, batch_size,
                           openfile=open, create_csv_reader=csv.reader):
  """Return a factory that creates a CSV-based WorkItem generator.

  Args:
    csv_filename: File on disk containing CSV data.
    batch_size: Maximum number of CSV rows to stash into a WorkItem.
    openfile: Used for dependency injection.
    create_csv_reader: Used for dependency injection.

  Returns: A callable (accepting the progress queue and progress
    generator as input) which creates the WorkItem generator.
  """

  def CreateGenerator(progress_queue, progress_generator):
    """Initialize a CSV generator linked to a progress generator and queue.

    Args:
      progress_queue: A ProgressQueue instance to send progress information.
      progress_generator: A generator of progress information or None.

    Returns:
      A CSVGenerator instance.
    """
    return CSVGenerator(progress_queue,
                        progress_generator,
                        csv_filename,
                        batch_size,
                        openfile,
                        create_csv_reader)
  return CreateGenerator
       

class CSVGenerator(object):
  """Reads a CSV file and generates WorkItems containing batches of records."""

  def __init__(self,
               progress_queue,
               progress_generator,
               csv_filename,
               batch_size,
               openfile,
               create_csv_reader):
    """Initializes a CSV generator.

    Args:
      progress_queue: A queue used for tracking progress information.
      progress_generator: A generator of prior progress information, or None
        if there is no prior status.
      csv_filename: File on disk containing CSV data.
      batch_size: Maximum number of CSV rows to stash into a WorkItem.
      openfile: Used for dependency injection of 'open'.
      create_csv_reader: Used for dependency injection of 'csv.reader'.
    """
    self.progress_queue = progress_queue
    self.progress_generator = progress_generator
    self.csv_filename = csv_filename
    self.batch_size = batch_size
    self.openfile = openfile
    self.create_csv_reader = create_csv_reader
    self.line_number = 1
    self.column_count = None
    self.read_rows = []
    self.reader = None
    self.row_count = 0
    self.sent_count = 0

  def _AdvanceTo(self, line):
    """Advance the reader to the given line.

    Args:
      line: A line number to advance to.
    """
    while self.line_number < line:
      self.reader.next()
      self.line_number += 1
      self.row_count += 1
      self.sent_count += 1

  def _ReadRows(self, key_start, key_end):
    """Attempts to read and encode rows [key_start, key_end].

    The encoded rows are stored in self.read_rows.

    Args:
      key_start: The starting line number.
      key_end: The ending line number.

    Raises:
      StopIteration: If the reader runs out of rows.
      ResumeError: If there is an inconsistent number of columns.
    """
    assert self.line_number == key_start
    self.read_rows = []
    while self.line_number <= key_end:
      row = self.reader.next()
      self.row_count += 1
      if self.column_count is None:
        self.column_count = len(row)
      else:
        if self.column_count != len(row):
          raise ResumeError('Column count mismatch, %d: %s' %
                            (self.column_count, str(row)))
      self.read_rows.append((self.line_number, row))
      self.line_number += 1

  def _MakeItem(self, key_start, key_end, rows, progress_key=None):
    """Makes a WorkItem containing the given rows, with the given keys.

    Args:
      key_start: The start key for the WorkItem.
      key_end: The end key for the WorkItem.
      rows: A list of the rows for the WorkItem.
      progress_key: The progress key for the WorkItem.

    Returns:
      A WorkItem instance for the given batch.
    """
    assert rows

    item = WorkItem(self.progress_queue, rows,
                    key_start, key_end,
                    progress_key=progress_key)

    return item

  def Batches(self):
    """Reads the CSV data file and generates WorkItems.

    Yields:
      Instances of class WorkItem.

    Raises:
      ResumeError: If the progress database and data file indicate a different
        number of rows.
    """
    csv_file = self.openfile(self.csv_filename, 'r')
    csv_content = csv_file.read()
    if csv_content:
      has_headers = csv.Sniffer().has_header(csv_content)
    else:
      has_headers = False
    csv_file.seek(0)
    self.reader = self.create_csv_reader(csv_file, skipinitialspace=True)
    if has_headers:
      logging.info('The CSV file appears to have a header line, skipping.')
      self.reader.next()

    exhausted = False

    self.line_number = 1
    self.column_count = None

    logging.info('Starting import; maximum %d entities per post',
                 self.batch_size)

    state = None
    if self.progress_generator is not None:
      for progress_key, state, key_start, key_end in self.progress_generator:
        if key_start:
          try:
            self._AdvanceTo(key_start)
            self._ReadRows(key_start, key_end)
            yield self._MakeItem(key_start,
                                 key_end,
                                 self.read_rows,
                                 progress_key=progress_key)
          except StopIteration:
            logging.error('Mismatch between data file and progress database')
            raise ResumeError(
                'Mismatch between data file and progress database')
        elif state == DATA_CONSUMED_TO_HERE:
          try:
            self._AdvanceTo(key_end + 1)
          except StopIteration:
            state = None

    if self.progress_generator is None or state == DATA_CONSUMED_TO_HERE:
      while not exhausted:
        key_start = self.line_number
        key_end = self.line_number + self.batch_size - 1
        try:
          self._ReadRows(key_start, key_end)
        except StopIteration:
          exhausted = True
          key_end = self.line_number - 1
        if key_start <= key_end:
          yield self._MakeItem(key_start, key_end, self.read_rows)
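
# Worked example of the batching above (illustrative): with batch_size=3 and a
# 7-row CSV file, Batches() yields WorkItems covering line-number key ranges
# [1, 3], [4, 6], and [7, 7]. A resumed run first replays any unfinished
# ranges recorded in the progress database, then continues generating fresh
# batches from the last consumed line.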
       

class ReQueue(object):
  """A special thread-safe queue.

  A ReQueue allows unfinished work items to be returned with a call to
  reput().  When an item is reput, task_done() should *not* be called
  in addition; getting an item that has been reput does not increase
  the number of outstanding tasks.

  This class shares an interface with Queue.Queue and provides the
  additional reput method.
  """

  def __init__(self,
               queue_capacity,
               requeue_capacity=None,
               queue_factory=Queue.Queue,
               get_time=time.time):
    """Initialize a ReQueue instance.

    Args:
      queue_capacity: The number of items that can be put in the ReQueue.
      requeue_capacity: The number of items that can be reput in the ReQueue.
      queue_factory: Used for dependency injection.
      get_time: Used for dependency injection.
    """
    if requeue_capacity is None:
      requeue_capacity = queue_capacity

    self.get_time = get_time
    self.queue = queue_factory(queue_capacity)
    self.requeue = queue_factory(requeue_capacity)
    self.lock = threading.Lock()
    self.put_cond = threading.Condition(self.lock)
    self.get_cond = threading.Condition(self.lock)

  def _DoWithTimeout(self,
                     action,
                     exc,
                     wait_cond,
                     done_cond,
                     lock,
                     timeout=None,
                     block=True):
    """Performs the given action with a timeout.

    The action must be non-blocking, and raise an instance of exc on a
    recoverable failure.  If the action fails with an instance of exc,
    we wait on wait_cond before trying again.  Failure after the
    timeout is reached is propagated as an exception.  Success is
    signalled by notifying on done_cond and returning the result of
    the action.  If action raises any exception besides an instance of
    exc, it is immediately propagated.

    Args:
      action: A callable that performs a non-blocking action.
      exc: An exception type that is thrown by the action to indicate
        a recoverable error.
      wait_cond: A condition variable which should be waited on when
        action throws exc.
      done_cond: A condition variable to signal if the action returns.
      lock: The lock used by wait_cond and done_cond.
      timeout: A non-negative float indicating the maximum time to wait.
      block: Whether to block if the action cannot complete immediately.

    Returns:
      The result of the action, if it is successful.

    Raises:
      ValueError: If the timeout argument is negative.
    """
    if timeout is not None and timeout < 0.0:
      raise ValueError('\'timeout\' must not be a negative number')
    if not block:
      timeout = 0.0
    result = None
    success = False
    start_time = self.get_time()
    lock.acquire()
    try:
      while not success:
        try:
          result = action()
          success = True
        except Exception, e:
          if not isinstance(e, exc):
            raise e
          if timeout is not None:
            elapsed_time = self.get_time() - start_time
            timeout -= elapsed_time
            if timeout <= 0.0:
              raise e
          wait_cond.wait(timeout)
    finally:
      if success:
        done_cond.notify()
      lock.release()
    return result

  def put(self, item, block=True, timeout=None):
    """Put an item into the requeue.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum time to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def PutAction():
      self.queue.put(item, block=False)
    self._DoWithTimeout(PutAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def reput(self, item, block=True, timeout=None):
    """Re-put an item back into the requeue.

    Re-putting an item does not increase the number of outstanding
    tasks, so the reput item should be uniquely associated with an
    item that was previously removed from the requeue and for which
    task_done has not been called.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum time to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def ReputAction():
      self.requeue.put(item, block=False)
    self._DoWithTimeout(ReputAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def get(self, block=True, timeout=None):
    """Get an item from the requeue.

    Args:
      block: Whether to block if the requeue is empty.
      timeout: Maximum time to wait until the requeue is non-empty.

    Returns:
      An item from the requeue.

    Raises:
      Queue.Empty if the queue is empty and the timeout expires.
    """
    def GetAction():
      try:
        result = self.requeue.get(block=False)
        self.requeue.task_done()
      except Queue.Empty:
        result = self.queue.get(block=False)
      return result
    return self._DoWithTimeout(GetAction,
                               Queue.Empty,
                               self.put_cond,
                               self.get_cond,
                               self.lock,
                               timeout=timeout,
                               block=block)

  def join(self):
    """Blocks until all of the items in the requeue have been processed."""
    self.queue.join()

  def task_done(self):
    """Indicate that a previously enqueued item has been fully processed."""
    self.queue.task_done()

  def empty(self):
    """Returns true if the requeue is empty."""
    return self.queue.empty() and self.requeue.empty()

  def get_nowait(self):
    """Try to get an item from the queue without blocking."""
    return self.get(block=False)
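
# Illustrative ReQueue usage (a sketch; not exercised by this module):
#
#   rq = ReQueue(queue_capacity=8)
#   rq.put(work_item)
#   item = rq.get()
#   rq.reput(item)      # send failed; retry later without a second task_done
#   item = rq.get()     # reput items are drained before fresh ones
#   rq.task_done()      # exactly once per original put
#   rq.join()           # blocks until every put item is done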
       

class ThrottleHandler(urllib2.BaseHandler):
  """A urllib2 handler for http and https requests that adds to a throttle."""

  def __init__(self, throttle):
    """Initialize a ThrottleHandler.

    Args:
      throttle: A Throttle instance to call for bandwidth and http/https
        request throttling.
    """
    self.throttle = throttle

  def AddRequest(self, throttle_name, req):
    """Add to bandwidth throttle for given request.

    Args:
      throttle_name: The name of the bandwidth throttle to add to.
      req: The request whose size will be added to the throttle.
    """
    size = 0
    for key, value in req.headers.iteritems():
      size += len('%s: %s\n' % (key, value))
    for key, value in req.unredirected_hdrs.iteritems():
      size += len('%s: %s\n' % (key, value))
    (unused_scheme,
     unused_host_port, url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
    size += len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
    data = req.get_data()
    if data:
      size += len(data)
    self.throttle.AddTransfer(throttle_name, size)

  def AddResponse(self, throttle_name, res):
    """Add to bandwidth throttle for given response.

    Args:
      throttle_name: The name of the bandwidth throttle to add to.
      res: The response whose size will be added to the throttle.
    """
    content = res.read()
    def ReturnContent():
      return content
    res.read = ReturnContent
    size = len(content)
    headers = res.info()
    for key, value in headers.items():
      size += len('%s: %s\n' % (key, value))
    self.throttle.AddTransfer(throttle_name, size)

  def http_request(self, req):
    """Process an HTTP request.

    If the throttle is over quota, sleep first.  Then add request size to
    throttle before returning it to be sent.

    Args:
      req: A urllib2.Request object.

    Returns:
      The request passed in.
    """
    self.throttle.Sleep()
    self.AddRequest(BANDWIDTH_UP, req)
    return req

  def https_request(self, req):
    """Process an HTTPS request.

    If the throttle is over quota, sleep first.  Then add request size to
    throttle before returning it to be sent.

    Args:
      req: A urllib2.Request object.

    Returns:
      The request passed in.
    """
    self.throttle.Sleep()
    self.AddRequest(HTTPS_BANDWIDTH_UP, req)
    return req

  def http_response(self, unused_req, res):
    """Process an HTTP response.

    The size of the response is added to the bandwidth throttle and the request
    throttle is incremented by one.

    Args:
      unused_req: The urllib2 request for this response.
      res: A urllib2 response object.

    Returns:
      The response passed in.
    """
    self.AddResponse(BANDWIDTH_DOWN, res)
    self.throttle.AddTransfer(REQUESTS, 1)
    return res

  def https_response(self, unused_req, res):
    """Process an HTTPS response.

    The size of the response is added to the bandwidth throttle and the request
    throttle is incremented by one.

    Args:
      unused_req: The urllib2 request for this response.
      res: A urllib2 response object.

    Returns:
      The response passed in.
    """
    self.AddResponse(HTTPS_BANDWIDTH_DOWN, res)
    self.throttle.AddTransfer(HTTPS_REQUESTS, 1)
    return res
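
# A ThrottleHandler can be installed into any urllib2 opener (a sketch;
# the hostname is hypothetical):
#
#   opener = urllib2.build_opener(ThrottleHandler(throttle))
#   opener.open('http://app.example.com/remote_api')
#
# ThrottledHttpRpcServer._GetOpener below performs the equivalent wiring via
# opener.add_handler on the RPC server's own opener.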
       

class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
  """Provides a simplified RPC-style interface for HTTP requests.

  This RPC server uses a Throttle to prevent exceeding quotas.
  """

  def __init__(self, throttle, request_manager, *args, **kwargs):
    """Initialize a ThrottledHttpRpcServer.

    Also sets request_manager.rpc_server to the ThrottledHttpRpcServer instance.

    Args:
      throttle: A Throttle instance.
      request_manager: A RequestManager instance.
      args: Positional arguments to pass through to
        appengine_rpc.HttpRpcServer.__init__
      kwargs: Keyword arguments to pass through to
        appengine_rpc.HttpRpcServer.__init__
    """
    self.throttle = throttle
    appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)
    request_manager.rpc_server = self

  def _GetOpener(self):
    """Returns an OpenerDirector that supports cookies and ignores redirects.

    Returns:
      A urllib2.OpenerDirector object.
    """
    opener = appengine_rpc.HttpRpcServer._GetOpener(self)
    opener.add_handler(ThrottleHandler(self.throttle))

    return opener


def ThrottledHttpRpcServerFactory(throttle, request_manager):
  """Create a factory to produce ThrottledHttpRpcServer for a given throttle.

  Args:
    throttle: A Throttle instance to use for the ThrottledHttpRpcServer.
    request_manager: A RequestManager instance.

  Returns:
    A factory to produce a ThrottledHttpRpcServer.
  """
  def MakeRpcServer(*args, **kwargs):
    kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
    kwargs['save_cookies'] = True
    return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)
  return MakeRpcServer
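
# The factory above is handed to remote_api_stub (see RequestManager.__init__
# below), which invokes it with the standard HttpRpcServer constructor
# arguments. A hand-rolled call might look like (a sketch; the argument list
# is an assumption about appengine_rpc.HttpRpcServer's signature):
#
#   factory = ThrottledHttpRpcServerFactory(throttle, request_manager)
#   rpc_server = factory('app.appspot.com', auth_function, user_agent, source)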
       

class RequestManager(object):
  """A class which wraps a connection to the server."""

  source = 'google-bulkloader-%s' % UPLOADER_VERSION
  user_agent = source

  def __init__(self,
               app_id,
               host_port,
               url_path,
               kind,
               throttle):
    """Initialize a RequestManager object.

    Args:
      app_id: String containing the application id for requests.
      host_port: String containing the "host:port" pair; the port is optional.
      url_path: partial URL (path) to post entity data to.
      kind: Kind of the Entity records being posted.
      throttle: A Throttle instance.
    """
    self.app_id = app_id
    self.host_port = host_port
    self.host = host_port.split(':')[0]
    if url_path and url_path[0] != '/':
      url_path = '/' + url_path
    self.url_path = url_path
    self.kind = kind
    self.throttle = throttle
    self.credentials = None
    throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
        self.throttle, self)
    logging.debug('Configuring remote_api. app_id = %s, url_path = %s, '
                  'servername = %s' % (app_id, url_path, host_port))
    remote_api_stub.ConfigureRemoteDatastore(
        app_id,
        url_path,
        self.AuthFunction,
        servername=host_port,
        rpc_server_factory=throttled_rpc_server_factory)
    self.authenticated = False

  def Authenticate(self):
    """Invoke authentication if necessary."""
    self.rpc_server.Send(self.url_path, payload=None)
    self.authenticated = True

  def AuthFunction(self,
                   raw_input_fn=raw_input,
                   password_input_fn=getpass.getpass):
    """Prompts the user for a username and password.

    Caches the results the first time it is called and returns the
    same result every subsequent time.

    Args:
      raw_input_fn: Used for dependency injection.
      password_input_fn: Used for dependency injection.

    Returns:
      A pair of the email address and password.
    """
    if self.credentials is not None:
      return self.credentials
    print 'Please enter login credentials for %s (%s)' % (
        self.host, self.app_id)
    email = raw_input_fn('Email: ')
    if email:
      password_prompt = 'Password for %s: ' % email
      password = password_input_fn(password_prompt)
    else:
      password = None
    self.credentials = (email, password)
    return self.credentials

  def _GetHeaders(self):
    """Constructs a dictionary of extra headers to send with a request."""
    headers = {
        'GAE-Uploader-Version': UPLOADER_VERSION,
        'GAE-Uploader-Kind': self.kind
        }
    return headers

  def EncodeContent(self, rows):
    """Encodes row data to the wire format.

    Args:
      rows: A list of pairs of a line number and a list of column values.

    Returns:
      A list of db.Model instances.
    """
    try:
      loader = Loader.RegisteredLoaders()[self.kind]
    except KeyError:
      logging.error('No Loader defined for kind %s.' % self.kind)
      raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
    entities = []
    for line_number, values in rows:
      key = loader.GenerateKey(line_number, values)
      entity = loader.CreateEntity(values, key_name=key)
      entities.extend(entity)

    return entities

  def PostEntities(self, item):
    """Posts Entity records to a remote endpoint over HTTP.

    Args:
      item: A WorkItem containing the entities to post.
    """
    entities = item.content
    db.put(entities)
       

class WorkItem(object):
  """Holds a unit of uploading work.

  A WorkItem represents a number of entities that need to be uploaded to
  Google App Engine. These entities are encoded in the "content" field of
  the WorkItem, and will be POST'd as-is to the server.

  The entities are identified by a range of numeric keys, inclusively. In
  the case of a resumption of an upload, or a replay to correct errors,
  these keys must be able to identify the same set of entities.

  Note that keys specify a range. The entities do not have to sequentially
  fill the entire range; they must simply bound a range of valid keys.
  """

  def __init__(self, progress_queue, rows, key_start, key_end,
               progress_key=None):
    """Initialize the WorkItem instance.

    Args:
      progress_queue: A queue used for tracking progress information.
      rows: A list of pairs of a line number and a list of column values.
      key_start: The (numeric) starting key, inclusive.
      key_end: The (numeric) ending key, inclusive.
      progress_key: If this WorkItem represents state from a prior run,
        then this will be the key within the progress database.
    """
    self.state = STATE_READ

    self.progress_queue = progress_queue

    assert isinstance(key_start, (int, long))
    assert isinstance(key_end, (int, long))
    assert key_start <= key_end

    self.key_start = key_start
    self.key_end = key_end
    self.progress_key = progress_key

    self.progress_event = threading.Event()

    self.rows = rows
    self.content = None
    self.count = len(rows)

  def MarkAsRead(self):
    """Mark this WorkItem as read/consumed from the data source."""

    assert self.state == STATE_READ

    self._StateTransition(STATE_READ, blocking=True)

    assert self.progress_key is not None

  def MarkAsSending(self):
    """Mark this WorkItem as in-process on being uploaded to the server."""

    assert self.state == STATE_READ or self.state == STATE_NOT_SENT
    assert self.progress_key is not None

    self._StateTransition(STATE_SENDING, blocking=True)

  def MarkAsSent(self):
    """Mark this WorkItem as successfully sent to the server."""

    assert self.state == STATE_SENDING
    assert self.progress_key is not None

    self._StateTransition(STATE_SENT, blocking=False)

  def MarkAsError(self):
    """Mark this WorkItem as requiring manual error recovery."""

    assert self.state == STATE_SENDING
    assert self.progress_key is not None

    self._StateTransition(STATE_NOT_SENT, blocking=True)

  def _StateTransition(self, new_state, blocking=False):
    """Transition the work item to a new state, storing progress information.

    Args:
      new_state: The state to transition to.
      blocking: Whether to block for the progress thread to acknowledge the
        transition.
    """
    logging.debug('[%s-%s] %s' %
                  (self.key_start, self.key_end, StateMessage(self.state)))
    assert not self.progress_event.isSet()

    self.state = new_state

    self.progress_queue.put(self)

    if blocking:
      self.progress_event.wait()

      self.progress_event.clear()
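
# WorkItem state machine implied by the Mark* methods above (for reference):
#
#   STATE_READ -> STATE_SENDING -> STATE_SENT
#                   |        ^
#                   v        |
#                 STATE_NOT_SENT  (resent via MarkAsSending)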
       

def InterruptibleSleep(sleep_time):
  """Puts the thread to sleep, checking the thread's exit_flag twice a second.

  Args:
    sleep_time: Time to sleep.
  """
  slept = 0.0
  epsilon = .0001
  thread = threading.currentThread()
  while slept < sleep_time - epsilon:
    remaining = sleep_time - slept
    this_sleep_time = min(remaining, 0.5)
    time.sleep(this_sleep_time)
    slept += this_sleep_time
    if thread.exit_flag:
      return


class ThreadGate(object):
  """Manage the number of active worker threads.

  The ThreadGate limits the number of threads that are simultaneously
  uploading batches of records in order to implement adaptive rate
  control.  The number of simultaneous upload threads that it takes to
  start causing timeouts varies widely over the course of the day, so
  adaptive rate control allows the uploader to do many uploads while
  reducing the error rate and thus increasing the throughput.

  Initially the ThreadGate allows only one uploader thread to be active.
  For each successful upload, another thread is activated and for each
  failed upload, the number of active threads is reduced by one.
  """

  def __init__(self, enabled, sleep=InterruptibleSleep):
    self.enabled = enabled
    self.enabled_count = 1
    self.lock = threading.Lock()
    self.thread_semaphore = threading.Semaphore(self.enabled_count)
    self._threads = []
    self.backoff_time = 0
    self.sleep = sleep

  def Register(self, thread):
    """Register a thread with the thread gate."""
    self._threads.append(thread)

  def Threads(self):
    """Yields the registered threads."""
    for thread in self._threads:
      yield thread

  def EnableThread(self):
    """Enable one more worker thread."""
    self.lock.acquire()
    try:
      self.enabled_count += 1
    finally:
      self.lock.release()
    self.thread_semaphore.release()

  def EnableAllThreads(self):
    """Enable all worker threads."""
    for unused_idx in range(len(self._threads) - self.enabled_count):
      self.EnableThread()

  def StartWork(self):
    """Starts a critical section in which the number of workers is limited.

    If thread throttling is enabled then this method starts a critical
    section which allows self.enabled_count simultaneously operating
    threads. The critical section is ended by calling self.FinishWork().
    """
    if self.enabled:
      self.thread_semaphore.acquire()
      if self.backoff_time > 0.0:
        if not threading.currentThread().exit_flag:
          logging.info('Backing off: %.1f seconds',
                       self.backoff_time)
        self.sleep(self.backoff_time)

  def FinishWork(self):
    """Ends a critical section started with self.StartWork()."""
    if self.enabled:
      self.thread_semaphore.release()

  def IncreaseWorkers(self):
    """Informs the throttler that an item was successfully sent.

    If thread throttling is enabled, this method will cause an
    additional thread to run in the critical section.
    """
    if self.enabled:
      if self.backoff_time > 0.0:
        logging.info('Resetting backoff to 0.0')
        self.backoff_time = 0.0
      do_enable = False
      self.lock.acquire()
      try:
        if self.enabled and len(self._threads) > self.enabled_count:
          do_enable = True
          self.enabled_count += 1
      finally:
        self.lock.release()
      if do_enable:
        self.thread_semaphore.release()

  def DecreaseWorkers(self):
    """Informs the thread_gate that an item failed to send.

    If thread throttling is enabled, this method will cause the
    throttler to allow one fewer thread in the critical section. If
    there is only one thread remaining, failures will result in
    exponential backoff until there is a success.
    """
    if self.enabled:
      do_disable = False
      self.lock.acquire()
      try:
        if self.enabled:
          if self.enabled_count > 1:
            do_disable = True
            self.enabled_count -= 1
          else:
            if self.backoff_time == 0.0:
              self.backoff_time = INITIAL_BACKOFF
            else:
              self.backoff_time *= BACKOFF_FACTOR
      finally:
        self.lock.release()
      if do_disable:
        self.thread_semaphore.acquire()
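
# Typical worker-thread use of a ThreadGate (illustrative sketch; UploadBatch
# is a hypothetical stand-in for the real send call):
#
#   gate.StartWork()
#   try:
#     success = UploadBatch()
#   finally:
#     gate.FinishWork()
#   if success:
#     gate.IncreaseWorkers()
#   else:
#     gate.DecreaseWorkers()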
       
  1064 class Throttle(object):
       
  1065   """A base class for upload rate throttling.
       
  1066 
       
  1067   Transferring large number of records, too quickly, to an application
       
  1068   could trigger quota limits and cause the transfer process to halt.
       
  1069   In order to stay within the application's quota, we throttle the
       
  1070   data transfer to a specified limit (across all transfer threads).
       
  1071   This limit defaults to about half of the Google App Engine default
       
  1072   for an application, but can be manually adjusted faster/slower as
       
  1073   appropriate.
       
  1074 
       
  1075   This class tracks a moving average of some aspect of the transfer
       
  1076   rate (bandwidth, records per second, http connections per
       
  1077   second). It keeps two windows of counts of bytes transferred, on a
       
  1078   per-thread basis. One block is the "current" block, and the other is
       
  1079   the "prior" block. It will rotate the counts from current to prior
       
  1080   when ROTATE_PERIOD has passed.  Thus, the current block will
       
  1081   represent from 0 seconds to ROTATE_PERIOD seconds of activity
       
  1082   (determined by: time.time() - self.last_rotate).  The prior block
       
  1083   will always represent a full ROTATE_PERIOD.
       
  1084 
       
  1085   Sleeping is performed just before a transfer of another block, and is
       
  1086   based on the counts transferred *before* the next transfer. It really
       
  1087   does not matter how much will be transferred, but only that for all the
       
  1088   data transferred SO FAR that we have interspersed enough pauses to
       
  1089   ensure the aggregate transfer rate is within the specified limit.
       
  1090 
       
  1091   These counts are maintained on a per-thread basis, so we do not require
       
  1092   any interlocks around incrementing the counts. There IS an interlock on
       
  1093   the rotation of the counts because we do not want multiple threads to
       
  1094   multiply-rotate the counts.
       
  1095 
       
  1096   There are various race conditions in the computation and collection
       
  1097   of these counts. We do not require precise values, but simply to
       
  1098   keep the overall transfer within the bandwidth limits. If a given
       
  1099   pause is a little short, or a little long, then the aggregate delays
       
  1100   will be correct.
       
  1101   """
       
  1102 
       
  1103   ROTATE_PERIOD = 600
       
  1104 
       
  1105   def __init__(self,
       
  1106                get_time=time.time,
       
  1107                thread_sleep=InterruptibleSleep,
       
  1108                layout=None):
       
  1109     self.get_time = get_time
       
  1110     self.thread_sleep = thread_sleep
       
  1111 
       
  1112     self.start_time = get_time()
       
  1113     self.transferred = {}
       
  1114     self.prior_block = {}
       
  1115     self.totals = {}
       
  1116     self.throttles = {}
       
  1117 
       
  1118     self.last_rotate = {}
       
  1119     self.rotate_mutex = {}
       
  1120     if layout:
       
  1121       self.AddThrottles(layout)
       
  1122 
       
  1123   def AddThrottle(self, name, limit):
       
  1124     self.throttles[name] = limit
       
  1125     self.transferred[name] = {}
       
  1126     self.prior_block[name] = {}
       
  1127     self.totals[name] = {}
       
  1128     self.last_rotate[name] = self.get_time()
       
  1129     self.rotate_mutex[name] = threading.Lock()
       
  1130 
       
  1131   def AddThrottles(self, layout):
       
  1132     for key, value in layout.iteritems():
       
  1133       self.AddThrottle(key, value)
       
  1134 
       
  1135   def Register(self, thread):
       
  1136     """Register this thread with the throttler."""
       
  1137     thread_name = thread.getName()
       
  1138     for throttle_name in self.throttles.iterkeys():
       
  1139       self.transferred[throttle_name][thread_name] = 0
       
  1140       self.prior_block[throttle_name][thread_name] = 0
       
  1141       self.totals[throttle_name][thread_name] = 0
       
  1142 
       
  1143   def VerifyName(self, throttle_name):
       
  1144     if throttle_name not in self.throttles:
       
  1145       raise AssertionError('%s is not a registered throttle' % throttle_name)
       
  1146 
       
  1147   def AddTransfer(self, throttle_name, token_count):
       
  1148     """Add a count to the amount this thread has transferred.
       
  1149 
       
  1150     Each time a thread transfers some data, it should call this method to
       
  1151     note the amount sent. The counts may be rotated if sufficient time
       
  1152     has passed since the last rotation.
       
  1153 
       
  1154     Note: this method should only be called by the BulkLoaderThread
       
  1155     instances. The token count is allocated towards the
       
  1156     "current thread".
       
  1157 
       
  1158     Args:
       
  1159       throttle_name: The name of the throttle to add to.
       
  1160       token_count: The number to add to the throttle counter.
       
  1161     """
       
  1162     self.VerifyName(throttle_name)
       
  1163     transferred = self.transferred[throttle_name]
       
  1164     transferred[threading.currentThread().getName()] += token_count
       
  1165 
       
  1166     if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
       
  1167       self._RotateCounts(throttle_name)
       
  1168 
       
  1169   def Sleep(self, throttle_name=None):
       
  1170     """Possibly sleep in order to limit the transfer rate.
       
  1171 
       
  1172     Note that we sleep based on *prior* transfers rather than what we
       
  1173     may be about to transfer. The next transfer could put us under/over
       
  1174     and that will be rectified *after* that transfer. Net result is that
       
  1175     the average transfer rate will remain within bounds. Spiky behavior
       
  1176     or uneven rates among the threads could possibly bring the transfer
       
  1177     rate above the requested limit for short durations.
       
  1178 
       
  1179     Args:
       
  1180       throttle_name: The name of the throttle to sleep on.  If None or
       
  1181         omitted, then sleep on all throttles.
       
  1182     """
       
  1183     if throttle_name is None:
       
  1184       for throttle_name in self.throttles:
       
  1185         self.Sleep(throttle_name=throttle_name)
       
  1186       return
       
  1187 
       
  1188     self.VerifyName(throttle_name)
       
  1189 
       
  1190     thread = threading.currentThread()
       
  1191 
       
  1192     while True:
       
  1193       duration = self.get_time() - self.last_rotate[throttle_name]
       
  1194 
       
  1195       total = 0
       
  1196       for count in self.prior_block[throttle_name].values():
       
  1197         total += count
       
  1198 
       
  1199       if total:
       
  1200         duration += self.ROTATE_PERIOD
       
  1201 
       
  1202       for count in self.transferred[throttle_name].values():
       
  1203         total += count
       
  1204 
       
  1205       sleep_time = (float(total) / self.throttles[throttle_name]) - duration
       
  1206 
       
  1207       if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
       
  1208         break
       
  1209 
       
  1210       logging.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
       
  1211                     '(duration=%.1f ms, total=%d)',
       
  1212                     thread.getName(), throttle_name,
       
  1213                     sleep_time * 1000, duration * 1000, total)
       
  1214       self.thread_sleep(sleep_time)
       
  1215       if thread.exit_flag:
       
  1216         break
       
  1217       self._RotateCounts(throttle_name)
       
  1218 
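  # A worked example of the computation above (illustrative numbers only):
  # with a 250000 bytes/sec throttle, if the prior and current blocks total
  # 300000 bytes over a counted duration of 1.0 seconds, then
  # sleep_time = 300000 / 250000.0 - 1.0 = 0.2, so the thread sleeps for
  # 0.2 seconds before re-checking; once sleep_time falls below
  # MINIMUM_THROTTLE_SLEEP_DURATION, the loop exits and the transfer proceeds.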
       
  1219   def _RotateCounts(self, throttle_name):
       
  1220     """Rotate the transfer counters.
       
  1221 
       
  1222     If sufficient time has passed, then rotate the counters from active to
       
  1223     the prior-block of counts.
       
  1224 
       
  1225     This rotation is interlocked to ensure that multiple threads do not
       
  1226     over-rotate the counts.
       
  1227 
       
  1228     Args:
       
  1229       throttle_name: The name of the throttle to rotate.
       
  1230     """
       
  1231     self.VerifyName(throttle_name)
       
  1232     self.rotate_mutex[throttle_name].acquire()
       
  1233     try:
       
  1234       next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
       
  1235       if next_rotate_time >= self.get_time():
       
  1236         return
       
  1237 
       
  1238       for name, count in self.transferred[throttle_name].items():
       
  1239 
       
  1240 
       
  1241         self.prior_block[throttle_name][name] = count
       
  1242         self.transferred[throttle_name][name] = 0
       
  1243 
       
  1244         self.totals[throttle_name][name] += count
       
  1245 
       
  1246       self.last_rotate[throttle_name] = self.get_time()
       
  1247 
       
  1248     finally:
       
  1249       self.rotate_mutex[throttle_name].release()
       
  1250 
       
  1251   def TotalTransferred(self, throttle_name):
       
  1252     """Return the total transferred, and over what period.
       
  1253 
       
  1254     Args:
       
  1255       throttle_name: The name of the throttle to total.
       
  1256 
       
  1257     Returns:
       
  1258       A tuple of the total count and running time for the given throttle name.
       
  1259     """
       
  1260     total = 0
       
  1261     for count in self.totals[throttle_name].values():
       
  1262       total += count
       
  1263     for count in self.transferred[throttle_name].values():
       
  1264       total += count
       
  1265     return total, self.get_time() - self.start_time
       
  1266 
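# An illustrative sketch (not part of the upload flow) of how a thread is
# expected to interact with a Throttle instance; ThrottleLayout and the
# throttle-name constants are defined elsewhere in this module:
#
#   throttle = Throttle(layout=ThrottleLayout(250000, 8, 20))
#   throttle.Register(threading.currentThread())
#   throttle.AddTransfer(BANDWIDTH_UP, bytes_sent)  # after each transfer
#   throttle.Sleep()  # sleep on all throttles until back within limits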
       
  1267 
       
  1268 class _ThreadBase(threading.Thread):
       
  1269   """Provide some basic features for the threads used in the uploader.
       
  1270 
       
  1271   This abstract base class is used to provide some common features:
       
  1272 
       
  1273   * Flag to ask thread to exit as soon as possible.
       
  1274   * Record exit/error status for the primary thread to pick up.
       
  1275   * Capture exceptions and record them for pickup.
       
  1276   * Some basic logging of thread start/stop.
       
  1277   * All threads are "daemon" threads.
       
  1278   * Friendly names for presenting to users.
       
  1279 
       
  1280   Concrete sub-classes must implement PerformWork().
       
  1281 
       
  1282   Either self.NAME should be set or GetFriendlyName() overridden to
       
  1283   return a human-friendly name for this thread.
       
  1284 
       
  1285   The run() method starts the thread and prints start/exit messages.
       
  1286 
       
  1287   self.exit_flag is intended to signal that this thread should exit
       
  1288   when it gets the chance.  PerformWork() should check self.exit_flag
       
  1289   whenever it has the opportunity to exit gracefully.
       
  1290   """
       
  1291 
       
  1292   def __init__(self):
       
  1293     threading.Thread.__init__(self)
       
  1294 
       
  1295     self.setDaemon(True)
       
  1296 
       
  1297     self.exit_flag = False
       
  1298     self.error = None
       
  1299 
       
  1300   def run(self):
       
  1301     """Perform the work of the thread."""
       
  1302     logging.info('[%s] %s: started', self.getName(), self.__class__.__name__)
       
  1303 
       
  1304     try:
       
  1305       self.PerformWork()
       
  1306     except:
       
  1307       self.error = sys.exc_info()[1]
       
  1308       logging.exception('[%s] %s:', self.getName(), self.__class__.__name__)
       
  1309 
       
  1310     logging.info('[%s] %s: exiting', self.getName(), self.__class__.__name__)
       
  1311 
       
  1312   def PerformWork(self):
       
  1313     """Perform the thread-specific work."""
       
  1314     raise NotImplementedError()
       
  1315 
       
  1316   def CheckError(self):
       
  1317     """If an error is present, then log it."""
       
  1318     if self.error:
       
  1319       logging.error('Error in %s: %s', self.GetFriendlyName(), self.error)
       
  1320 
       
  1321   def GetFriendlyName(self):
       
  1322     """Returns a human-friendly description of the thread."""
       
  1323     if hasattr(self, 'NAME'):
       
  1324       return self.NAME
       
  1325     return 'unknown thread'
       
  1326 
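# A minimal sketch of a concrete _ThreadBase subclass (illustrative only):
#
#   class EchoThread(_ThreadBase):
#     NAME = 'echo thread'
#
#     def PerformWork(self):
#       while not self.exit_flag:
#         pass  # do one unit of work, checking exit_flag between units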
       
  1327 
       
  1328 class BulkLoaderThread(_ThreadBase):
       
  1329   """A thread which transmits entities to the server application.
       
  1330 
       
  1331   This thread will read WorkItem instances from the work_queue and upload
       
  1332   the entities to the server application. Progress information will be
       
  1333   pushed into the progress_queue as the work is being performed.
       
  1334 
       
  1335   If a BulkLoaderThread encounters a transient error, the entities will be
       
  1336   resent; if a fatal error is encountered, the BulkLoaderThread exits.
       
  1337   """
       
  1338 
       
  1339   def __init__(self,
       
  1340                work_queue,
       
  1341                throttle,
       
  1342                thread_gate,
       
  1343                request_manager):
       
  1344     """Initialize the BulkLoaderThread instance.
       
  1345 
       
  1346     Args:
       
  1347       work_queue: A queue containing WorkItems for processing.
       
  1348       throttle: A Throttle to control upload bandwidth.
       
  1349       thread_gate: A ThreadGate to control number of simultaneous uploads.
       
  1350       request_manager: A RequestManager instance.
       
  1351     """
       
  1352     _ThreadBase.__init__(self)
       
  1353 
       
  1354     self.work_queue = work_queue
       
  1355     self.throttle = throttle
       
  1356     self.thread_gate = thread_gate
       
  1357 
       
  1358     self.request_manager = request_manager
       
  1359 
       
  1360   def PerformWork(self):
       
  1361     """Perform the work of a BulkLoaderThread."""
       
  1362     while not self.exit_flag:
       
  1363       success = False
       
  1364       self.thread_gate.StartWork()
       
  1365       try:
       
  1366         try:
       
  1367           item = self.work_queue.get(block=True, timeout=1.0)
       
  1368         except Queue.Empty:
       
  1369           continue
       
  1370         if item == _THREAD_SHOULD_EXIT:
       
  1371           break
       
  1372 
       
  1373         logging.debug('[%s] Got work item [%d-%d]',
       
  1374                       self.getName(), item.key_start, item.key_end)
       
  1375 
       
  1376         try:
       
  1377 
       
  1378           item.MarkAsSending()
       
  1379           try:
       
  1380             if item.content is None:
       
  1381               item.content = self.request_manager.EncodeContent(item.rows)
       
  1382             try:
       
  1383               self.request_manager.PostEntities(item)
       
  1384               success = True
       
  1385               logging.debug(
       
  1386                   '[%d-%d] Sent %d entities',
       
  1387                   item.key_start, item.key_end, item.count)
       
  1388               self.throttle.AddTransfer(RECORDS, item.count)
       
  1389             except (db.InternalError, db.NotSavedError, db.Timeout), e:
       
  1390               logging.debug('Caught non-fatal error: %s', e)
       
  1391             except urllib2.HTTPError, e:
       
  1392               if e.code == 403 or (e.code >= 500 and e.code < 600):
       
  1393                 logging.debug('Caught HTTP error %d', e.code)
       
  1394                 logging.debug('%s', e.read())
       
  1395               else:
       
  1396                 raise e
       
  1397 
       
  1398           except:
       
  1399             self.error = sys.exc_info()[1]
       
  1400             logging.exception('[%s] %s: caught exception %s', self.getName(),
       
  1401                               self.__class__.__name__, str(sys.exc_info()))
       
  1402             raise
       
  1403 
       
  1404         finally:
       
  1405           if success:
       
  1406             item.MarkAsSent()
       
  1407             self.thread_gate.IncreaseWorkers()
       
  1408             self.work_queue.task_done()
       
  1409           else:
       
  1410             item.MarkAsError()
       
  1411             self.thread_gate.DecreaseWorkers()
       
  1412             try:
       
  1413               self.work_queue.reput(item, block=False)
       
  1414             except Queue.Full:
       
  1415               logging.error('[%s] Failed to reput work item.', self.getName())
       
  1416               raise Error('Failed to reput work item')
       
  1417           logging.info('[%d-%d] %s',
       
  1418                        item.key_start, item.key_end, StateMessage(item.state))
       
  1419 
       
  1420       finally:
       
  1421         self.thread_gate.FinishWork()
       
  1422 
       
  1423 
       
  1424   def GetFriendlyName(self):
       
  1425     """Returns a human-friendly name for this thread."""
       
  1426     return 'worker [%s]' % self.getName()
       
  1427 
       
  1428 
       
  1429 class DataSourceThread(_ThreadBase):
       
  1430   """A thread which reads WorkItems and pushes them into the work_queue.
       
  1431 
       
  1432   This thread will read/consume WorkItems from a generator (produced by
       
  1433   the generator factory). These WorkItems will then be pushed into the
       
  1434   work_queue. Note that reading will block if/when the work_queue becomes
       
  1435   full. Information on content consumed from the generator will be pushed
       
  1436   into the progress_queue.
       
  1437   """
       
  1438 
       
  1439   NAME = 'data source thread'
       
  1440 
       
  1441   def __init__(self,
       
  1442                work_queue,
       
  1443                progress_queue,
       
  1444                workitem_generator_factory,
       
  1445                progress_generator_factory):
       
  1446     """Initialize the DataSourceThread instance.
       
  1447 
       
  1448     Args:
       
  1449       work_queue: A queue containing WorkItems for processing.
       
  1450       progress_queue: A queue used for tracking progress information.
       
  1451       workitem_generator_factory: A factory that creates a WorkItem generator.
       
  1452       progress_generator_factory: A factory that creates a generator which
       
  1453         produces prior progress status, or None if there is no prior status
       
  1454         to use.
       
  1455     """
       
  1456     _ThreadBase.__init__(self)
       
  1457 
       
  1458     self.work_queue = work_queue
       
  1459     self.progress_queue = progress_queue
       
  1460     self.workitem_generator_factory = workitem_generator_factory
       
  1461     self.progress_generator_factory = progress_generator_factory
       
  1462     self.entity_count = 0
       
  1463 
       
  1464   def PerformWork(self):
       
  1465     """Performs the work of a DataSourceThread."""
       
  1466     if self.progress_generator_factory:
       
  1467       progress_gen = self.progress_generator_factory()
       
  1468     else:
       
  1469       progress_gen = None
       
  1470 
       
  1471     content_gen = self.workitem_generator_factory(self.progress_queue,
       
  1472                                                   progress_gen)
       
  1473 
       
  1474     self.sent_count = 0
       
  1475     self.read_count = 0
       
  1476     self.read_all = False
       
  1477 
       
  1478     for item in content_gen.Batches():
       
  1479       item.MarkAsRead()
       
  1480 
       
  1481       while not self.exit_flag:
       
  1482         try:
       
  1483           self.work_queue.put(item, block=True, timeout=1.0)
       
  1484           self.entity_count += item.count
       
  1485           break
       
  1486         except Queue.Full:
       
  1487           pass
       
  1488 
       
  1489       if self.exit_flag:
       
  1490         break
       
  1491 
       
  1492     if not self.exit_flag:
       
  1493       self.read_all = True
       
  1494     self.read_count = content_gen.row_count
       
  1495     self.sent_count = content_gen.sent_count
       
  1496 
       
  1497 
       
  1498 
       
  1499 def _RunningInThread(thread):
       
  1500   """Return True if we are running within the specified thread."""
       
  1501   return threading.currentThread().getName() == thread.getName()
       
  1502 
       
  1503 
       
  1504 class ProgressDatabase(object):
       
  1505   """Persistently record all progress information during an upload.
       
  1506 
       
  1507   This class wraps a very simple SQLite database which records each of
       
  1508   the relevant details from the WorkItem instances. If the uploader is
       
  1509   resumed, then data is replayed out of the database.
       
  1510   """
       
  1511 
       
  1512   def __init__(self, db_filename, commit_periodicity=100):
       
  1513     """Initialize the ProgressDatabase instance.
       
  1514 
       
  1515     Args:
       
  1516       db_filename: The name of the SQLite database to use.
       
  1517       commit_periodicity: How many operations to perform between commits.
       
  1518     """
       
  1519     self.db_filename = db_filename
       
  1520 
       
  1521     logging.info('Using progress database: %s', db_filename)
       
  1522     self.primary_conn = sqlite3.connect(db_filename, isolation_level=None)
       
  1523     self.primary_thread = threading.currentThread()
       
  1524 
       
  1525     self.progress_conn = None
       
  1526     self.progress_thread = None
       
  1527 
       
  1528     self.operation_count = 0
       
  1529     self.commit_periodicity = commit_periodicity
       
  1530 
       
  1531     self.prior_key_end = None
       
  1532 
       
  1533     try:
       
  1534       self.primary_conn.execute(
       
  1535           """create table progress (
       
  1536           id integer primary key autoincrement,
       
  1537           state integer not null,
       
  1538           key_start integer not null,
       
  1539           key_end integer not null
       
  1540           )
       
  1541           """)
       
  1542     except sqlite3.OperationalError, e:
       
  1543       if 'already exists' not in e.message:
       
  1544         raise
       
  1545 
       
  1546     try:
       
  1547       self.primary_conn.execute('create index i_state on progress (state)')
       
  1548     except sqlite3.OperationalError, e:
       
  1549       if 'already exists' not in e.message:
       
  1550         raise
       
  1551 
       
  1552   def ThreadComplete(self):
       
  1553     """Finalize any operations the progress thread has performed.
       
  1554 
       
  1555     The database aggregates lots of operations into a single commit, and
       
  1556     this method is used to commit any pending operations as the thread
       
  1557     is about to shut down.
       
  1558     """
       
  1559     if self.progress_conn:
       
  1560       self._MaybeCommit(force_commit=True)
       
  1561 
       
  1562   def _MaybeCommit(self, force_commit=False):
       
  1563     """Periodically commit changes into the SQLite database.
       
  1564 
       
  1565     Committing every operation is quite expensive, and slows down the
       
  1566     operation of the script. Thus, we only commit after every N operations,
       
  1567     as determined by the self.commit_periodicity value. Optionally, the
       
  1568     caller can force a commit.
       
  1569 
       
  1570     Args:
       
  1571       force_commit: Pass True in order for a commit to occur regardless
       
  1572         of the current operation count.
       
  1573     """
       
  1574     self.operation_count += 1
       
  1575     if force_commit or (self.operation_count % self.commit_periodicity) == 0:
       
  1576       self.progress_conn.commit()
       
  1577 
       
  1578   def _OpenProgressConnection(self):
       
  1579     """Possibly open a database connection for the progress tracker thread.
       
  1580 
       
  1581     If the connection is not open (for the calling thread, which is assumed
       
  1582     to be the progress tracker thread), then open it. We also open a couple of
       
  1583     cursors for later use (and reuse).
       
  1584     """
       
  1585     if self.progress_conn:
       
  1586       return
       
  1587 
       
  1588     assert not _RunningInThread(self.primary_thread)
       
  1589 
       
  1590     self.progress_thread = threading.currentThread()
       
  1591 
       
  1592     self.progress_conn = sqlite3.connect(self.db_filename)
       
  1593 
       
  1594     self.insert_cursor = self.progress_conn.cursor()
       
  1595     self.update_cursor = self.progress_conn.cursor()
       
  1596 
       
  1597   def HasUnfinishedWork(self):
       
  1598     """Returns True if the database has progress information.
       
  1599 
       
  1600     Note there are two basic cases for progress information:
       
  1601     1) All saved records indicate a successful upload. In this case, we
       
  1602        need to skip everything transmitted so far and then send the rest.
       
  1603     2) Some records for incomplete transfer are present. These need to be
       
  1604        sent again, and then we resume sending after all the successful
       
  1605        data.
       
  1606 
       
  1607     Returns:
       
  1608       True if the database has progress information, False otherwise.
       
  1609 
       
  1610     Raises:
       
  1611       ResumeError: If there is an error reading the progress database.
       
  1612     """
       
  1613     assert _RunningInThread(self.primary_thread)
       
  1614 
       
  1615     cursor = self.primary_conn.cursor()
       
  1616     cursor.execute('select count(*) from progress')
       
  1617     row = cursor.fetchone()
       
  1618     if row is None:
       
  1619       raise ResumeError('Error reading progress information.')
       
  1620 
       
  1621     return row[0] != 0
       
  1622 
       
  1623   def StoreKeys(self, key_start, key_end):
       
  1624     """Record a new progress record, returning a key for later updates.
       
  1625 
       
  1626     The specified progress information will be persisted into the database.
       
  1627     A unique key will be returned that identifies this progress state. The
       
  1628     key is later used to (quickly) update this record.
       
  1629 
       
  1630     For the progress resumption to proceed properly, calls to StoreKeys
       
  1631     MUST specify monotonically increasing key ranges. This will result in
       
  1632     a database whereby the ID, KEY_START, and KEY_END rows are all
       
  1633     increasing (rather than having ranges out of order).
       
  1634 
       
  1635     NOTE: the above precondition is NOT tested by this method (since it
       
  1636     would imply an additional table read or two on each invocation).
       
  1637 
       
  1638     Args:
       
  1639       key_start: The starting key of the WorkItem (inclusive).

  1640       key_end: The end key of the WorkItem (inclusive).
       
  1641 
       
  1642     Returns:
       
  1643       A unique key (the row id) to later be used to update this state.
       
  1644     """
       
  1645     self._OpenProgressConnection()
       
  1646 
       
  1647     assert _RunningInThread(self.progress_thread)
       
  1648     assert isinstance(key_start, int)
       
  1649     assert isinstance(key_end, int)
       
  1650     assert key_start <= key_end
       
  1651 
       
  1652     if self.prior_key_end is not None:
       
  1653       assert key_start > self.prior_key_end
       
  1654     self.prior_key_end = key_end
       
  1655 
       
  1656     self.insert_cursor.execute(
       
  1657         'insert into progress (state, key_start, key_end) values (?, ?, ?)',
       
  1658         (STATE_READ, key_start, key_end))
       
  1659 
       
  1660     progress_key = self.insert_cursor.lastrowid
       
  1661 
       
  1662     self._MaybeCommit()
       
  1663 
       
  1664     return progress_key
       
  1665 
       
  1666   def UpdateState(self, key, new_state):
       
  1667     """Update a specified progress record with new information.
       
  1668 
       
  1669     Args:
       
  1670       key: The key for this progress record, returned from StoreKeys.
       
  1671       new_state: The new state to associate with this progress record.
       
  1672     """
       
  1673     self._OpenProgressConnection()
       
  1674 
       
  1675     assert _RunningInThread(self.progress_thread)
       
  1676     assert isinstance(new_state, int)
       
  1677 
       
  1678     self.update_cursor.execute('update progress set state=? where id=?',
       
  1679                                (new_state, key))
       
  1680 
       
  1681     self._MaybeCommit()
       
  1682 
       
  1683   def GetProgressStatusGenerator(self):
       
  1684     """Get a generator which returns progress information.
       
  1685 
       
  1686     The returned generator will yield a series of 4-tuples that specify
       
  1687     progress information about a prior run of the uploader. The 4-tuples
       
  1688     have the following values:
       
  1689 
       
  1690       progress_key: The unique key to later update this record with new
       
  1691                     progress information.
       
  1692       state: The last state saved for this progress record.
       
  1693       key_start: The starting key of the items for uploading (inclusive).
       
  1694       key_end: The ending key of the items for uploading (inclusive).
       
  1695 
       
  1696     After all incompletely-transferred records are provided, one
       
  1697     more 4-tuple will be generated:
       
  1698 
       
  1699       None
       
  1700       DATA_CONSUMED_TO_HERE: A unique string value indicating this record
       
  1701                              is being provided.
       
  1702       None
       
  1703       key_end: An integer value specifying the last data source key that
       
  1704                was handled by the previous run of the uploader.
       
  1705 
       
  1706     The caller should begin uploading records which occur after key_end.
       
  1707 
       
  1708     Yields:
       
  1709       Progress information as tuples (progress_key, state, key_start, key_end).
       
  1710     """
       
  1711     conn = sqlite3.connect(self.db_filename, isolation_level=None)
       
  1712     cursor = conn.cursor()
       
  1713 
       
  1714     cursor.execute('select max(id) from progress')
       
  1715     batch_id = cursor.fetchone()[0]
       
  1716 
       
  1717     cursor.execute('select key_end from progress where id = ?', (batch_id,))
       
  1718     key_end = cursor.fetchone()[0]
       
  1719 
       
  1720     self.prior_key_end = key_end
       
  1721 
       
  1722     cursor.execute(
       
  1723         'select id, state, key_start, key_end from progress'
       
  1724         '  where state != ?'
       
  1725         '  order by id',
       
  1726         (STATE_SENT,))
       
  1727 
       
  1728     rows = cursor.fetchall()
       
  1729 
       
  1730     for row in rows:
       
  1731       if row is None:
       
  1732         break
       
  1733 
       
  1734       yield row
       
  1735 
       
  1736     yield None, DATA_CONSUMED_TO_HERE, None, key_end
       
  1737 
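# An illustrative sketch of the progress protocol (hypothetical key range):
# while uploading, the progress tracker records and then updates each range,
#
#   progress_key = progress_db.StoreKeys(1, 100)       # stored as STATE_READ
#   progress_db.UpdateState(progress_key, STATE_SENT)  # batch acknowledged
#
# and on resumption GetProgressStatusGenerator() replays every range that
# never reached STATE_SENT, followed by the
# (None, DATA_CONSUMED_TO_HERE, None, key_end) sentinel described above.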
       
  1738 
       
  1739 class StubProgressDatabase(object):
       
  1740   """A stub implementation of ProgressDatabase which does nothing."""
       
  1741 
       
  1742   def HasUnfinishedWork(self):
       
  1743     """Whether the stub database has progress information (it doesn't)."""
       
  1744     return False
       
  1745 
       
  1746   def StoreKeys(self, unused_key_start, unused_key_end):
       
  1747     """Pretend to store a key in the stub database."""
       
  1748     return 'fake-key'
       
  1749 
       
  1750   def UpdateState(self, unused_key, unused_new_state):
       
  1751     """Pretend to update the state of a progress item."""
       
  1752     pass
       
  1753 
       
  1754   def ThreadComplete(self):
       
  1755     """Finalize operations on the stub database (i.e. do nothing)."""
       
  1756     pass
       
  1757 
       
  1758 
       
  1759 class ProgressTrackerThread(_ThreadBase):
       
  1760   """A thread which records progress information for the upload process.
       
  1761 
       
  1762   The progress information is stored into the provided progress database.
       
  1763   This class is not responsible for replaying a prior run's progress
       
  1764   information out of the database. Separate mechanisms must be used to
       
  1765   resume a prior upload attempt.
       
  1766   """
       
  1767 
       
  1768   NAME = 'progress tracking thread'
       
  1769 
       
  1770   def __init__(self, progress_queue, progress_db):
       
  1771     """Initialize the ProgressTrackerThread instance.
       
  1772 
       
  1773     Args:
       
  1774       progress_queue: A Queue used for tracking progress information.
       
  1775       progress_db: The database for tracking progress information; should
       
  1776         be an instance of ProgressDatabase.
       
  1777     """
       
  1778     _ThreadBase.__init__(self)
       
  1779 
       
  1780     self.progress_queue = progress_queue
       
  1781     self.db = progress_db
       
  1782     self.entities_sent = 0
       
  1783 
       
  1784   def PerformWork(self):
       
  1785     """Performs the work of a ProgressTrackerThread."""
       
  1786     while not self.exit_flag:
       
  1787       try:
       
  1788         item = self.progress_queue.get(block=True, timeout=1.0)
       
  1789       except Queue.Empty:
       
  1790         continue
       
  1791       if item == _THREAD_SHOULD_EXIT:
       
  1792         break
       
  1793 
       
  1794       if item.state == STATE_READ and item.progress_key is None:
       
  1795         item.progress_key = self.db.StoreKeys(item.key_start, item.key_end)
       
  1796       else:
       
  1797         assert item.progress_key is not None
       
  1798 
       
  1799         self.db.UpdateState(item.progress_key, item.state)
       
  1800         if item.state == STATE_SENT:
       
  1801           self.entities_sent += item.count
       
  1802 
       
  1803       item.progress_event.set()
       
  1804 
       
  1805       self.progress_queue.task_done()
       
  1806 
       
  1807     self.db.ThreadComplete()
       
  1808 
       
  1809 
       
  1810 
       
  1811 def Validate(value, typ):
       
  1812   """Checks that value is non-empty and of the right type.
       
  1813 
       
  1814   Args:
       
  1815     value: any value
       
  1816     typ: a type or tuple of types
       
  1817 
       
  1818   Raises:
       
  1819     ValueError if value is None or empty.
       
  1820     TypeError if it's not the given type.
       
  1821 
       
  1822   """
       
  1823   if not value:
       
  1824     raise ValueError('Value should not be empty; received %s.' % value)
       
  1825   elif not isinstance(value, typ):
       
  1826     raise TypeError('Expected a %s, but received %s (a %s).' %
       
  1827                     (typ, value, value.__class__))
       
  1828 
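# For example (illustrative calls):
#
#   Validate('Album', basestring)   # passes
#   Validate('', basestring)        # raises ValueError (empty value)
#   Validate(['a'], basestring)     # raises TypeError (wrong type)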
       
  1829 
       
  1830 class Loader(object):
       
  1831   """A base class for creating datastore entities from input data.
       
  1832 
       
  1833   To add a handler for bulk loading a new entity kind into your datastore,
       
  1834   write a subclass of this class that calls Loader.__init__ from your
       
  1835   class's __init__.
       
  1836 
       
  1837   If you need to run extra code to convert entities from the input
       
  1838   data, create new properties, or otherwise modify the entities before
       
  1839   they're inserted, override HandleEntity.
       
  1840 
       
  1841   See the CreateEntity method for the creation of entities from the
       
  1842   (parsed) input data.
       
  1843   """
       
  1844 
       
  1845   __loaders = {}
       
  1846   __kind = None
       
  1847   __properties = None
       
  1848 
       
  1849   def __init__(self, kind, properties):
       
  1850     """Constructor.
       
  1851 
       
  1852     Populates this Loader's kind and properties map. Also registers it with
       
  1853     the bulk loader, so that all you need to do is instantiate your Loader,
       
  1854     and the bulkload handler will automatically use it.
       
  1855 
       
  1856     Args:
       
  1857       kind: a string containing the entity kind that this loader handles
       
  1858 
       
  1859       properties: list of (name, converter) tuples.
       
  1860 
       
  1861         This is used to automatically convert the CSV columns into
       
  1862         properties.  The converter should be a function that takes one
       
  1863         argument, a string value from the CSV file, and returns a
       
  1864         correctly typed property value that should be inserted. The
       
  1865         tuples in this list should match the columns in your CSV file,
       
  1866         in order.
       
  1867 
       
  1868         For example:
       
  1869           [('name', str),
       
  1870            ('id_number', int),
       
  1871            ('email', datastore_types.Email),
       
  1872            ('user', users.User),
       
  1873            ('birthdate', lambda x: datetime.datetime.fromtimestamp(float(x))),
       
  1874            ('description', datastore_types.Text),
       
  1875            ]
       
  1876     """
       
  1877     Validate(kind, basestring)
       
  1878     self.__kind = kind
       
  1879 
       
  1880     db.class_for_kind(kind)
       
  1881 
       
  1882     Validate(properties, list)
       
  1883     for name, fn in properties:
       
  1884       Validate(name, basestring)
       
  1885       assert callable(fn), (
       
  1886         'Conversion function %s for property %s is not callable.' % (fn, name))
       
  1887 
       
  1888     self.__properties = properties
       
  1889 
       
  1890   @staticmethod
       
  1891   def RegisterLoader(loader):
       
  1892     """Register the given Loader instance under its entity kind."""
       
  1893     Loader.__loaders[loader.__kind] = loader
       
  1894 
       
  1895   def kind(self):
       
  1896     """Return the entity kind that this Loader handles.
       
  1897     """
       
  1898     return self.__kind
       
  1899 
       
  1900   def CreateEntity(self, values, key_name=None):
       
  1901     """Creates an entity from a list of property values.
       
  1902 
       
  1903     Args:
       
  1904       values: list/tuple of str
       
  1905       key_name: if provided, the name for the (single) resulting entity
       
  1906 
       
  1907     Returns:
       
  1908       list of db.Model
       
  1909 
       
  1910       The returned entities are populated with the property values from the
       
  1911       argument, converted to native types using the properties map given in
       
  1912       the constructor, and passed through HandleEntity. They're ready to be
       
  1913       inserted.
       
  1914 
       
  1915     Raises:
       
  1916       AssertionError if the number of values doesn't match the number
       
  1917         of properties in the properties map.
       
  1918       ValueError if any element of values is None or empty.
       
  1919       TypeError if values is not a list or tuple.
       
  1920     """
       
  1921     Validate(values, (list, tuple))
       
  1922     assert len(values) == len(self.__properties), (
       
  1923       'Expected %d CSV columns, found %d.' %
       
  1924       (len(self.__properties), len(values)))
       
  1925 
       
  1926     model_class = db.class_for_kind(self.__kind)
       
  1927 
       
  1928     properties = {'key_name': key_name}
       
  1929     for (name, converter), val in zip(self.__properties, values):
       
  1930       if converter is bool and val.lower() in ('0', 'false', 'no'):
       
  1931         val = False
       
  1932       properties[name] = converter(val)
       
  1933 
       
  1934     entity = model_class(**properties)
       
  1935     entities = self.HandleEntity(entity)
       
  1936 
       
  1937     if entities:
       
  1938       if not isinstance(entities, (list, tuple)):
       
  1939         entities = [entities]
       
  1940 
       
  1941       for entity in entities:
       
  1942         if not isinstance(entity, db.Model):
       
  1943           raise TypeError('Expected a db.Model, received %s (a %s).' %
       
  1944                           (entity, entity.__class__))
       
  1945 
       
  1946     return entities
       
  1947 
       
  1948   def GenerateKey(self, i, values):
       
  1949     """Generates a key_name to be used in creating the underlying object.
       
  1950 
       
  1951     The default implementation returns None.
       
  1952 
       
  1953     This method can be overridden to control the key generation for
       
  1954     uploaded entities. The value returned should be None (to use a
       
  1955     server generated numeric key), or a string which neither starts
       
  1956     with a digit nor has the form __*__. (See
       
  1957     http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html)
       
  1958 
       
  1959     If you generate your own string keys, keep in mind:
       
  1960 
       
  1961     1. The key name for each entity must be unique.
       
  1962     2. If an entity of the same kind and key already exists in the
       
  1963        datastore, it will be overwritten.
       
  1964 
       
  1965     Args:
       
  1966       i: Number corresponding to this object (assuming it is created in a

  1967         loop, this is your current count).
       
  1968       values: list/tuple of str.
       
  1969 
       
  1970     Returns:
       
  1971       A string to be used as the key_name for an entity.
       
  1972     """
       
  1973     return None
       
  1974 
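  # A sketch of a GenerateKey override (illustrative; it assumes the first
  # CSV column holds a unique name that neither starts with a digit nor has
  # the form __*__):
  #
  #   def GenerateKey(self, i, values):
  #     return values[0]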
       
  1975   def HandleEntity(self, entity):
       
  1976     """Subclasses can override this to add custom entity conversion code.
       
  1977 
       
  1978     This is called for each entity, after its properties are populated from
       
  1979     CSV but before it is stored. Subclasses can override this to add custom
       
  1980     entity handling code.
       
  1981 
       
  1982     The entity to be inserted should be returned. If multiple entities should
       
  1983     be inserted, return a list of entities. If no entities should be inserted,
       
  1984     return None or [].
       
  1985 
       
  1986     Args:
       
  1987       entity: db.Model
       
  1988 
       
  1989     Returns:
       
  1990       db.Model or list of db.Model
       
  1991     """
       
  1992     return entity
       
  1993 
       
  1994 
       
  1995   @staticmethod
       
  1996   def RegisteredLoaders():
       
  1997     """Returns a dict, keyed by kind, of the Loader instances created.

  1998     """
       
  1999     return dict(Loader.__loaders)
       
  2000 
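# An illustrative sketch (hypothetical 'Album' kind and values) of turning
# one CSV row into entities ready for upload:
#
#   loader = Loader.RegisteredLoaders()['Album']
#   row = ['Blue', 'Joni Mitchell', '1971']
#   entities = loader.CreateEntity(row, key_name=loader.GenerateKey(0, row))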
       
  2001 
       
  2002 class QueueJoinThread(threading.Thread):
       
  2003   """A thread that joins a queue and exits.
       
  2004 
       
  2005   Queue joins do not have a timeout.  To simulate a queue join with
       
  2006   timeout, run this thread and join it with a timeout.
       
  2007   """
       
  2008 
       
  2009   def __init__(self, queue):
       
  2010     """Initialize a QueueJoinThread.
       
  2011 
       
  2012     Args:
       
  2013       queue: The queue for this thread to join.
       
  2014     """
       
  2015     threading.Thread.__init__(self)
       
  2016     assert isinstance(queue, (Queue.Queue, ReQueue))
       
  2017     self.queue = queue
       
  2018 
       
  2019   def run(self):
       
  2020     """Perform the queue join in this thread."""
       
  2021     self.queue.join()
       
  2022 
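# For example (illustrative only), simulating a queue join with a timeout:
#
#   joiner = QueueJoinThread(work_queue)
#   joiner.start()
#   joiner.join(timeout=0.5)
#   if joiner.isAlive():
#     pass  # the queue still has outstanding items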
       
  2023 
       
  2024 def InterruptibleQueueJoin(queue,
       
  2025                            thread_local,
       
  2026                            thread_gate,
       
  2027                            queue_join_thread_factory=QueueJoinThread):
       
  2028   """Repeatedly joins the given ReQueue or Queue.Queue with short timeout.
       
  2029 
       
  2030   Between each timeout on the join, worker threads are checked.
       
  2031 
       
  2032   Args:
       
  2033     queue: A Queue.Queue or ReQueue instance.
       
  2034     thread_local: A threading.local instance which indicates interrupts.
       
  2035     thread_gate: A ThreadGate instance.
       
  2036     queue_join_thread_factory: Used for dependency injection.
       
  2037 
       
  2038   Returns:
       
  2039     True unless the queue join is interrupted by SIGINT or worker death.
       
  2040   """
       
  2041   thread = queue_join_thread_factory(queue)
       
  2042   thread.start()
       
  2043   while True:
       
  2044     thread.join(timeout=.5)
       
  2045     if not thread.isAlive():
       
  2046       return True
       
  2047     if thread_local.shut_down:
       
  2048       logging.debug('Queue join interrupted')
       
  2049       return False
       
  2050     for worker_thread in thread_gate.Threads():
       
  2051       if not worker_thread.isAlive():
       
  2052         return False
       
  2053 
       
  2054 
       
  2055 def ShutdownThreads(data_source_thread, work_queue, thread_gate):
       
  2056   """Shuts down the worker and data source threads.
       
  2057 
       
  2058   Args:
       
  2059     data_source_thread: A running DataSourceThread instance.
       
  2060     work_queue: The work queue.
       
  2061     thread_gate: A ThreadGate instance with workers registered.
       
  2062   """
       
  2063   logging.info('An error occurred. Shutting down...')
       
  2064 
       
  2065   data_source_thread.exit_flag = True
       
  2066 
       
  2067   for thread in thread_gate.Threads():
       
  2068     thread.exit_flag = True
       
  2069 
       
  2070   for unused_thread in thread_gate.Threads():
       
  2071     thread_gate.EnableThread()
       
  2072 
       
  2073   data_source_thread.join(timeout=3.0)
       
  2074   if data_source_thread.isAlive():
       
  2075     logging.warn('%s hung while trying to exit',
       
  2076                  data_source_thread.GetFriendlyName())
       
  2077 
       
  2078   while not work_queue.empty():
       
  2079     try:
       
  2080       unused_item = work_queue.get_nowait()
       
  2081       work_queue.task_done()
       
  2082     except Queue.Empty:
       
  2083       pass
       
  2084 
       
  2085 
       
  2086 def PerformBulkUpload(app_id,
       
  2087                       post_url,
       
  2088                       kind,
       
  2089                       workitem_generator_factory,
       
  2090                       num_threads,
       
  2091                       throttle,
       
  2092                       progress_db,
       
  2093                       max_queue_size=DEFAULT_QUEUE_SIZE,
       
  2094                       request_manager_factory=RequestManager,
       
  2095                       bulkloaderthread_factory=BulkLoaderThread,
       
  2096                       progresstrackerthread_factory=ProgressTrackerThread,
       
  2097                       datasourcethread_factory=DataSourceThread,
       
  2098                       work_queue_factory=ReQueue,
       
  2099                       progress_queue_factory=Queue.Queue):
       
  2100   """Uploads data into an application using a series of HTTP POSTs.
       
  2101 
       
  2102   This function will spin up a number of threads to read entities from
       
  2103   the data source, pass those to a number of worker ("uploader") threads
       
  2104   for sending to the application, and track all of the progress in a
       
  2105   small database in case an error or pause/termination requires a
       
  2106   restart/resumption of the upload process.
       
  2107 
       
  2108   Args:
       
  2109     app_id: String containing application id.
       
  2110     post_url: URL to post the Entity data to.
       
  2111     kind: Kind of the Entity records being posted.
       
  2112     workitem_generator_factory: A factory that creates a WorkItem generator.
       
  2113     num_threads: How many uploader threads should be created.
       
  2114     throttle: A Throttle instance.
       
  2115     progress_db: The database to use for replaying/recording progress.
       
  2116     max_queue_size: Maximum size of the queues before they should block.
       
  2117     request_manager_factory: Used for dependency injection.
       
  2118     bulkloaderthread_factory: Used for dependency injection.
       
  2119     progresstrackerthread_factory: Used for dependency injection.
       
  2120     datasourcethread_factory: Used for dependency injection.
       
  2121     work_queue_factory: Used for dependency injection.
       
  2122     progress_queue_factory: Used for dependency injection.
       
  2123 
       
  2124   Raises:
       
  2125     AuthenticationError: If authentication is required and fails.
       
  2126   """
       
  2127   thread_gate = ThreadGate(True)
       
  2128 
       
  2129   (unused_scheme,
       
  2130    host_port, url_path,
       
  2131    unused_query, unused_fragment) = urlparse.urlsplit(post_url)
       
  2132 
       
  2133   work_queue = work_queue_factory(max_queue_size)
       
  2134   progress_queue = progress_queue_factory(max_queue_size)
       
  2135   request_manager = request_manager_factory(app_id,
       
  2136                                             host_port,
       
  2137                                             url_path,
       
  2138                                             kind,
       
  2139                                             throttle)
       
  2140 
       
  2141   throttle.Register(threading.currentThread())
       
  2142   try:
       
  2143     request_manager.Authenticate()
       
  2144   except Exception, e:
       
  2145     logging.exception(e)
       
  2146     raise AuthenticationError('Authentication failed')
       
  2147   if (request_manager.credentials is not None and
       
  2148       not request_manager.authenticated):
       
  2149     raise AuthenticationError('Authentication failed')
       
  2150 
       
  2151   for unused_idx in range(num_threads):
       
  2152     thread = bulkloaderthread_factory(work_queue,
       
  2153                                       throttle,
       
  2154                                       thread_gate,
       
  2155                                       request_manager)
       
  2156     throttle.Register(thread)
       
  2157     thread_gate.Register(thread)
       
  2158 
       
  2159   progress_thread = progresstrackerthread_factory(progress_queue, progress_db)
       
  2160 
       
  2161   if progress_db.HasUnfinishedWork():
       
  2162     logging.debug('Restarting upload using progress database')
       
  2163     progress_generator_factory = progress_db.GetProgressStatusGenerator
       
  2164   else:
       
  2165     progress_generator_factory = None
       
  2166 
       
  2167   data_source_thread = datasourcethread_factory(work_queue,
       
  2168                                                 progress_queue,
       
  2169                                                 workitem_generator_factory,
       
  2170                                                 progress_generator_factory)
       
  2171 
       
  2172   thread_local = threading.local()
       
  2173   thread_local.shut_down = False
       
  2174 
       
  2175   def Interrupt(unused_signum, unused_frame):
       
  2176     """Shutdown gracefully in response to a signal."""
       
  2177     thread_local.shut_down = True
       
  2178 
       
  2179   signal.signal(signal.SIGINT, Interrupt)
       
  2180 
       
  2181   progress_thread.start()
       
  2182   data_source_thread.start()
       
  2183   for thread in thread_gate.Threads():
       
  2184     thread.start()
       
  2185 
       
  2186 
       
  2187   while not thread_local.shut_down:
       
  2188     data_source_thread.join(timeout=0.25)
       
  2189 
       
  2190     if data_source_thread.isAlive():
       
  2191       for thread in list(thread_gate.Threads()) + [progress_thread]:
       
  2192         if not thread.isAlive():
       
  2193           logging.info('Unexpected thread death: %s', thread.getName())
       
  2194           thread_local.shut_down = True
       
  2195           break
       
  2196     else:
       
  2197       break
       
  2198 
       
  2199   if thread_local.shut_down:
       
  2200     ShutdownThreads(data_source_thread, work_queue, thread_gate)
       
  2201 
       
  2202   def _Join(ob, msg):
       
  2203     logging.debug('Waiting for %s...', msg)
       
  2204     if isinstance(ob, threading.Thread):
       
  2205       ob.join(timeout=3.0)
       
  2206       if ob.isAlive():
       
  2207         logging.debug('Joining %s failed', ob.GetFriendlyName())
       
  2208       else:
       
  2209         logging.debug('... done.')
       
  2210     elif isinstance(ob, (Queue.Queue, ReQueue)):
       
  2211       if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
       
  2212         ShutdownThreads(data_source_thread, work_queue, thread_gate)
       
  2213     else:
       
  2214       ob.join()
       
  2215       logging.debug('... done.')
       
  2216 
       
  2217   _Join(work_queue, 'work_queue to flush')
       
  2218 
       
  2219   for unused_thread in thread_gate.Threads():
       
  2220     work_queue.put(_THREAD_SHOULD_EXIT)
       
  2221 
       
  2222   for unused_thread in thread_gate.Threads():
       
  2223     thread_gate.EnableThread()
       
  2224 
       
  2225   for thread in thread_gate.Threads():
       
  2226     _Join(thread, 'thread [%s] to terminate' % thread.getName())
       
  2227 
       
  2228     thread.CheckError()
       
  2229 
       
  2230   if progress_thread.isAlive():
       
  2231     _Join(progress_queue, 'progress_queue to finish')
       
  2232   else:
       
  2233     logging.warn('Progress thread exited prematurely')
       
  2234 
       
  2235   progress_queue.put(_THREAD_SHOULD_EXIT)
       
  2236   _Join(progress_thread, 'progress_thread to terminate')
       
  2237   progress_thread.CheckError()
       
  2238 
       
  2239   data_source_thread.CheckError()
       
  2240 
       
  2241   total_up, duration = throttle.TotalTransferred(BANDWIDTH_UP)
       
  2242   s_total_up, unused_duration = throttle.TotalTransferred(HTTPS_BANDWIDTH_UP)
       
  2243   total_up += s_total_up
       
  2244   logging.info('%d entities read, %d previously transferred',
       
  2245                data_source_thread.read_count,
       
  2246                data_source_thread.sent_count)
       
  2247   logging.info('%d entities (%d bytes) transferred in %.1f seconds',
       
  2248                progress_thread.entities_sent, total_up, duration)
       
  2249   if (data_source_thread.read_all and
       
  2250       progress_thread.entities_sent + data_source_thread.sent_count >=
       
  2251       data_source_thread.read_count):
       
  2252     logging.info('All entities successfully uploaded')
       
  2253   else:
       
  2254     logging.info('Some entities not successfully uploaded')
       
  2255 
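# An illustrative call sketch: _PerformBulkload below assembles these
# arguments from the command-line flags, but a direct caller would supply
# them explicitly (the names here are hypothetical locals):
#
#   PerformBulkUpload(app_id, post_url, kind, workitem_generator_factory,
#                     num_threads, throttle, progress_db)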
       
  2256 
       
  2257 def PrintUsageExit(code):
       
  2258   """Prints usage information and exits with a status code.
       
  2259 
       
  2260   Args:
       
  2261     code: Status code to pass to sys.exit() after displaying usage information.
       
  2262   """
       
  2263   print __doc__ % {'arg0': sys.argv[0]}
       
  2264   sys.stdout.flush()
       
  2265   sys.stderr.flush()
       
  2266   sys.exit(code)
       
  2267 
       
  2268 
       
  2269 def ParseArguments(argv):
       
  2270   """Parses command-line arguments.
       
  2271 
       
  2272   Prints out a help message if -h or --help is supplied.
       
  2273 
       
  2274   Args:
       
  2275     argv: List of command-line arguments.
       
  2276 
       
  2277   Returns:
       
  2278     The tuple returned by ProcessArguments, containing the values from

  2279     each corresponding command-line flag.
       
  2280   """
       
  2281   opts, unused_args = getopt.getopt(
       
  2282       argv[1:],
       
  2283       'h',
       
  2284       ['debug',
       
  2285        'help',
       
  2286        'url=',
       
  2287        'filename=',
       
  2288        'batch_size=',
       
  2289        'kind=',
       
  2290        'num_threads=',
       
  2291        'bandwidth_limit=',
       
  2292        'rps_limit=',
       
  2293        'http_limit=',
       
  2294        'db_filename=',
       
  2295        'app_id=',
       
  2296        'config_file=',
       
  2297        'auth_domain=',
       
  2298       ])
       
  2299 
       
  2300   url = None
       
  2301   filename = None
       
  2302   batch_size = DEFAULT_BATCH_SIZE
       
  2303   kind = None
       
  2304   num_threads = DEFAULT_THREAD_COUNT
       
  2305   bandwidth_limit = DEFAULT_BANDWIDTH_LIMIT
       
  2306   rps_limit = DEFAULT_RPS_LIMIT
       
  2307   http_limit = DEFAULT_REQUEST_LIMIT
       
  2308   db_filename = None
       
  2309   app_id = None
       
  2310   config_file = None
       
  2311   auth_domain = 'gmail.com'
       
  2312 
       
  2313   for option, value in opts:
       
  2314     if option == '--debug':
       
  2315       logging.getLogger().setLevel(logging.DEBUG)
       
  2316     elif option in ('-h', '--help'):
       
  2317       PrintUsageExit(0)
       
  2318     elif option == '--url':
       
  2319       url = value
       
  2320     elif option == '--filename':
       
  2321       filename = value
       
  2322     elif option == '--batch_size':
       
  2323       batch_size = int(value)
       
  2324     elif option == '--kind':
       
  2325       kind = value
       
  2326     elif option == '--num_threads':
       
  2327       num_threads = int(value)
       
  2328     elif option == '--bandwidth_limit':
       
  2329       bandwidth_limit = int(value)
       
  2330     elif option == '--rps_limit':
       
  2331       rps_limit = int(value)
       
  2332     elif option == '--http_limit':
       
  2333       http_limit = int(value)
       
  2334     elif option == '--db_filename':
       
  2335       db_filename = value
       
  2336     elif option == '--app_id':
       
  2337       app_id = value
       
  2338     elif option == '--config_file':
       
  2339       config_file = value
       
  2340     elif option == '--auth_domain':
       
  2341       auth_domain = value
       
  2342 
       
  2343   return ProcessArguments(app_id=app_id,
       
  2344                           url=url,
       
  2345                           filename=filename,
       
  2346                           batch_size=batch_size,
       
  2347                           kind=kind,
       
  2348                           num_threads=num_threads,
       
  2349                           bandwidth_limit=bandwidth_limit,
       
  2350                           rps_limit=rps_limit,
       
  2351                           http_limit=http_limit,
       
  2352                           db_filename=db_filename,
       
  2353                           config_file=config_file,
       
  2354                           auth_domain=auth_domain,
       
  2355                           die_fn=lambda: PrintUsageExit(1))
       
  2356 
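# For example, a minimal invocation supplying only the required flags
# (illustrative values):
#
#   bulkloader.py --url=http://myapp.appspot.com/load --kind=Album \
#                 --filename=albums.csv --config_file=album_loader.py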
       
  2357 
       
  2358 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
       
  2359   return {
       
  2360       BANDWIDTH_UP: bandwidth_limit,
       
  2361       BANDWIDTH_DOWN: bandwidth_limit,
       
  2362       REQUESTS: http_limit,
       
  2363       HTTPS_BANDWIDTH_UP: bandwidth_limit / 5,
       
  2364       HTTPS_BANDWIDTH_DOWN: bandwidth_limit / 5,
       
  2365       HTTPS_REQUESTS: http_limit / 5,
       
  2366       RECORDS: rps_limit,
       
  2367   }
       
  2368 
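# For example, ThrottleLayout(250000, 8, 20) maps the plain-HTTP buckets to
# 250000 bytes/sec and 8 requests/sec, the HTTPS buckets to one fifth of
# that (50000 bytes/sec and, by integer division, 1 request/sec), and
# RECORDS to 20 records/sec.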
       
  2369 
       
  2370 def LoadConfig(config_file):
       
  2371   """Loads a config file and registers any Loader classes present."""
       
  2372   if config_file:
       
  2373     global_dict = dict(globals())
       
  2374     execfile(config_file, global_dict)
       
  2375     for cls in Loader.__subclasses__():
       
  2376       Loader.RegisterLoader(cls())
       
  2377 
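# An illustrative config file (hypothetical 'Album' kind): defining a Loader
# subclass is sufficient, since the subclass scan above instantiates and
# registers it automatically:
#
#   class AlbumLoader(Loader):
#     def __init__(self):
#       Loader.__init__(self, 'Album',
#                       [('title', str),
#                        ('artist', str),
#                        ('release_year', int),
#                       ])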
       
  2378 
       
  2379 def _MissingArgument(arg_name, die_fn):
       
  2380   """Print error message about missing argument and die."""
       
  2381   print >>sys.stderr, '%s argument required' % arg_name
       
  2382   die_fn()
       

def ProcessArguments(app_id=None,
                     url=None,
                     filename=None,
                     batch_size=DEFAULT_BATCH_SIZE,
                     kind=None,
                     num_threads=DEFAULT_THREAD_COUNT,
                     bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
                     rps_limit=DEFAULT_RPS_LIMIT,
                     http_limit=DEFAULT_REQUEST_LIMIT,
                     db_filename=None,
                     config_file=None,
                     auth_domain='gmail.com',
                     die_fn=lambda: sys.exit(1)):
  """Processes and validates input arguments, applying defaults where needed."""
  if db_filename is None:
    db_filename = time.strftime('bulkloader-progress-%Y%m%d.%H%M%S.sql3')

  if batch_size <= 0:
    print >>sys.stderr, 'batch_size must be 1 or larger'
    die_fn()

  if url is None:
    _MissingArgument('url', die_fn)

  if filename is None:
    _MissingArgument('filename', die_fn)

  if kind is None:
    _MissingArgument('kind', die_fn)

  if config_file is None:
    _MissingArgument('config_file', die_fn)

  if app_id is None:
    (unused_scheme, host_port, unused_url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(url)
    suffix_idx = host_port.find('.appspot.com')
    if suffix_idx > -1:
      app_id = host_port[:suffix_idx]
    elif host_port.split(':')[0].endswith('google.com'):
      app_id = host_port.split('.')[0]
    else:
      print >>sys.stderr, 'app_id required for non-appspot.com domains'
      die_fn()

  return (app_id, url, filename, batch_size, kind, num_threads,
          bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
          auth_domain)
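
# Illustrative example of the app id inference above (argument values are
# hypothetical; defaults per the usage text at the top of this file):
#
#   ProcessArguments(url='http://myapp.appspot.com/load',
#                    filename='albums.csv', kind='Album',
#                    config_file='config.py')
#   => ('myapp', 'http://myapp.appspot.com/load', 'albums.csv', 10,
#       'Album', 10, 250000, 20, 8,
#       'bulkloader-progress-<TIMESTAMP>.sql3', 'config.py', 'gmail.com')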
       

def _PerformBulkload(app_id=None,
                     url=None,
                     filename=None,
                     batch_size=DEFAULT_BATCH_SIZE,
                     kind=None,
                     num_threads=DEFAULT_THREAD_COUNT,
                     bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
                     rps_limit=DEFAULT_RPS_LIMIT,
                     http_limit=DEFAULT_REQUEST_LIMIT,
                     db_filename=None,
                     config_file=None,
                     auth_domain='gmail.com'):
  """Runs the bulkloader, given the options as keyword arguments.

  Args:
    app_id: The application id.
    url: The url of the remote_api endpoint.
    filename: The name of the file containing the CSV data.
    batch_size: The number of records to send per request.
    kind: The kind of entity to transfer.
    num_threads: The number of threads to use to transfer data.
    bandwidth_limit: Maximum bytes/second to transfer.
    rps_limit: Maximum records/second to transfer.
    http_limit: Maximum requests/second for transfers.
    db_filename: The name of the SQLite3 progress database file.
    config_file: The name of the configuration file.
    auth_domain: The auth domain to use for logins and UserProperty.

  Returns:
    An exit code.
  """
  os.environ['AUTH_DOMAIN'] = auth_domain
  LoadConfig(config_file)

  throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)

  throttle = Throttle(layout=throttle_layout)

  workitem_generator_factory = GetCSVGeneratorFactory(filename, batch_size)

  if db_filename == 'skip':
    progress_db = StubProgressDatabase()
  else:
    progress_db = ProgressDatabase(db_filename)

  max_queue_size = max(DEFAULT_QUEUE_SIZE, 2 * num_threads + 5)

  PerformBulkUpload(app_id,
                    url,
                    kind,
                    workitem_generator_factory,
                    num_threads,
                    throttle,
                    progress_db,
                    max_queue_size=max_queue_size)

  return 0
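
# Sizing note: 2 * num_threads + 5 keeps at least two work items per
# upload thread buffered, plus a small cushion, presumably so the CSV
# reader stays ahead of the uploaders; with the documented default of
# 10 threads the bound is max(DEFAULT_QUEUE_SIZE, 25).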
       

def Run(app_id=None,
        url=None,
        filename=None,
        batch_size=DEFAULT_BATCH_SIZE,
        kind=None,
        num_threads=DEFAULT_THREAD_COUNT,
        bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
        rps_limit=DEFAULT_RPS_LIMIT,
        http_limit=DEFAULT_REQUEST_LIMIT,
        db_filename=None,
        auth_domain='gmail.com',
        config_file=None):
  """Sets up and runs the bulkloader, given the options as keyword arguments.

  Args:
    app_id: The application id.
    url: The url of the remote_api endpoint.
    filename: The name of the file containing the CSV data.
    batch_size: The number of records to send per request.
    kind: The kind of entity to transfer.
    num_threads: The number of threads to use to transfer data.
    bandwidth_limit: Maximum bytes/second to transfer.
    rps_limit: Maximum records/second to transfer.
    http_limit: Maximum requests/second for transfers.
    db_filename: The name of the SQLite3 progress database file.
    config_file: The name of the configuration file.
    auth_domain: The auth domain to use for logins and UserProperty.

  Returns:
    An exit code.
  """
  logging.basicConfig(
      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
  args = ProcessArguments(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)

  (app_id, url, filename, batch_size, kind, num_threads, bandwidth_limit,
   rps_limit, http_limit, db_filename, config_file, auth_domain) = args

  return _PerformBulkload(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)
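
# A hedged sketch of driving the loader programmatically via Run(); the
# endpoint URL, file names, and kind are hypothetical, and config.py must
# define at least one Loader subclass for the kind being uploaded:
#
#   exit_code = Run(url='http://myapp.appspot.com/load',
#                   filename='albums.csv',
#                   kind='Album',
#                   config_file='config.py')
#   sys.exit(exit_code)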
       

def main(argv):
  """Runs the importer from the command line."""
  logging.basicConfig(
      level=logging.INFO,
      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')

  args = ParseArguments(argv)
  if None in args:
    print >>sys.stderr, 'Invalid arguments'
    PrintUsageExit(1)

  (app_id, url, filename, batch_size, kind, num_threads,
   bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
   auth_domain) = args

  return _PerformBulkload(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)
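
# Example invocation (flag values are illustrative; see the usage text at
# the top of this file for the full flag list):
#
#   bulkloader.py --url=http://myapp.appspot.com/load \
#       --filename=albums.csv --kind=Album --config_file=config.py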
       

if __name__ == '__main__':
  sys.exit(main(sys.argv))