thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 2273 e4cb9c53db3e
parent 1278 a7766286a7be
child 2309 be1b94099f2d
equal deleted inserted replaced
2272:26491ee91e33 2273:e4cb9c53db3e
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14 # See the License for the specific language governing permissions and
    14 # See the License for the specific language governing permissions and
    15 # limitations under the License.
    15 # limitations under the License.
    16 #
    16 #
    17 
    17 
    18 """Imports CSV data over HTTP.
    18 """Imports data over HTTP.
    19 
    19 
    20 Usage:
    20 Usage:
    21   %(arg0)s [flags]
    21   %(arg0)s [flags]
    22 
    22 
    23     --debug                 Show debugging information. (Optional)
    23     --debug                 Show debugging information. (Optional)
    25                             *.appspot.com)
    25                             *.appspot.com)
    26     --auth_domain=<domain>  The auth domain to use for logging in and for
    26     --auth_domain=<domain>  The auth domain to use for logging in and for
    27                             UserProperties. (Default: gmail.com)
    27                             UserProperties. (Default: gmail.com)
    28     --bandwidth_limit=<int> The maximum number of bytes per second for the
    28     --bandwidth_limit=<int> The maximum number of bytes per second for the
    29                             aggregate transfer of data to the server. Bursts
    29                             aggregate transfer of data to the server. Bursts
       
    30                             may exceed this, but overall transfer rate is
       
    31                             restricted to this rate. (Default 250000)
    30     --batch_size=<int>      Number of Entity objects to include in each post to
    32     --batch_size=<int>      Number of Entity objects to include in each post to
    31                             the URL endpoint. The more data per row/Entity, the
    33                             the URL endpoint. The more data per row/Entity, the
    32                             smaller the batch size should be. (Default 10)
    34                             smaller the batch size should be. (Default 10)
    33     --config_file=<path>    File containing Model and Loader definitions.
    35     --config_file=<path>    File containing Model and Loader definitions.
    34                             (Required)
    36                             (Required)
    36                             resume from. If not supplied, then a new database
    38                             resume from. If not supplied, then a new database
    37                             will be started, named:
    39                             will be started, named:
    38                             bulkloader-progress-TIMESTAMP.
    40                             bulkloader-progress-TIMESTAMP.
    39                             The special filename "skip" may be used to simply
    41                             The special filename "skip" may be used to simply
    40                             skip reading/writing any progress information.
    42                             skip reading/writing any progress information.
    41     --filename=<path>       Path to the CSV file to import. (Required)
    43     --download              Export entities to a file.
       
    44     --email=<string>        The username to use. Will prompt if omitted.
       
    45     --exporter_opts=<string>
       
    46                             A string to pass to the Exporter.initialize method.
       
    47     --filename=<path>       Path to the file to import. (Required)
       
    48     --has_header            Skip the first row of the input.
    42     --http_limit=<int>      The maximum number of HTTP requests per second to
    49     --http_limit=<int>      The maximum number of HTTP requests per second to
    43                             send to the server. (Default: 8)
    50                             send to the server. (Default: 8)
    44     --kind=<string>         Name of the Entity object kind to put in the
    51     --kind=<string>         Name of the Entity object kind to put in the
    45                             datastore. (Required)
    52                             datastore. (Required)
       
    53     --loader_opts=<string>  A string to pass to the Loader.initialize method.
       
    54     --log_file=<path>       File to write bulkloader logs.  If not supplied,
       
    55                             then a new log file will be created, named:
       
    56                             bulkloader-log-TIMESTAMP.
    46     --num_threads=<int>     Number of threads to use for uploading entities
    57     --num_threads=<int>     Number of threads to use for uploading entities
    47                             (Default 10)
    58                             (Default 10)
    48                             may exceed this, but overall transfer rate is
    59     --passin                Read the login password from stdin.
    49                             restricted to this rate. (Default 250000)
    60     --result_db_filename=<path>
       
    61                             Result database to write to for downloads.
    50     --rps_limit=<int>       The maximum number of records per second to
    62     --rps_limit=<int>       The maximum number of records per second to
    51                             transfer to the server. (Default: 20)
    63                             transfer to the server. (Default: 20)
    52     --url=<string>          URL endpoint to post to for importing data.
    64     --url=<string>          URL endpoint to post to for importing data.
    53                             (Required)
    65                             (Required)
    54 
    66 
    64 
    76 
    65 """
    77 """
    66 
    78 
    67 
    79 
    68 
    80 
       
    81 import cPickle
    69 import csv
    82 import csv
       
    83 import errno
    70 import getopt
    84 import getopt
    71 import getpass
    85 import getpass
       
    86 import imp
    72 import logging
    87 import logging
    73 import new
       
    74 import os
    88 import os
    75 import Queue
    89 import Queue
       
    90 import re
    76 import signal
    91 import signal
       
    92 import StringIO
    77 import sys
    93 import sys
    78 import threading
    94 import threading
    79 import time
    95 import time
    80 import traceback
       
    81 import urllib2
    96 import urllib2
    82 import urlparse
    97 import urlparse
    83 
    98 
       
    99 from google.appengine.api import datastore_errors
    84 from google.appengine.ext import db
   100 from google.appengine.ext import db
       
   101 from google.appengine.ext.db import polymodel
    85 from google.appengine.ext.remote_api import remote_api_stub
   102 from google.appengine.ext.remote_api import remote_api_stub
       
   103 from google.appengine.runtime import apiproxy_errors
    86 from google.appengine.tools import appengine_rpc
   104 from google.appengine.tools import appengine_rpc
    87 
   105 
    88 try:
   106 try:
    89   import sqlite3
   107   import sqlite3
    90 except ImportError:
   108 except ImportError:
    91   pass
   109   pass
    92 
   110 
    93 UPLOADER_VERSION = '1'
   111 logger = logging.getLogger('google.appengine.tools.bulkloader')
    94 
   112 
    95 DEFAULT_THREAD_COUNT = 10
   113 DEFAULT_THREAD_COUNT = 10
    96 
   114 
    97 DEFAULT_BATCH_SIZE = 10
   115 DEFAULT_BATCH_SIZE = 10
    98 
   116 
   102 
   120 
   103 STATE_READ = 0
   121 STATE_READ = 0
   104 STATE_SENDING = 1
   122 STATE_SENDING = 1
   105 STATE_SENT = 2
   123 STATE_SENT = 2
   106 STATE_NOT_SENT = 3
   124 STATE_NOT_SENT = 3
       
   125 
       
   126 STATE_GETTING = 1
       
   127 STATE_GOT = 2
       
   128 STATE_NOT_GOT = 3
   107 
   129 
   108 MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
   130 MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
   109 
   131 
   110 DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'
   132 DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'
   111 
   133 
   126 HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
   148 HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
   127 HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
   149 HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
   128 HTTPS_REQUESTS = 'https-requests'
   150 HTTPS_REQUESTS = 'https-requests'
   129 RECORDS = 'records'
   151 RECORDS = 'records'
   130 
   152 
   131 
   153 MAXIMUM_INCREASE_DURATION = 8.0
   132 def StateMessage(state):
   154 MAXIMUM_HOLD_DURATION = 10.0
       
   155 
       
   156 
       
   157 def ImportStateMessage(state):
   133   """Converts a numeric state identifier to a status message."""
   158   """Converts a numeric state identifier to a status message."""
   134   return ({
   159   return ({
   135       STATE_READ: 'Batch read from file.',
   160       STATE_READ: 'Batch read from file.',
   136       STATE_SENDING: 'Sending batch to server.',
   161       STATE_SENDING: 'Sending batch to server.',
   137       STATE_SENT: 'Batch successfully sent.',
   162       STATE_SENT: 'Batch successfully sent.',
   138       STATE_NOT_SENT: 'Error while sending batch.'
   163       STATE_NOT_SENT: 'Error while sending batch.'
   139   }[state])
   164   }[state])
   140 
   165 
   141 
   166 
       
   167 def ExportStateMessage(state):
       
   168   """Converts a numeric state identifier to a status message."""
       
   169   return ({
       
   170       STATE_READ: 'Batch read from file.',
       
    171       STATE_GETTING: 'Fetching batch from server.',
       
   172       STATE_GOT: 'Batch successfully fetched.',
       
    173       STATE_NOT_GOT: 'Error while fetching batch.'
       
   174   }[state])
       
   175 
       
   176 
       
   177 def ExportStateName(state):
       
   178   """Converts a numeric state identifier to a string."""
       
   179   return ({
       
   180       STATE_READ: 'READ',
       
   181       STATE_GETTING: 'GETTING',
       
   182       STATE_GOT: 'GOT',
       
   183       STATE_NOT_GOT: 'NOT_GOT'
       
   184   }[state])
       
   185 
       
   186 
       
   187 def ImportStateName(state):
       
   188   """Converts a numeric state identifier to a string."""
       
   189   return ({
       
   190       STATE_READ: 'READ',
       
    191       STATE_SENDING: 'SENDING',

    192       STATE_SENT: 'SENT',

    193       STATE_NOT_SENT: 'NOT_SENT'
       
   194   }[state])
       
   195 
       
   196 
   142 class Error(Exception):
   197 class Error(Exception):
   143   """Base-class for exceptions in this module."""
   198   """Base-class for exceptions in this module."""
   144 
   199 
   145 
   200 
       
   201 class MissingPropertyError(Error):
       
   202   """An expected field is missing from an entity, and no default was given."""
       
   203 
       
   204 
   146 class FatalServerError(Error):
   205 class FatalServerError(Error):
   147   """An unrecoverable error occurred while trying to post data to the server."""
   206   """An unrecoverable error occurred while posting data to the server."""
   148 
   207 
   149 
   208 
   150 class ResumeError(Error):
   209 class ResumeError(Error):
   151   """Error while trying to resume a partial upload."""
   210   """Error while trying to resume a partial upload."""
   152 
   211 
   157 
   216 
   158 class AuthenticationError(Error):
   217 class AuthenticationError(Error):
   159   """Error while trying to authenticate with the server."""
   218   """Error while trying to authenticate with the server."""
   160 
   219 
   161 
   220 
   162 def GetCSVGeneratorFactory(csv_filename, batch_size,
   221 class FileNotFoundError(Error):
       
   222   """A filename passed in by the user refers to a non-existent input file."""
       
   223 
       
   224 
       
   225 class FileNotReadableError(Error):
       
   226   """A filename passed in by the user refers to a non-readable input file."""
       
   227 
       
   228 
       
   229 class FileExistsError(Error):
       
   230   """A filename passed in by the user refers to an existing output file."""
       
   231 
       
   232 
       
   233 class FileNotWritableError(Error):
       
   234   """A filename passed in by the user refers to a non-writable output file."""
       
   235 
       
   236 
       
   237 class KeyRangeError(Error):
       
   238   """Error while trying to generate a KeyRange."""
       
   239 
       
   240 
       
   241 class BadStateError(Error):
       
   242   """A work item in an unexpected state was encountered."""
       
   243 
       
   244 
       
   245 class NameClashError(Error):
       
   246   """A name clash occurred while trying to alias old method names."""
       
   247   def __init__(self, old_name, new_name, klass):
       
   248     Error.__init__(self, old_name, new_name, klass)
       
   249     self.old_name = old_name
       
   250     self.new_name = new_name
       
   251     self.klass = klass
       
   252 
       
   253 
       
   254 def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
   163                            openfile=open, create_csv_reader=csv.reader):
   255                            openfile=open, create_csv_reader=csv.reader):
   164   """Return a factory that creates a CSV-based WorkItem generator.
   256   """Return a factory that creates a CSV-based WorkItem generator.
   165 
   257 
   166   Args:
   258   Args:
       
   259     kind: The kind of the entities being uploaded.
   167     csv_filename: File on disk containing CSV data.
   260     csv_filename: File on disk containing CSV data.
   168     batch_size: Maximum number of CSV rows to stash into a WorkItem.
   261     batch_size: Maximum number of CSV rows to stash into a WorkItem.
       
   262     csv_has_header: Whether to skip the first row of the CSV.
   169     openfile: Used for dependency injection.
   263     openfile: Used for dependency injection.
   170     create_csv_reader: Used for dependency injection.
   264     create_csv_reader: Used for dependency injection.
   171 
   265 
   172   Returns: A callable (accepting the Progress Queue and Progress
   266   Returns:
   173     Generators as input) which creates the WorkItem generator.
   267     A callable (accepting the Progress Queue and Progress Generators
       
   268     as input) which creates the WorkItem generator.
   174   """
   269   """
       
   270   loader = Loader.RegisteredLoader(kind)
       
   271   loader._Loader__openfile = openfile
       
   272   loader._Loader__create_csv_reader = create_csv_reader
       
   273   record_generator = loader.generate_records(csv_filename)
   175 
   274 
   176   def CreateGenerator(progress_queue, progress_generator):
   275   def CreateGenerator(progress_queue, progress_generator):
   177     """Initialize a CSV generator linked to a progress generator and queue.
   276     """Initialize a WorkItem generator linked to a progress generator and queue.
   178 
   277 
   179     Args:
   278     Args:
   180       progress_queue: A ProgressQueue instance to send progress information.
   279       progress_queue: A ProgressQueue instance to send progress information.
   181       progress_generator: A generator of progress information or None.
   280       progress_generator: A generator of progress information or None.
   182 
   281 
   183     Returns:
   282     Returns:
   184       A CSVGenerator instance.
   283       A WorkItemGenerator instance.
   185     """
   284     """
   186     return CSVGenerator(progress_queue,
   285     return WorkItemGenerator(progress_queue,
   187                         progress_generator,
   286                              progress_generator,
   188                         csv_filename,
   287                              record_generator,
   189                         batch_size,
   288                              csv_has_header,
   190                         openfile,
   289                              batch_size)
   191                         create_csv_reader)
   290 
   192   return CreateGenerator
   291   return CreateGenerator
   193 
   292 
   194 
   293 
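
# A minimal wiring sketch for the factory above. The kind, CSV path, and
# queues are hypothetical, and a Loader subclass is assumed to be registered
# for 'Album'; in the real tool this plumbing is done by the bulkloader driver.
factory = GetCSVGeneratorFactory('Album', 'albums.csv', batch_size=10,
                                 csv_has_header=True)
generator = factory(progress_queue, progress_generator=None)
for work_item in generator.Batches():
  work_queue.put(work_item)
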
   195 class CSVGenerator(object):
   294 class WorkItemGenerator(object):
   196   """Reads a CSV file and generates WorkItems containing batches of records."""
   295   """Reads rows from a row generator and generates WorkItems of batches."""
   197 
   296 
   198   def __init__(self,
   297   def __init__(self,
   199                progress_queue,
   298                progress_queue,
   200                progress_generator,
   299                progress_generator,
   201                csv_filename,
   300                record_generator,
   202                batch_size,
   301                skip_first,
   203                openfile,
   302                batch_size):
   204                create_csv_reader):
   303     """Initialize a WorkItemGenerator.
   205     """Initializes a CSV generator.
   304 
   206 
   305     Args:
   207     Args:
   306       progress_queue: A progress queue with which to associate WorkItems.
   208       progress_queue: A queue used for tracking progress information.
   307       progress_generator: A generator of progress information.
   209       progress_generator: A generator of prior progress information, or None
   308       record_generator: A generator of data records.
   210         if there is no prior status.
   309       skip_first: Whether to skip the first data record.
   211       csv_filename: File on disk containing CSV data.
   310       batch_size: The number of data records per WorkItem.
   212       batch_size: Maximum number of CSV rows to stash into a WorkItem.
       
   213       openfile: Used for dependency injection of 'open'.
       
   214       create_csv_reader: Used for dependency injection of 'csv.reader'.
       
   215     """
   311     """
   216     self.progress_queue = progress_queue
   312     self.progress_queue = progress_queue
   217     self.progress_generator = progress_generator
   313     self.progress_generator = progress_generator
   218     self.csv_filename = csv_filename
   314     self.reader = record_generator
       
   315     self.skip_first = skip_first
   219     self.batch_size = batch_size
   316     self.batch_size = batch_size
   220     self.openfile = openfile
       
   221     self.create_csv_reader = create_csv_reader
       
   222     self.line_number = 1
   317     self.line_number = 1
   223     self.column_count = None
   318     self.column_count = None
   224     self.read_rows = []
   319     self.read_rows = []
   225     self.reader = None
       
   226     self.row_count = 0
   320     self.row_count = 0
   227     self.sent_count = 0
   321     self.xfer_count = 0
   228 
   322 
   229   def _AdvanceTo(self, line):
   323   def _AdvanceTo(self, line):
   230     """Advance the reader to the given line.
   324     """Advance the reader to the given line.
   231 
   325 
   232     Args:
   326     Args:
   234     """
   328     """
   235     while self.line_number < line:
   329     while self.line_number < line:
   236       self.reader.next()
   330       self.reader.next()
   237       self.line_number += 1
   331       self.line_number += 1
   238       self.row_count += 1
   332       self.row_count += 1
   239       self.sent_count += 1
   333       self.xfer_count += 1
   240 
   334 
   241   def _ReadRows(self, key_start, key_end):
   335   def _ReadRows(self, key_start, key_end):
   242     """Attempts to read and encode rows [key_start, key_end].
   336     """Attempts to read and encode rows [key_start, key_end].
   243 
   337 
   244     The encoded rows are stored in self.read_rows.
   338     The encoded rows are stored in self.read_rows.
   284                     progress_key=progress_key)
   378                     progress_key=progress_key)
   285 
   379 
   286     return item
   380     return item
   287 
   381 
   288   def Batches(self):
   382   def Batches(self):
   289     """Reads the CSV data file and generates WorkItems.
   383     """Reads from the record_generator and generates WorkItems.
   290 
   384 
   291     Yields:
   385     Yields:
   292       Instances of class WorkItem
   386       Instances of class WorkItem
   293 
   387 
   294     Raises:
   388     Raises:
   295       ResumeError: If the progress database and data file indicate a different
   389       ResumeError: If the progress database and data file indicate a different
   296         number of rows.
   390         number of rows.
   297     """
   391     """
   298     csv_file = self.openfile(self.csv_filename, 'r')
   392     if self.skip_first:
   299     csv_content = csv_file.read()
   393       logger.info('Skipping header line.')
   300     if csv_content:
   394       try:
   301       has_headers = csv.Sniffer().has_header(csv_content)
   395         self.reader.next()
   302     else:
   396       except StopIteration:
   303       has_headers = False
   397         return
   304     csv_file.seek(0)
       
   305     self.reader = self.create_csv_reader(csv_file, skipinitialspace=True)
       
   306     if has_headers:
       
   307       logging.info('The CSV file appears to have a header line, skipping.')
       
   308       self.reader.next()
       
   309 
   398 
   310     exhausted = False
   399     exhausted = False
   311 
   400 
   312     self.line_number = 1
   401     self.line_number = 1
   313     self.column_count = None
   402     self.column_count = None
   314 
   403 
   315     logging.info('Starting import; maximum %d entities per post',
   404     logger.info('Starting import; maximum %d entities per post',
   316                  self.batch_size)
   405                 self.batch_size)
   317 
   406 
   318     state = None
   407     state = None
   319     if self.progress_generator is not None:
   408     if self.progress_generator:
   320       for progress_key, state, key_start, key_end in self.progress_generator:
   409       for progress_key, state, key_start, key_end in self.progress_generator:
   321         if key_start:
   410         if key_start:
   322           try:
   411           try:
   323             self._AdvanceTo(key_start)
   412             self._AdvanceTo(key_start)
   324             self._ReadRows(key_start, key_end)
   413             self._ReadRows(key_start, key_end)
   325             yield self._MakeItem(key_start,
   414             yield self._MakeItem(key_start,
   326                                  key_end,
   415                                  key_end,
   327                                  self.read_rows,
   416                                  self.read_rows,
   328                                  progress_key=progress_key)
   417                                  progress_key=progress_key)
   329           except StopIteration:
   418           except StopIteration:
   330             logging.error('Mismatch between data file and progress database')
   419             logger.error('Mismatch between data file and progress database')
   331             raise ResumeError(
   420             raise ResumeError(
   332                 'Mismatch between data file and progress database')
   421                 'Mismatch between data file and progress database')
   333         elif state == DATA_CONSUMED_TO_HERE:
   422         elif state == DATA_CONSUMED_TO_HERE:
   334           try:
   423           try:
   335             self._AdvanceTo(key_end + 1)
   424             self._AdvanceTo(key_end + 1)
   347           key_end = self.line_number - 1
   436           key_end = self.line_number - 1
   348         if key_start <= key_end:
   437         if key_start <= key_end:
   349           yield self._MakeItem(key_start, key_end, self.read_rows)
   438           yield self._MakeItem(key_start, key_end, self.read_rows)
   350 
   439 
   351 
   440 
       
   441 class CSVGenerator(object):
       
   442   """Reads a CSV file and generates data records."""
       
   443 
       
   444   def __init__(self,
       
   445                csv_filename,
       
   446                openfile=open,
       
   447                create_csv_reader=csv.reader):
       
   448     """Initializes a CSV generator.
       
   449 
       
   450     Args:
       
   451       csv_filename: File on disk containing CSV data.
       
   452       openfile: Used for dependency injection of 'open'.
       
   453       create_csv_reader: Used for dependency injection of 'csv.reader'.
       
   454     """
       
   455     self.csv_filename = csv_filename
       
   456     self.openfile = openfile
       
   457     self.create_csv_reader = create_csv_reader
       
   458 
       
   459   def Records(self):
       
   460     """Reads the CSV data file and generates row records.
       
   461 
       
    462     Returns:

    463       An iterator over the rows of the CSV file, where each row is

    464       a list of strings.
       
   468     """
       
   469     csv_file = self.openfile(self.csv_filename, 'rb')
       
   470     reader = self.create_csv_reader(csv_file, skipinitialspace=True)
       
   471     return reader
       
   472 
       
   473 
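
# A small sketch of the raw record stream (hypothetical file name); each
# iteration yields one CSV row as a list of strings.
for row in CSVGenerator('albums.csv').Records():
  print row
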
       
   474 class KeyRangeGenerator(object):
       
   475   """Generates ranges of keys to download.
       
   476 
       
   477   Reads progress information from the progress database and creates
       
   478   KeyRange objects corresponding to incompletely downloaded parts of an
       
   479   export.
       
   480   """
       
   481 
       
   482   def __init__(self, kind, progress_queue, progress_generator):
       
   483     """Initialize the KeyRangeGenerator.
       
   484 
       
   485     Args:
       
   486       kind: The kind of entities being transferred.
       
   487       progress_queue: A queue used for tracking progress information.
       
   488       progress_generator: A generator of prior progress information, or None
       
   489         if there is no prior status.
       
   490     """
       
   491     self.kind = kind
       
   492     self.row_count = 0
       
   493     self.xfer_count = 0
       
   494     self.progress_queue = progress_queue
       
   495     self.progress_generator = progress_generator
       
   496 
       
   497   def Batches(self):
       
   498     """Iterate through saved progress information.
       
   499 
       
   500     Yields:
       
   501       KeyRange instances corresponding to undownloaded key ranges.
       
   502     """
       
   503     if self.progress_generator is not None:
       
   504       for progress_key, state, key_start, key_end in self.progress_generator:
       
   505         if state is not None and state != STATE_GOT and key_start is not None:
       
   506           key_start = ParseKey(key_start)
       
   507           key_end = ParseKey(key_end)
       
   508 
       
   509           result = KeyRange(self.progress_queue,
       
   510                             self.kind,
       
   511                             key_start=key_start,
       
   512                             key_end=key_end,
       
   513                             progress_key=progress_key,
       
   514                             direction=KeyRange.ASC,
       
   515                             state=STATE_READ)
       
   516           yield result
       
   517     else:
       
   518 
       
   519       yield KeyRange(
       
   520           self.progress_queue, self.kind,
       
   521           key_start=None,
       
   522           key_end=None,
       
   523           direction=KeyRange.DESC)
       
   524 
       
   525 
   352 class ReQueue(object):
   526 class ReQueue(object):
   353   """A special thread-safe queue.
   527   """A special thread-safe queue.
   354 
   528 
   355   A ReQueue allows unfinished work items to be returned with a call to
   529   A ReQueue allows unfinished work items to be returned with a call to
   356   reput().  When an item is reput, task_done() should *not* be called
   530   reput().  When an item is reput, task_done() should *not* be called
    357   in addition; getting an item that has been reput does not increase
    531   in addition; getting an item that has been reput does not increase
   358   the number of outstanding tasks.
   532   the number of outstanding tasks.
   359 
   533 
   360   This class shares an interface with Queue.Queue and provides the
   534   This class shares an interface with Queue.Queue and provides the
   361   additional Reput method.
   535   additional reput method.
   362   """
   536   """
   363 
   537 
   364   def __init__(self,
   538   def __init__(self,
   365                queue_capacity,
   539                queue_capacity,
   366                requeue_capacity=None,
   540                requeue_capacity=None,
   472     """Re-put an item back into the requeue.
   646     """Re-put an item back into the requeue.
   473 
   647 
   474     Re-putting an item does not increase the number of outstanding
   648     Re-putting an item does not increase the number of outstanding
   475     tasks, so the reput item should be uniquely associated with an
   649     tasks, so the reput item should be uniquely associated with an
   476     item that was previously removed from the requeue and for which
   650     item that was previously removed from the requeue and for which
   477     task_done has not been called.
    651     task_done has not been called.
   478 
   652 
   479     Args:
   653     Args:
   480       item: An item to add to the requeue.
   654       item: An item to add to the requeue.
   481       block: Whether to block if the requeue is full.
   655       block: Whether to block if the requeue is full.
    482       timeout: Maximum time to wait for the queue to become non-full.
    656       timeout: Maximum time to wait for the queue to become non-full.
   536 
   710 
   537   def get_nowait(self):
   711   def get_nowait(self):
   538     """Try to get an item from the queue without blocking."""
   712     """Try to get an item from the queue without blocking."""
   539     return self.get(block=False)
   713     return self.get(block=False)
   540 
   714 
       
   715   def qsize(self):
       
   716     return self.queue.qsize() + self.requeue.qsize()
       
   717 
   541 
   718 
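
# A sketch of the reput() contract described above, assuming ReQueue exposes
# Queue.Queue's task_done() as its docstring states; item, Process, and
# TransientError are hypothetical. A reput item re-enters the queue without
# adding an outstanding task, so no extra task_done() is owed for it.
work_queue = ReQueue(queue_capacity=8)
work_queue.put(item)
work = work_queue.get()
try:
  Process(work)            # hypothetical processing step
  work_queue.task_done()   # finishes the task started by put()
except TransientError:
  work_queue.reput(work)   # retry later; do not also call task_done()
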
   542 class ThrottleHandler(urllib2.BaseHandler):
   719 class ThrottleHandler(urllib2.BaseHandler):
   543   """A urllib2 handler for http and https requests that adds to a throttle."""
   720   """A urllib2 handler for http and https requests that adds to a throttle."""
   544 
   721 
   545   def __init__(self, throttle):
   722   def __init__(self, throttle):
   699     request_manager: A RequestManager instance.
   876     request_manager: A RequestManager instance.
   700 
   877 
   701   Returns:
   878   Returns:
   702     A factory to produce a ThrottledHttpRpcServer.
   879     A factory to produce a ThrottledHttpRpcServer.
   703   """
   880   """
       
   881 
   704   def MakeRpcServer(*args, **kwargs):
   882   def MakeRpcServer(*args, **kwargs):
       
   883     """Factory to produce a ThrottledHttpRpcServer.
       
   884 
       
   885     Args:
       
   886       args: Positional args to pass to ThrottledHttpRpcServer.
       
   887       kwargs: Keyword args to pass to ThrottledHttpRpcServer.
       
   888 
       
   889     Returns:
       
   890       A ThrottledHttpRpcServer instance.
       
   891     """
   705     kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
   892     kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
   706     kwargs['save_cookies'] = True
   893     kwargs['save_cookies'] = True
   707     return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)
   894     return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)
   708   return MakeRpcServer
   895   return MakeRpcServer
   709 
   896 
   710 
   897 
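
# A hedged wiring sketch: the bulkloader hands this factory to the remote API
# setup so every RPC flows through the throttle. The ConfigureRemoteDatastore
# argument list is assumed here, not shown in this file.
factory = ThrottledHttpRpcServerFactory(throttle, request_manager)
remote_api_stub.ConfigureRemoteDatastore(app_id, '/remote_api', auth_func,
                                         servername=host_port,
                                         rpc_server_factory=factory)
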
       
   898 class ExportResult(object):
       
   899   """Holds the decoded content for the result of an export requests."""
       
   900 
       
   901   def __init__(self, continued, direction, keys, entities):
       
   902     self.continued = continued
       
   903     self.direction = direction
       
   904     self.keys = keys
       
   905     self.entities = entities
       
   906     self.count = len(keys)
       
   907     assert self.count == len(entities)
       
   908     assert direction in (KeyRange.ASC, KeyRange.DESC)
       
   909     if self.count > 0:
       
   910       if direction == KeyRange.ASC:
       
   911         self.key_start = keys[0]
       
   912         self.key_end = keys[-1]
       
   913       else:
       
   914         self.key_start = keys[-1]
       
   915         self.key_end = keys[0]
       
   916 
       
   917   def __str__(self):
       
   918     return 'continued = %s\n%s' % (
       
   919         str(self.continued), '\n'.join(self.entities))
       
   920 
       
   921 
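
# A normalization sketch (k1 < k2 < k3 are hypothetical datastore keys): on a
# descending scan the keys arrive largest-first, and key_start/key_end are set
# so that key_start is always the smaller end of the range.
result = ExportResult(continued=False, direction=KeyRange.DESC,
                      keys=[k3, k2, k1], entities=['e3', 'e2', 'e1'])
assert result.key_start == k1
assert result.key_end == k3
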
       
   922 class _WorkItem(object):
       
   923   """Holds a description of a unit of upload or download work."""
       
   924 
       
   925   def __init__(self, progress_queue, key_start, key_end, state_namer,
       
   926                state=STATE_READ, progress_key=None):
       
   927     """Initialize the _WorkItem instance.
       
   928 
       
   929     Args:
       
   930       progress_queue: A queue used for tracking progress information.
       
   931       key_start: The starting key, inclusive.
       
   932       key_end: The ending key, inclusive.
       
   933       state_namer: Function to describe work item states.
       
   934       state: The initial state of the work item.
       
   935       progress_key: If this WorkItem represents state from a prior run,
       
   936         then this will be the key within the progress database.
       
   937     """
       
   938     self.progress_queue = progress_queue
       
   939     self.key_start = key_start
       
   940     self.key_end = key_end
       
   941     self.state_namer = state_namer
       
   942     self.state = state
       
   943     self.progress_key = progress_key
       
   944     self.progress_event = threading.Event()
       
   945 
       
   946   def _AssertInState(self, *states):
       
   947     """Raises an Error if the state of this range is not in states."""
       
    948     if self.state not in states:
       
   949       raise BadStateError('%s:%s not in %s' %
       
   950                           (str(self),
       
   951                            self.state_namer(self.state),
       
   952                            map(self.state_namer, states)))
       
   953 
       
   954   def _AssertProgressKey(self):
       
   955     """Raises an Error if the progress key is None."""
       
   956     if self.progress_key is None:
       
   957       raise BadStateError('%s: Progress key is missing' % str(self))
       
   958 
       
   959   def MarkAsRead(self):
       
   960     """Mark this _WorkItem as read, updating the progress database."""
       
   961     self._AssertInState(STATE_READ)
       
   962     self._StateTransition(STATE_READ, blocking=True)
       
   963 
       
   964   def MarkAsTransferring(self):
       
   965     """Mark this _WorkItem as transferring, updating the progress database."""
       
   966     self._AssertInState(STATE_READ, STATE_NOT_GOT)
       
   967     self._AssertProgressKey()
       
   968     self._StateTransition(STATE_GETTING, blocking=True)
       
   969 
       
   970   def MarkAsTransferred(self):
       
   971     """Mark this _WorkItem as transferred, updating the progress database."""
       
   972     raise NotImplementedError()
       
   973 
       
   974   def MarkAsError(self):
       
   975     """Mark this _WorkItem as failed, updating the progress database."""
       
   976     self._AssertInState(STATE_GETTING)
       
   977     self._AssertProgressKey()
       
   978     self._StateTransition(STATE_NOT_GOT, blocking=True)
       
   979 
       
   980   def _StateTransition(self, new_state, blocking=False):
       
   981     """Transition the work item to a new state, storing progress information.
       
   982 
       
   983     Args:
       
   984       new_state: The state to transition to.
       
    985       blocking: Whether to block until the progress thread acknowledges

    986         the transition.
       
   987     """
       
   988     assert not self.progress_event.isSet()
       
   989 
       
   990     self.state = new_state
       
   991 
       
   992     self.progress_queue.put(self)
       
   993 
       
   994     if blocking:
       
   995       self.progress_event.wait()
       
   996 
       
   997       self.progress_event.clear()
       
   998 
       
   999 
       
  1000 
       
  1001 class WorkItem(_WorkItem):
       
  1002   """Holds a unit of uploading work.
       
  1003 
       
  1004   A WorkItem represents a number of entities that need to be uploaded to
       
  1005   Google App Engine. These entities are encoded in the "content" field of
       
  1006   the WorkItem, and will be POST'd as-is to the server.
       
  1007 
       
  1008   The entities are identified by a range of numeric keys, inclusively. In
       
  1009   the case of a resumption of an upload, or a replay to correct errors,
       
  1010   these keys must be able to identify the same set of entities.
       
  1011 
       
  1012   Note that keys specify a range. The entities do not have to sequentially
       
   1013   fill the entire range; they must simply bound a range of valid keys.
       
  1014   """
       
  1015 
       
  1016   def __init__(self, progress_queue, rows, key_start, key_end,
       
  1017                progress_key=None):
       
  1018     """Initialize the WorkItem instance.
       
  1019 
       
  1020     Args:
       
  1021       progress_queue: A queue used for tracking progress information.
       
   1022       rows: A list of pairs of a line number and a list of column values.
       
  1023       key_start: The (numeric) starting key, inclusive.
       
  1024       key_end: The (numeric) ending key, inclusive.
       
  1025       progress_key: If this WorkItem represents state from a prior run,
       
  1026         then this will be the key within the progress database.
       
  1027     """
       
  1028     _WorkItem.__init__(self, progress_queue, key_start, key_end,
       
  1029                        ImportStateName, state=STATE_READ,
       
  1030                        progress_key=progress_key)
       
  1031 
       
  1032     assert isinstance(key_start, (int, long))
       
  1033     assert isinstance(key_end, (int, long))
       
  1034     assert key_start <= key_end
       
  1035 
       
  1036     self.rows = rows
       
  1037     self.content = None
       
  1038     self.count = len(rows)
       
  1039 
       
  1040   def __str__(self):
       
  1041     return '[%s-%s]' % (self.key_start, self.key_end)
       
  1042 
       
  1043   def MarkAsTransferred(self):
       
  1044     """Mark this WorkItem as sucessfully-sent to the server."""
       
  1045 
       
  1046     self._AssertInState(STATE_SENDING)
       
  1047     self._AssertProgressKey()
       
  1048 
       
  1049     self._StateTransition(STATE_SENT, blocking=False)
       
  1050 
       
  1051 
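
# A lifecycle sketch for an upload WorkItem (queue and rows are hypothetical);
# each Mark* call records the transition in the progress database via the
# progress thread, which assigns progress_key on the first transition.
item = WorkItem(progress_queue, rows=[(1, ['col1', 'col2'])],
                key_start=1, key_end=1)
item.MarkAsRead()          # STATE_READ
item.MarkAsTransferring()  # STATE_SENDING
item.MarkAsTransferred()   # STATE_SENT
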
       
  1052 def GetImplementationClass(kind_or_class_key):
       
  1053   """Returns the implementation class for a given kind or class key.
       
  1054 
       
  1055   Args:
       
  1056     kind_or_class_key: A kind string or a tuple of kind strings.
       
  1057 
       
   1058   Returns:
       
  1059     A db.Model subclass for the given kind or class key.
       
  1060   """
       
  1061   if isinstance(kind_or_class_key, tuple):
       
  1062     try:
       
  1063       implementation_class = polymodel._class_map[kind_or_class_key]
       
  1064     except KeyError:
       
  1065       raise db.KindError('No implementation for class \'%s\'' %
       
  1066                          kind_or_class_key)
       
  1067   else:
       
  1068     implementation_class = db.class_for_kind(kind_or_class_key)
       
  1069   return implementation_class
       
  1070 
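
# A resolution sketch with hypothetical model classes: a plain kind resolves
# through db.class_for_kind, while a PolyModel leaf class is addressed by its
# class-key tuple.
album_model = GetImplementationClass('Album')
cat_model = GetImplementationClass(('Animal', 'Cat'))
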
       
  1071 class EmptyQuery(db.Query):
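  """A query that returns no results, used for provably-empty key ranges."""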
       
  1072   def get(self):
       
  1073     return None
       
  1074 
       
  1075   def fetch(self, limit=1000, offset=0):
       
  1076     return []
       
  1077 
       
  1078   def count(self, limit=1000):
       
  1079     return 0
       
  1080 
       
  1081 
       
  1082 def KeyLEQ(key1, key2):
       
  1083   """Compare two keys for less-than-or-equal-to.
       
  1084 
       
  1085   All keys with numeric ids come before all keys with names.
       
  1086 
       
  1087   Args:
       
  1088     key1: An int or db.Key instance.
       
  1089     key2: An int or db.Key instance.
       
  1090 
       
  1091   Returns:
       
  1092     True if key1 <= key2
       
  1093   """
       
  1094   if isinstance(key1, int) and isinstance(key2, int):
       
  1095     return key1 <= key2
       
  1096   if key1 is None or key2 is None:
       
  1097     return True
       
  1098   if key1.id() and not key2.id():
       
  1099     return True
       
  1100   return key1.id_or_name() <= key2.id_or_name()
       
  1101 
       
  1102 
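
# Two checks illustrating the ordering rule (hypothetical kind 'Item'):
# numeric ids sort before names, and None acts as an open bound.
assert KeyLEQ(db.Key.from_path('Item', 7), db.Key.from_path('Item', 'seven'))
assert KeyLEQ(None, db.Key.from_path('Item', 7))
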
       
  1103 class KeyRange(_WorkItem):
       
  1104   """Represents an item of download work.
       
  1105 
       
  1106   A KeyRange object represents a key range (key_start, key_end) and a
       
  1107   scan direction (KeyRange.DESC or KeyRange.ASC).  The KeyRange object
       
  1108   has an associated state: STATE_READ, STATE_GETTING, STATE_GOT, and
       
   1109   STATE_NOT_GOT.

   1110 

   1111   - STATE_READ indicates the range is ready for download by a worker thread.

   1112   - STATE_GETTING indicates the range is currently being downloaded.

   1113   - STATE_GOT indicates that the range was successfully downloaded.

   1114   - STATE_NOT_GOT indicates that an error occurred during the last download

   1115     attempt.
       
  1116 
       
  1117   KeyRanges not in the STATE_GOT state are stored in the progress database.
       
  1118   When a piece of KeyRange work is downloaded, the download may cover only
       
  1119   a portion of the range.  In this case, the old KeyRange is removed from
       
  1120   the progress database and ranges covering the undownloaded range are
       
  1121   generated and stored as STATE_READ in the export progress database.
       
  1122   """
       
  1123 
       
  1124   DESC = 0
       
  1125   ASC = 1
       
  1126 
       
  1127   MAX_KEY_LEN = 500
       
  1128 
       
  1129   def __init__(self,
       
  1130                progress_queue,
       
  1131                kind,
       
  1132                direction,
       
  1133                key_start=None,
       
  1134                key_end=None,
       
  1135                include_start=True,
       
  1136                include_end=True,
       
  1137                progress_key=None,
       
  1138                state=STATE_READ):
       
  1139     """Initialize a KeyRange object.
       
  1140 
       
  1141     Args:
       
  1142       progress_queue: A queue used for tracking progress information.
       
  1143       kind: The kind of entities for this range.
       
  1144       direction: The direction of the query for this range.
       
  1145       key_start: The starting key for this range.
       
  1146       key_end: The ending key for this range.
       
  1147       include_start: Whether the start key should be included in the range.
       
  1148       include_end: Whether the end key should be included in the range.
       
  1149       progress_key: The key for this range within the progress database.
       
  1150       state: The initial state of this range.
       
       
  1154     """
       
  1155     assert direction in (KeyRange.ASC, KeyRange.DESC)
       
  1156     _WorkItem.__init__(self, progress_queue, key_start, key_end,
       
  1157                        ExportStateName, state=state, progress_key=progress_key)
       
  1158     self.kind = kind
       
  1159     self.direction = direction
       
  1160     self.export_result = None
       
  1161     self.count = 0
       
  1162     self.include_start = include_start
       
  1163     self.include_end = include_end
       
  1164     self.SPLIT_KEY = db.Key.from_path(self.kind, unichr(0))
       
  1165 
       
  1166   def __str__(self):
       
  1167     return '[%s-%s]' % (PrettyKey(self.key_start), PrettyKey(self.key_end))
       
  1168 
       
  1169   def __repr__(self):
       
  1170     return self.__str__()
       
  1171 
       
  1172   def MarkAsTransferred(self):
       
  1173     """Mark this KeyRange as transferred, updating the progress database."""
       
  1174     pass
       
  1175 
       
  1176   def Process(self, export_result, num_threads, batch_size, work_queue):
       
  1177     """Mark this KeyRange as success, updating the progress database.
       
  1178 
       
   1179     Process splits this KeyRange based on the content of export_result and

   1180     adds the unfinished ranges to the work queue.
       
  1181 
       
  1182     Args:
       
  1183       export_result: An ExportResult instance.
       
  1184       num_threads: The number of threads for parallel transfers.
       
  1185       batch_size: The number of entities to transfer per request.
       
  1186       work_queue: The work queue to add unfinished ranges to.
       
       
  1190     """
       
  1191     self._AssertInState(STATE_GETTING)
       
  1192     self._AssertProgressKey()
       
  1193 
       
  1194     self.export_result = export_result
       
  1195     self.count = len(export_result.keys)
       
  1196     if export_result.continued:
       
  1197       self._FinishedRange()._StateTransition(STATE_GOT, blocking=True)
       
  1198       self._AddUnfinishedRanges(num_threads, batch_size, work_queue)
       
  1199     else:
       
  1200       self._StateTransition(STATE_GOT, blocking=True)
       
  1201 
       
  1202   def _FinishedRange(self):
       
  1203     """Returns the range completed by the export_result.
       
  1204 
       
  1205     Returns:
       
  1206       A KeyRange representing a completed range.
       
  1207     """
       
  1208     assert self.export_result is not None
       
  1209 
       
  1210     if self.direction == KeyRange.ASC:
       
  1211       key_start = self.key_start
       
  1212       if self.export_result.continued:
       
  1213         key_end = self.export_result.key_end
       
  1214       else:
       
  1215         key_end = self.key_end
       
  1216     else:
       
  1217       key_end = self.key_end
       
  1218       if self.export_result.continued:
       
  1219         key_start = self.export_result.key_start
       
  1220       else:
       
  1221         key_start = self.key_start
       
  1222 
       
  1223     result = KeyRange(self.progress_queue,
       
  1224                       self.kind,
       
  1225                       key_start=key_start,
       
  1226                       key_end=key_end,
       
  1227                       direction=self.direction)
       
  1228 
       
  1229     result.progress_key = self.progress_key
       
  1230     result.export_result = self.export_result
       
  1231     result.state = self.state
       
  1232     result.count = self.count
       
  1233     return result
       
  1234 
       
  1235   def FilterQuery(self, query):
       
  1236     """Add query filter to restrict to this key range.
       
  1237 
       
  1238     Args:
       
  1239       query: A db.Query instance.
       
  1240     """
       
  1241     if self.key_start == self.key_end and not (
       
  1242         self.include_start or self.include_end):
       
  1243       return EmptyQuery()
       
  1244     if self.include_start:
       
  1245       start_comparator = '>='
       
  1246     else:
       
  1247       start_comparator = '>'
       
  1248     if self.include_end:
       
  1249       end_comparator = '<='
       
  1250     else:
       
  1251       end_comparator = '<'
       
  1252     if self.key_start and self.key_end:
       
  1253       query.filter('__key__ %s' % start_comparator, self.key_start)
       
  1254       query.filter('__key__ %s' % end_comparator, self.key_end)
       
  1255     elif self.key_start:
       
  1256       query.filter('__key__ %s' % start_comparator, self.key_start)
       
  1257     elif self.key_end:
       
  1258       query.filter('__key__ %s' % end_comparator, self.key_end)
       
  1259 
       
  1260     return query
       
  1261 
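  # A filtering sketch, assuming key_range is an existing KeyRange with both
  # bounds set and inclusive; the bounds become __key__ comparisons:
  #
  #   query = db.Query(GetImplementationClass(key_range.kind))
  #   query = key_range.FilterQuery(query)
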
       
  1262   def MakeParallelQuery(self):
       
  1263     """Construct a query for this key range, for parallel downloading.
       
  1264 
       
  1265     Returns:
       
  1266       A db.Query instance.
       
  1267 
       
  1268     Raises:
       
  1269       KeyRangeError: if self.direction is not one of
       
  1270         KeyRange.ASC, KeyRange.DESC
       
  1271     """
       
  1272     if self.direction == KeyRange.ASC:
       
  1273       direction = ''
       
  1274     elif self.direction == KeyRange.DESC:
       
  1275       direction = '-'
       
  1276     else:
       
   1277       raise KeyRangeError('KeyRange direction unexpected: %s' % self.direction)
       
  1278     query = db.Query(GetImplementationClass(self.kind))
       
  1279     query.order('%s__key__' % direction)
       
  1280 
       
  1281     return self.FilterQuery(query)
       
  1282 
       
  1283   def MakeSerialQuery(self):
       
  1284     """Construct a query for this key range without descending __key__ scan.
       
  1285 
       
  1286     Returns:
       
  1287       A db.Query instance.
       
  1288     """
       
  1289     query = db.Query(GetImplementationClass(self.kind))
       
  1290     query.order('__key__')
       
  1291 
       
  1292     return self.FilterQuery(query)
       
  1293 
       
  1294   def _BisectStringRange(self, start, end):
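    """Return (start, mid, end) where mid approximately bisects the strings."""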
       
  1295     if start == end:
       
  1296       return (start, start, end)
       
  1297     start += '\0'
       
  1298     end += '\0'
       
  1299     midpoint = []
       
  1300     expected_max = 127
       
  1301     for i in xrange(min(len(start), len(end))):
       
  1302       if start[i] == end[i]:
       
  1303         midpoint.append(start[i])
       
  1304       else:
       
  1305         ord_sum = ord(start[i]) + ord(end[i])
       
  1306         midpoint.append(unichr(ord_sum / 2))
       
  1307         if ord_sum % 2:
       
  1308           if len(start) > i + 1:
       
  1309             ord_start = ord(start[i+1])
       
  1310           else:
       
  1311             ord_start = 0
       
  1312           if ord_start < expected_max:
       
  1313             ord_split = (expected_max + ord_start) / 2
       
  1314           else:
       
  1315             ord_split = (0xFFFF + ord_start) / 2
       
  1316           midpoint.append(unichr(ord_split))
       
  1317         break
       
  1318     return (start[:-1], ''.join(midpoint), end[:-1])
       
  1319 
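  # A bisection sketch: the midpoint of 'a' and 'c' is 'b'; when the two
  # bounding code points have an odd sum, a tie-breaking character is appended
  # to the midpoint. On a hypothetical key_range instance:
  #
  #   assert key_range._BisectStringRange('a', 'c') == ('a', 'b', 'c')
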
       
  1320   def SplitRange(self, key_start, include_start, key_end, include_end,
       
  1321                  export_result, num_threads, batch_size, work_queue):
       
  1322     """Split the key range [key_start, key_end] into a list of ranges."""
       
  1323     if export_result.direction == KeyRange.ASC:
       
  1324       key_start = export_result.key_end
       
  1325       include_start = False
       
  1326     else:
       
  1327       key_end = export_result.key_start
       
  1328       include_end = False
       
  1329     key_pairs = []
       
  1330     if not key_start:
       
  1331       key_pairs.append((key_start, include_start, key_end, include_end,
       
  1332                         KeyRange.ASC))
       
  1333     elif not key_end:
       
  1334       key_pairs.append((key_start, include_start, key_end, include_end,
       
  1335                         KeyRange.DESC))
       
  1336     elif work_queue.qsize() > 2 * num_threads:
       
  1337       key_pairs.append((key_start, include_start, key_end, include_end,
       
  1338                         KeyRange.ASC))
       
  1339     elif key_start.id() and key_end.id():
       
  1340       if key_end.id() - key_start.id() > batch_size:
       
  1341         key_half = db.Key.from_path(self.kind,
       
  1342                                     (key_start.id() + key_end.id()) / 2)
       
  1343         key_pairs.append((key_start, include_start,
       
  1344                           key_half, True,
       
  1345                           KeyRange.DESC))
       
  1346         key_pairs.append((key_half, False,
       
  1347                           key_end, include_end,
       
  1348                           KeyRange.ASC))
       
  1349       else:
       
  1350         key_pairs.append((key_start, include_start, key_end, include_end,
       
  1351                           KeyRange.ASC))
       
  1352     elif key_start.name() and key_end.name():
       
  1353       (start, middle, end) = self._BisectStringRange(key_start.name(),
       
  1354                                                      key_end.name())
       
  1355       key_pairs.append((key_start, include_start,
       
  1356                         db.Key.from_path(self.kind, middle), True,
       
  1357                         KeyRange.DESC))
       
  1358       key_pairs.append((db.Key.from_path(self.kind, middle), False,
       
  1359                         key_end, include_end,
       
  1360                         KeyRange.ASC))
       
  1361     else:
       
  1362       assert key_start.id() and key_end.name()
       
  1363       key_pairs.append((key_start, include_start,
       
  1364                         self.SPLIT_KEY, False,
       
  1365                         KeyRange.DESC))
       
  1366       key_pairs.append((self.SPLIT_KEY, True,
       
  1367                         key_end, include_end,
       
  1368                         KeyRange.ASC))
       
  1369 
       
  1370     ranges = [KeyRange(self.progress_queue,
       
  1371                        self.kind,
       
  1372                        key_start=start,
       
  1373                        include_start=include_start,
       
  1374                        key_end=end,
       
  1375                        include_end=include_end,
       
  1376                        direction=direction)
       
  1377               for (start, include_start, end, include_end, direction)
       
  1378               in key_pairs]
       
  1379 
       
  1380     for key_range in ranges:
       
  1381       key_range.MarkAsRead()
       
  1382       work_queue.put(key_range, block=True)
       
  1383 
       
  1384   def _AddUnfinishedRanges(self, num_threads, batch_size, work_queue):
       
  1385     """Adds incomplete KeyRanges to the work_queue.
       
  1386 
       
  1387     Args:
       
  1388       num_threads: The number of threads for parallel transfers.
       
  1389       batch_size: The number of entities to transfer per request.
       
  1390       work_queue: The work queue to add unfinished ranges to.
       
  1391 
       
  1392     Returns:
       
   1393       None. Incomplete KeyRanges are put directly onto the work_queue.
       
  1394 
       
  1395     Raises:
       
  1396       KeyRangeError: if this key range has already been completely transferred.
       
  1397     """
       
  1398     assert self.export_result is not None
       
  1399     if self.export_result.continued:
       
  1400       self.SplitRange(self.key_start, self.include_start, self.key_end,
       
  1401                       self.include_end, self.export_result,
       
  1402                       num_threads, batch_size, work_queue)
       
  1403     else:
       
  1404       raise KeyRangeError('No unfinished part of key range.')
       
  1405 
       
  1406 
   711 class RequestManager(object):
  1407 class RequestManager(object):
   712   """A class which wraps a connection to the server."""
  1408   """A class which wraps a connection to the server."""
   713 
       
   714   source = 'google-bulkloader-%s' % UPLOADER_VERSION
       
   715   user_agent = source
       
   716 
  1409 
   717   def __init__(self,
  1410   def __init__(self,
   718                app_id,
  1411                app_id,
   719                host_port,
  1412                host_port,
   720                url_path,
  1413                url_path,
   721                kind,
  1414                kind,
   722                throttle):
  1415                throttle,
       
  1416                batch_size,
       
  1417                secure,
       
  1418                email,
       
  1419                passin):
   723     """Initialize a RequestManager object.
  1420     """Initialize a RequestManager object.
   724 
  1421 
   725     Args:
  1422     Args:
   726       app_id: String containing the application id for requests.
  1423       app_id: String containing the application id for requests.
   727       host_port: String containing the "host:port" pair; the port is optional.
  1424       host_port: String containing the "host:port" pair; the port is optional.
   728       url_path: partial URL (path) to post entity data to.
  1425       url_path: partial URL (path) to post entity data to.
   729       kind: Kind of the Entity records being posted.
  1426       kind: Kind of the Entity records being posted.
   730       throttle: A Throttle instance.
  1427       throttle: A Throttle instance.
       
  1428       batch_size: The number of entities to transfer per request.
       
  1429       secure: Use SSL when communicating with server.
       
   1430       email: If not None, the username to log in with.

   1431       passin: If True, the password will be read from standard input.
   731     """
  1432     """
   732     self.app_id = app_id
  1433     self.app_id = app_id
   733     self.host_port = host_port
  1434     self.host_port = host_port
   734     self.host = host_port.split(':')[0]
  1435     self.host = host_port.split(':')[0]
   735     if url_path and url_path[0] != '/':
  1436     if url_path and url_path[0] != '/':
   736       url_path = '/' + url_path
  1437       url_path = '/' + url_path
   737     self.url_path = url_path
  1438     self.url_path = url_path
   738     self.kind = kind
  1439     self.kind = kind
   739     self.throttle = throttle
  1440     self.throttle = throttle
   740     self.credentials = None
  1441     self.batch_size = batch_size
       
  1442     self.secure = secure
       
  1443     self.authenticated = False
       
  1444     self.auth_called = False
       
  1445     self.parallel_download = True
       
  1446     self.email = email
       
  1447     self.passin = passin
   741     throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
  1448     throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
   742         self.throttle, self)
  1449         self.throttle, self)
   743     logging.debug('Configuring remote_api. app_id = %s, url_path = %s, '
  1450     logger.debug('Configuring remote_api. url_path = %s, '
   744                   'servername = %s' % (app_id, url_path, host_port))
  1451                  'servername = %s' % (url_path, host_port))
   745     remote_api_stub.ConfigureRemoteDatastore(
  1452     remote_api_stub.ConfigureRemoteDatastore(
   746         app_id,
  1453         app_id,
   747         url_path,
  1454         url_path,
   748         self.AuthFunction,
  1455         self.AuthFunction,
   749         servername=host_port,
  1456         servername=host_port,
   750         rpc_server_factory=throttled_rpc_server_factory)
  1457         rpc_server_factory=throttled_rpc_server_factory,
   751     self.authenticated = False
  1458         secure=self.secure)
       
  1459     logger.debug('Bulkloader using app_id: %s', os.environ['APPLICATION_ID'])
   752 
  1460 
   753   def Authenticate(self):
  1461   def Authenticate(self):
   754     """Invoke authentication if necessary."""
  1462     """Invoke authentication if necessary."""
       
  1463     logger.info('Connecting to %s', self.url_path)
   755     self.rpc_server.Send(self.url_path, payload=None)
  1464     self.rpc_server.Send(self.url_path, payload=None)
   756     self.authenticated = True
  1465     self.authenticated = True
   757 
  1466 
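Taken together, the constructor and Authenticate form the usual setup sequence. A minimal sketch, assuming `throttle` is a Throttle instance configured elsewhere; the app id, host, kind, and URL path values are illustrative:

request_manager = RequestManager(app_id='myapp',
                                 host_port='myapp.appspot.com',
                                 url_path='/remote_api',
                                 kind='Person',
                                 throttle=throttle,
                                 batch_size=10,
                                 secure=True,
                                 email=None,    # None: prompt for the username
                                 passin=False)  # False: interactive password prompt
request_manager.Authenticate()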
   758   def AuthFunction(self,
  1467   def AuthFunction(self,
   759                    raw_input_fn=raw_input,
  1468                    raw_input_fn=raw_input,
   768       password_input_fn: Used for dependency injection.
  1477       password_input_fn: Used for dependency injection.
   769 
  1478 
   770     Returns:
  1479     Returns:
   771       A pair of the username and password.
  1480       A pair of the username and password.
   772     """
  1481     """
   773     if self.credentials is not None:
  1482     if self.email:
   774       return self.credentials
  1483       email = self.email
   775     print 'Please enter login credentials for %s (%s)' % (
  1484     else:
   776         self.host, self.app_id)
  1485       print 'Please enter login credentials for %s' % (
   777     email = raw_input_fn('Email: ')
  1486           self.host)
       
  1487       email = raw_input_fn('Email: ')
       
  1488 
   778     if email:
  1489     if email:
   779       password_prompt = 'Password for %s: ' % email
  1490       password_prompt = 'Password for %s: ' % email
   780       password = password_input_fn(password_prompt)
  1491       if self.passin:
       
  1492         password = raw_input_fn(password_prompt)
       
  1493       else:
       
  1494         password = password_input_fn(password_prompt)
   781     else:
  1495     else:
   782       password = None
  1496       password = None
   783     self.credentials = (email, password)
  1497 
   784     return self.credentials
  1498     self.auth_called = True
   785 
  1499     return (email, password)
   786   def _GetHeaders(self):
  1500 
   787     """Constructs a dictionary of extra headers to send with a request."""
  1501   def EncodeContent(self, rows, loader=None):
   788     headers = {
       
   789         'GAE-Uploader-Version': UPLOADER_VERSION,
       
   790         'GAE-Uploader-Kind': self.kind
       
   791         }
       
   792     return headers
       
   793 
       
   794   def EncodeContent(self, rows):
       
   795     """Encodes row data to the wire format.
  1502     """Encodes row data to the wire format.
   796 
  1503 
   797     Args:
  1504     Args:
   798       rows: A list of pairs of a line number and a list of column values.
  1505       rows: A list of pairs of a line number and a list of column values.
       
  1506       loader: Used for dependency injection.
   799 
  1507 
   800     Returns:
  1508     Returns:
   801       A list of db.Model instances.
  1509       A list of db.Model instances.
   802     """
  1510 
   803     try:
  1511     Raises:
   804       loader = Loader.RegisteredLoaders()[self.kind]
  1512       ConfigurationError: if no loader is defined for self.kind
   805     except KeyError:
  1513     """
   806       logging.error('No Loader defined for kind %s.' % self.kind)
  1514     if not loader:
   807       raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
  1515       try:
       
  1516         loader = Loader.RegisteredLoader(self.kind)
       
  1517       except KeyError:
       
  1518         logger.error('No Loader defined for kind %s.' % self.kind)
       
  1519         raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
   808     entities = []
  1520     entities = []
   809     for line_number, values in rows:
  1521     for line_number, values in rows:
   810       key = loader.GenerateKey(line_number, values)
  1522       key = loader.generate_key(line_number, values)
   811       entity = loader.CreateEntity(values, key_name=key)
  1523       if isinstance(key, db.Key):
   812       entities.extend(entity)
  1524         parent = key.parent()
       
  1525         key = key.name()
       
  1526       else:
       
  1527         parent = None
       
  1528       entity = loader.create_entity(values, key_name=key, parent=parent)
       
  1529       if isinstance(entity, list):
       
  1530         entities.extend(entity)
       
  1531       elif entity:
       
  1532         entities.append(entity)
   813 
  1533 
   814     return entities
  1534     return entities
   815 
  1535 
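EncodeContent relies on just two loader hooks, generate_key and create_entity. A hypothetical stand-in that satisfies the same duck-typed interface (the Person model and PersonLoader are illustrative, not this module's Loader base class):

from google.appengine.ext import db

class Person(db.Model):
  name = db.StringProperty()

class PersonLoader(object):
  """Illustrative loader exposing only the two hooks EncodeContent calls."""

  def generate_key(self, line_number, values):
    return None  # None lets the datastore assign a numeric id

  def create_entity(self, values, key_name=None, parent=None):
    return Person(key_name=key_name, parent=parent, name=values[0])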
   816   def PostEntities(self, item):
  1536   def PostEntities(self, item):
   817     """Posts Entity records to a remote endpoint over HTTP.
  1537     """Posts Entity records to a remote endpoint over HTTP.
   824         from the server as a str.
  1544         from the server as a str.
   825     """
  1545     """
   826     entities = item.content
  1546     entities = item.content
   827     db.put(entities)
  1547     db.put(entities)
   828 
  1548 
   829 
  1549   def GetEntities(self, key_range):
   830 class WorkItem(object):
  1550     """Gets Entity records from a remote endpoint over HTTP.
   831   """Holds a unit of uploading work.
  1551 
   832 
  1552     Args:
   833   A WorkItem represents a number of entities that need to be uploaded to
   1553       key_range: Range of keys to get.
   834   Google App Engine. These entities are encoded in the "content" field of
  1554 
   835   the WorkItem, and will be POST'd as-is to the server.
  1555     Returns:
   836 
  1556       An ExportResult instance.
   837   The entities are identified by a range of numeric keys, inclusively. In
  1557 
   838   the case of a resumption of an upload, or a replay to correct errors,
  1558     Raises:
   839   these keys must be able to identify the same set of entities.
  1559       ConfigurationError: if no Exporter is defined for self.kind
   840 
  1560     """
   841   Note that keys specify a range. The entities do not have to sequentially
  1561     try:
    842   fill the entire range; they must simply bound a range of valid keys.
  1562       Exporter.RegisteredExporter(self.kind)
   843   """
  1563     except KeyError:
   844 
  1564       raise ConfigurationError('No Exporter defined for kind %s.' % self.kind)
   845   def __init__(self, progress_queue, rows, key_start, key_end,
  1565 
   846                progress_key=None):
  1566     keys = []
   847     """Initialize the WorkItem instance.
  1567     entities = []
   848 
  1568 
   849     Args:
  1569     if self.parallel_download:
   850       progress_queue: A queue used for tracking progress information.
  1570       query = key_range.MakeParallelQuery()
   851       rows: A list of pairs of a line number and a list of column values
  1571       try:
   852       key_start: The (numeric) starting key, inclusive.
  1572         results = query.fetch(self.batch_size)
   853       key_end: The (numeric) ending key, inclusive.
  1573       except datastore_errors.NeedIndexError:
   854       progress_key: If this WorkItem represents state from a prior run,
  1574         logger.info('%s: No descending index on __key__, '
   855         then this will be the key within the progress database.
  1575                     'performing serial download', self.kind)
   856     """
  1576         self.parallel_download = False
   857     self.state = STATE_READ
  1577 
   858 
  1578     if not self.parallel_download:
   859     self.progress_queue = progress_queue
  1579       key_range.direction = KeyRange.ASC
   860 
  1580       query = key_range.MakeSerialQuery()
   861     assert isinstance(key_start, (int, long))
  1581       results = query.fetch(self.batch_size)
   862     assert isinstance(key_end, (int, long))
  1582 
   863     assert key_start <= key_end
  1583     size = len(results)
   864 
  1584 
   865     self.key_start = key_start
  1585     for model in results:
   866     self.key_end = key_end
  1586       key = model.key()
   867     self.progress_key = progress_key
  1587       entities.append(cPickle.dumps(model))
   868 
  1588       keys.append(key)
   869     self.progress_event = threading.Event()
  1589 
   870 
  1590     continued = (size == self.batch_size)
   871     self.rows = rows
  1591     key_range.count = size
   872     self.content = None
  1592 
   873     self.count = len(rows)
  1593     return ExportResult(continued, key_range.direction, keys, entities)
   874 
       
   875   def MarkAsRead(self):
       
   876     """Mark this WorkItem as read/consumed from the data source."""
       
   877 
       
   878     assert self.state == STATE_READ
       
   879 
       
   880     self._StateTransition(STATE_READ, blocking=True)
       
   881 
       
   882     assert self.progress_key is not None
       
   883 
       
   884   def MarkAsSending(self):
       
   885     """Mark this WorkItem as in-process on being uploaded to the server."""
       
   886 
       
   887     assert self.state == STATE_READ or self.state == STATE_NOT_SENT
       
   888     assert self.progress_key is not None
       
   889 
       
   890     self._StateTransition(STATE_SENDING, blocking=True)
       
   891 
       
   892   def MarkAsSent(self):
       
    893     """Mark this WorkItem as successfully sent to the server."""
       
   894 
       
   895     assert self.state == STATE_SENDING
       
   896     assert self.progress_key is not None
       
   897 
       
   898     self._StateTransition(STATE_SENT, blocking=False)
       
   899 
       
   900   def MarkAsError(self):
       
    901     """Mark this WorkItem as requiring manual error recovery."""
       
   902 
       
   903     assert self.state == STATE_SENDING
       
   904     assert self.progress_key is not None
       
   905 
       
   906     self._StateTransition(STATE_NOT_SENT, blocking=True)
       
   907 
       
   908   def _StateTransition(self, new_state, blocking=False):
       
   909     """Transition the work item to a new state, storing progress information.
       
   910 
       
   911     Args:
       
   912       new_state: The state to transition to.
       
   913       blocking: Whether to block for the progress thread to acknowledge the
       
   914         transition.
       
   915     """
       
   916     logging.debug('[%s-%s] %s' %
       
   917                   (self.key_start, self.key_end, StateMessage(self.state)))
       
   918     assert not self.progress_event.isSet()
       
   919 
       
   920     self.state = new_state
       
   921 
       
   922     self.progress_queue.put(self)
       
   923 
       
   924     if blocking:
       
   925       self.progress_event.wait()
       
   926 
       
   927       self.progress_event.clear()
       
   928 
       
   929 
  1594 
   930 
  1595 
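Note how GetEntities infers continuation: a full batch is taken as evidence that the key range was not exhausted, so the caller will split and requeue it. The heuristic in isolation:

def RangeContinues(results, batch_size):
  """A full batch implies more entities probably remain in the range."""
  return len(results) == batch_size

print RangeContinues(range(10), 10)  # True: split and requeue this range
print RangeContinues(range(3), 10)   # False: this range is exhausted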
   931 def InterruptibleSleep(sleep_time):
  1596 def InterruptibleSleep(sleep_time):
    932   """Puts thread to sleep, checking this thread's exit_flag twice a second.
   1597   """Puts thread to sleep, checking this thread's exit_flag twice a second.
   933 
  1598 
   959   Initially the ThreadGate allows only one uploader thread to be active.
  1624   Initially the ThreadGate allows only one uploader thread to be active.
   960   For each successful upload, another thread is activated and for each
  1625   For each successful upload, another thread is activated and for each
   961   failed upload, the number of active threads is reduced by one.
  1626   failed upload, the number of active threads is reduced by one.
   962   """
  1627   """
   963 
  1628 
   964   def __init__(self, enabled, sleep=InterruptibleSleep):
  1629   def __init__(self, enabled,
       
  1630                threshhold1=MAXIMUM_INCREASE_DURATION,
       
  1631                threshhold2=MAXIMUM_HOLD_DURATION,
       
  1632                sleep=InterruptibleSleep):
       
  1633     """Constructor for ThreadGate instances.
       
  1634 
       
  1635     Args:
       
  1636       enabled: Whether the thread gate is enabled
       
  1637       threshhold1: Maximum duration (in seconds) for a transfer to increase
       
  1638         the number of active threads.
       
  1639       threshhold2: Maximum duration (in seconds) for a transfer to not decrease
       
  1640         the number of active threads.
       
  1641     """
   965     self.enabled = enabled
  1642     self.enabled = enabled
   966     self.enabled_count = 1
  1643     self.enabled_count = 1
   967     self.lock = threading.Lock()
  1644     self.lock = threading.Lock()
   968     self.thread_semaphore = threading.Semaphore(self.enabled_count)
  1645     self.thread_semaphore = threading.Semaphore(self.enabled_count)
   969     self._threads = []
  1646     self._threads = []
   970     self.backoff_time = 0
  1647     self.backoff_time = 0
   971     self.sleep = sleep
  1648     self.sleep = sleep
       
  1649     self.threshhold1 = threshhold1
       
  1650     self.threshhold2 = threshhold2
   972 
  1651 
   973   def Register(self, thread):
  1652   def Register(self, thread):
   974     """Register a thread with the thread gate."""
  1653     """Register a thread with the thread gate."""
   975     self._threads.append(thread)
  1654     self._threads.append(thread)
   976 
  1655 
   988       self.lock.release()
  1667       self.lock.release()
   989     self.thread_semaphore.release()
  1668     self.thread_semaphore.release()
   990 
  1669 
   991   def EnableAllThreads(self):
  1670   def EnableAllThreads(self):
   992     """Enable all worker threads."""
  1671     """Enable all worker threads."""
   993     for unused_idx in range(len(self._threads) - self.enabled_count):
  1672     for unused_idx in xrange(len(self._threads) - self.enabled_count):
   994       self.EnableThread()
  1673       self.EnableThread()
   995 
  1674 
   996   def StartWork(self):
  1675   def StartWork(self):
   997     """Starts a critical section in which the number of workers is limited.
  1676     """Starts a critical section in which the number of workers is limited.
   998 
  1677 
  1002     """
  1681     """
  1003     if self.enabled:
  1682     if self.enabled:
  1004       self.thread_semaphore.acquire()
  1683       self.thread_semaphore.acquire()
  1005       if self.backoff_time > 0.0:
  1684       if self.backoff_time > 0.0:
  1006         if not threading.currentThread().exit_flag:
  1685         if not threading.currentThread().exit_flag:
  1007           logging.info('Backing off: %.1f seconds',
  1686           logger.info('Backing off: %.1f seconds',
  1008                        self.backoff_time)
  1687                       self.backoff_time)
  1009         self.sleep(self.backoff_time)
  1688         self.sleep(self.backoff_time)
  1010 
  1689 
  1011   def FinishWork(self):
  1690   def FinishWork(self):
  1012     """Ends a critical section started with self.StartWork()."""
  1691     """Ends a critical section started with self.StartWork()."""
  1013     if self.enabled:
  1692     if self.enabled:
  1014       self.thread_semaphore.release()
  1693       self.thread_semaphore.release()
  1015 
  1694 
  1016   def IncreaseWorkers(self):
  1695   def TransferSuccess(self, duration):
  1017     """Informs the throttler that an item was successfully sent.
  1696     """Informs the throttler that an item was successfully sent.
  1018 
  1697 
  1019     If thread throttling is enabled, this method will cause an
  1698     If thread throttling is enabled and the duration is low enough, this
  1020     additional thread to run in the critical section.
  1699     method will cause an additional thread to run in the critical section.
  1021     """
  1700 
  1022     if self.enabled:
  1701     Args:
       
  1702       duration: The duration of the transfer in seconds.
       
  1703     """
       
  1704     if duration > self.threshhold2:
       
  1705       self.DecreaseWorkers()
       
  1706     elif duration > self.threshhold1:
       
  1707       return
       
  1708     elif self.enabled:
  1023       if self.backoff_time > 0.0:
  1709       if self.backoff_time > 0.0:
  1024         logging.info('Resetting backoff to 0.0')
  1710         logger.info('Resetting backoff to 0.0')
  1025         self.backoff_time = 0.0
  1711         self.backoff_time = 0.0
  1026       do_enable = False
  1712       do_enable = False
  1027       self.lock.acquire()
  1713       self.lock.acquire()
  1028       try:
  1714       try:
  1029         if self.enabled and len(self._threads) > self.enabled_count:
  1715         if self.enabled and len(self._threads) > self.enabled_count:
  1030           do_enable = True
  1716           do_enable = True
  1031           self.enabled_count += 1
  1717           self.enabled_count += 1
  1032       finally:
  1718       finally:
  1033         self.lock.release()
  1719         self.lock.release()
  1034       if do_enable:
  1720       if do_enable:
       
  1721         logger.debug('Increasing active thread count to %d',
       
  1722                      self.enabled_count)
  1035         self.thread_semaphore.release()
  1723         self.thread_semaphore.release()
  1036 
  1724 
  1037   def DecreaseWorkers(self):
  1725   def DecreaseWorkers(self):
  1038     """Informs the thread_gate that an item failed to send.
  1726     """Informs the thread_gate that an item failed to send.
  1039 
  1727 
  1056             else:
  1744             else:
  1057               self.backoff_time *= BACKOFF_FACTOR
  1745               self.backoff_time *= BACKOFF_FACTOR
  1058       finally:
  1746       finally:
  1059         self.lock.release()
  1747         self.lock.release()
  1060       if do_disable:
  1748       if do_disable:
       
  1749         logger.debug('Decreasing the number of active threads to %d',
       
  1750                      self.enabled_count)
  1061         self.thread_semaphore.acquire()
  1751         self.thread_semaphore.acquire()
  1062 
  1752 
  1063 
  1753 
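A schematic of how a worker wraps one transfer in the gate; the work() callable and the exception policy here are placeholders, and the real loop is _BulkWorkerThread.PerformWork further below:

import time

def GatedTransfer(thread_gate, work):
  """Run a single transfer inside the gate's critical section."""
  thread_gate.StartWork()
  try:
    started = time.time()
    try:
      work()
    except Exception:
      thread_gate.DecreaseWorkers()  # failure: shrink concurrency, back off
      raise
    thread_gate.TransferSuccess(time.time() - started)  # fast: may add a thread
  finally:
    thread_gate.FinishWork()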
  1064 class Throttle(object):
  1754 class Throttle(object):
  1065   """A base class for upload rate throttling.
  1755   """A base class for upload rate throttling.
  1205       sleep_time = (float(total) / self.throttles[throttle_name]) - duration
  1895       sleep_time = (float(total) / self.throttles[throttle_name]) - duration
  1206 
  1896 
  1207       if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
  1897       if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
  1208         break
  1898         break
  1209 
  1899 
  1210       logging.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
  1900       logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
  1211                     '(duration=%.1f ms, total=%d)',
  1901                    '(duration=%.1f ms, total=%d)',
  1212                     thread.getName(), throttle_name,
  1902                    thread.getName(), throttle_name,
  1213                     sleep_time * 1000, duration * 1000, total)
  1903                    sleep_time * 1000, duration * 1000, total)
  1214       self.thread_sleep(sleep_time)
  1904       self.thread_sleep(sleep_time)
  1215       if thread.exit_flag:
  1905       if thread.exit_flag:
  1216         break
  1906         break
  1217       self._RotateCounts(throttle_name)
  1907       self._RotateCounts(throttle_name)
  1218 
  1908 
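The sleep computation above is proportional pacing: consuming `total` units against a per-second limit should take total/limit seconds, and any shortfall relative to the observed duration is slept off. A worked example with illustrative numbers:

limit = 100000.0  # illustrative: allowed bytes per second
total = 250000    # bytes consumed in the current window
duration = 1.5    # seconds the transfer actually took

sleep_time = (total / limit) - duration
print sleep_time  # 1.0: sleeping keeps the effective rate near the limit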
  1297     self.exit_flag = False
  1987     self.exit_flag = False
  1298     self.error = None
  1988     self.error = None
  1299 
  1989 
  1300   def run(self):
  1990   def run(self):
  1301     """Perform the work of the thread."""
  1991     """Perform the work of the thread."""
  1302     logging.info('[%s] %s: started', self.getName(), self.__class__.__name__)
  1992     logger.info('[%s] %s: started', self.getName(), self.__class__.__name__)
  1303 
  1993 
  1304     try:
  1994     try:
  1305       self.PerformWork()
  1995       self.PerformWork()
  1306     except:
  1996     except:
  1307       self.error = sys.exc_info()[1]
  1997       self.error = sys.exc_info()[1]
  1308       logging.exception('[%s] %s:', self.getName(), self.__class__.__name__)
  1998       logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
  1309 
  1999 
  1310     logging.info('[%s] %s: exiting', self.getName(), self.__class__.__name__)
  2000     logger.info('[%s] %s: exiting', self.getName(), self.__class__.__name__)
  1311 
  2001 
  1312   def PerformWork(self):
  2002   def PerformWork(self):
  1313     """Perform the thread-specific work."""
  2003     """Perform the thread-specific work."""
  1314     raise NotImplementedError()
  2004     raise NotImplementedError()
  1315 
  2005 
  1316   def CheckError(self):
  2006   def CheckError(self):
  1317     """If an error is present, then log it."""
  2007     """If an error is present, then log it."""
  1318     if self.error:
  2008     if self.error:
  1319       logging.error('Error in %s: %s', self.GetFriendlyName(), self.error)
  2009       logger.error('Error in %s: %s', self.GetFriendlyName(), self.error)
  1320 
  2010 
  1321   def GetFriendlyName(self):
  2011   def GetFriendlyName(self):
  1322     """Returns a human-friendly description of the thread."""
  2012     """Returns a human-friendly description of the thread."""
  1323     if hasattr(self, 'NAME'):
  2013     if hasattr(self, 'NAME'):
  1324       return self.NAME
  2014       return self.NAME
  1325     return 'unknown thread'
  2015     return 'unknown thread'
  1326 
  2016 
  1327 
  2017 
  1328 class BulkLoaderThread(_ThreadBase):
  2018 non_fatal_error_codes = set([errno.EAGAIN,
  1329   """A thread which transmits entities to the server application.
  2019                              errno.ENETUNREACH,
       
  2020                              errno.ENETRESET,
       
  2021                              errno.ECONNRESET,
       
  2022                              errno.ETIMEDOUT,
       
  2023                              errno.EHOSTUNREACH])
       
  2024 
       
  2025 
       
  2026 def IsURLErrorFatal(error):
       
  2027   """Returns False if the given URLError may be from a transient failure.
       
  2028 
       
  2029   Args:
       
  2030     error: A urllib2.URLError instance.
       
  2031   """
       
  2032   assert isinstance(error, urllib2.URLError)
       
  2033   if not hasattr(error, 'reason'):
       
  2034     return True
       
  2035   if not isinstance(error.reason[0], int):
       
  2036     return True
       
  2037   return error.reason[0] not in non_fatal_error_codes
       
  2038 
       
  2039 
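A quick sanity check of the classification; the constructed errors are illustrative:

import errno
import socket
import urllib2

transient = urllib2.URLError(socket.error(errno.ECONNRESET, 'reset by peer'))
opaque = urllib2.URLError('something unexpected')

print IsURLErrorFatal(transient)  # False: errno is in non_fatal_error_codes
print IsURLErrorFatal(opaque)     # True: reason[0] is not a known errno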
       
  2040 def PrettyKey(key):
       
  2041   """Returns a nice string representation of the given key."""
       
  2042   if key is None:
       
  2043     return None
       
  2044   elif isinstance(key, db.Key):
       
  2045     return repr(key.id_or_name())
       
  2046   return str(key)
       
  2047 
       
  2048 
       
  2049 class _BulkWorkerThread(_ThreadBase):
       
  2050   """A base class for worker threads.
  1330 
  2051 
  1331   This thread will read WorkItem instances from the work_queue and upload
  2052   This thread will read WorkItem instances from the work_queue and upload
  1332   the entities to the server application. Progress information will be
  2053   the entities to the server application. Progress information will be
  1333   pushed into the progress_queue as the work is being performed.
  2054   pushed into the progress_queue as the work is being performed.
  1334 
  2055 
  1335   If a BulkLoaderThread encounters a transient error, the entities will be
  2056   If a _BulkWorkerThread encounters a transient error, the entities will be
   1336   resent; if a fatal error is encountered, the BulkLoaderThread exits.
   2057   resent; if a fatal error is encountered, the _BulkWorkerThread exits.
       
  2058 
       
  2059   Subclasses must provide implementations for PreProcessItem, TransferItem,
       
  2060   and ProcessResponse.
  1337   """
  2061   """
  1338 
  2062 
  1339   def __init__(self,
  2063   def __init__(self,
  1340                work_queue,
  2064                work_queue,
  1341                throttle,
  2065                throttle,
  1342                thread_gate,
  2066                thread_gate,
  1343                request_manager):
  2067                request_manager,
       
  2068                num_threads,
       
  2069                batch_size,
       
  2070                state_message,
       
  2071                get_time):
  1344     """Initialize the BulkLoaderThread instance.
   2072     """Initialize the _BulkWorkerThread instance.
  1345 
  2073 
  1346     Args:
  2074     Args:
  1347       work_queue: A queue containing WorkItems for processing.
  2075       work_queue: A queue containing WorkItems for processing.
   1348       throttle: A Throttle to control upload bandwidth.
   2076       throttle: A Throttle to control upload bandwidth.
  1349       thread_gate: A ThreadGate to control number of simultaneous uploads.
  2077       thread_gate: A ThreadGate to control number of simultaneous uploads.
  1350       request_manager: A RequestManager instance.
  2078       request_manager: A RequestManager instance.
       
  2079       num_threads: The number of threads for parallel transfers.
       
  2080       batch_size: The number of entities to transfer per request.
       
  2081       state_message: Used for dependency injection.
       
  2082       get_time: Used for dependency injection.
  1351     """
  2083     """
  1352     _ThreadBase.__init__(self)
  2084     _ThreadBase.__init__(self)
  1353 
  2085 
  1354     self.work_queue = work_queue
  2086     self.work_queue = work_queue
  1355     self.throttle = throttle
  2087     self.throttle = throttle
  1356     self.thread_gate = thread_gate
  2088     self.thread_gate = thread_gate
  1357 
       
  1358     self.request_manager = request_manager
  2089     self.request_manager = request_manager
       
  2090     self.num_threads = num_threads
       
  2091     self.batch_size = batch_size
       
  2092     self.state_message = state_message
       
  2093     self.get_time = get_time
       
  2094 
       
  2095   def PreProcessItem(self, item):
       
  2096     """Performs pre transfer processing on a work item."""
       
  2097     raise NotImplementedError()
       
  2098 
       
  2099   def TransferItem(self, item):
       
  2100     """Transfers the entities associated with an item.
       
  2101 
       
  2102     Args:
       
  2103       item: An item of upload (WorkItem) or download (KeyRange) work.
       
  2104 
       
  2105     Returns:
       
   2106       The server response, which is passed on to ProcessResponse.
       
  2107     """
       
  2108     raise NotImplementedError()
       
  2109 
       
  2110   def ProcessResponse(self, item, result):
       
  2111     """Processes the response from the server application."""
       
  2112     raise NotImplementedError()
  1359 
  2113 
  1360   def PerformWork(self):
  2114   def PerformWork(self):
  1361     """Perform the work of a BulkLoaderThread."""
  2115     """Perform the work of a _BulkWorkerThread."""
  1362     while not self.exit_flag:
  2116     while not self.exit_flag:
  1363       success = False
  2117       transferred = False
  1364       self.thread_gate.StartWork()
  2118       self.thread_gate.StartWork()
  1365       try:
  2119       try:
  1366         try:
  2120         try:
  1367           item = self.work_queue.get(block=True, timeout=1.0)
  2121           item = self.work_queue.get(block=True, timeout=1.0)
  1368         except Queue.Empty:
  2122         except Queue.Empty:
  1369           continue
  2123           continue
  1370         if item == _THREAD_SHOULD_EXIT:
  2124         if item == _THREAD_SHOULD_EXIT:
  1371           break
  2125           break
  1372 
  2126 
  1373         logging.debug('[%s] Got work item [%d-%d]',
  2127         logger.debug('[%s] Got work item %s', self.getName(), item)
  1374                       self.getName(), item.key_start, item.key_end)
       
  1375 
  2128 
  1376         try:
  2129         try:
  1377 
  2130 
  1378           item.MarkAsSending()
  2131           item.MarkAsTransferring()
       
  2132           self.PreProcessItem(item)
       
  2133           response = None
  1379           try:
  2134           try:
  1380             if item.content is None:
       
  1381               item.content = self.request_manager.EncodeContent(item.rows)
       
  1382             try:
  2135             try:
  1383               self.request_manager.PostEntities(item)
  2136               t = self.get_time()
  1384               success = True
  2137               response = self.TransferItem(item)
  1385               logging.debug(
  2138               status = 200
  1386                   '[%d-%d] Sent %d entities',
  2139               transferred = True
  1387                   item.key_start, item.key_end, item.count)
  2140               transfer_time = self.get_time() - t
       
  2141               logger.debug('[%s] %s Transferred %d entities', self.getName(),
       
  2142                            item, item.count)
  1388               self.throttle.AddTransfer(RECORDS, item.count)
  2143               self.throttle.AddTransfer(RECORDS, item.count)
  1389             except (db.InternalError, db.NotSavedError, db.Timeout), e:
  2144             except (db.InternalError, db.NotSavedError, db.Timeout,
  1390               logging.debug('Caught non-fatal error: %s', e)
  2145                     apiproxy_errors.OverQuotaError,
       
  2146                     apiproxy_errors.DeadlineExceededError), e:
       
  2147               logger.exception('Caught non-fatal datastore error: %s', e)
  1391             except urllib2.HTTPError, e:
  2148             except urllib2.HTTPError, e:
  1392               if e.code == 403 or (e.code >= 500 and e.code < 600):
  2149               status = e.code
  1393                 logging.debug('Caught HTTP error %d', e.code)
  2150               if status == 403 or (status >= 500 and status < 600):
  1394                 logging.debug('%s', e.read())
  2151                 logger.exception('Caught non-fatal HTTP error: %d %s',
       
  2152                                  status, e.msg)
  1395               else:
  2153               else:
  1396                 raise e
  2154                 raise e
       
  2155             except urllib2.URLError, e:
       
  2156               if IsURLErrorFatal(e):
       
  2157                 raise e
       
  2158               else:
       
  2159                 logger.exception('Caught non-fatal URL error: %s', e.reason)
       
  2160 
       
  2161             self.ProcessResponse(item, response)
  1397 
  2162 
  1398           except:
  2163           except:
  1399             self.error = sys.exc_info()[1]
  2164             self.error = sys.exc_info()[1]
  1400             logging.exception('[%s] %s: caught exception %s', self.getName(),
  2165             logger.exception('[%s] %s: caught exception %s', self.getName(),
  1401                               self.__class__.__name__, str(sys.exc_info()))
  2166                              self.__class__.__name__, str(sys.exc_info()))
  1402             raise
  2167             raise
  1403 
  2168 
  1404         finally:
  2169         finally:
  1405           if success:
  2170           if transferred:
  1406             item.MarkAsSent()
  2171             item.MarkAsTransferred()
  1407             self.thread_gate.IncreaseWorkers()
  2172             self.thread_gate.TransferSuccess(transfer_time)
  1408             self.work_queue.task_done()
  2173             self.work_queue.task_done()
  1409           else:
  2174           else:
  1410             item.MarkAsError()
  2175             item.MarkAsError()
  1411             self.thread_gate.DecreaseWorkers()
       
  1412             try:
  2176             try:
  1413               self.work_queue.reput(item, block=False)
  2177               self.work_queue.reput(item, block=False)
  1414             except Queue.Full:
  2178             except Queue.Full:
  1415               logging.error('[%s] Failed to reput work item.', self.getName())
  2179               logger.error('[%s] Failed to reput work item.', self.getName())
  1416               raise Error('Failed to reput work item')
  2180               raise Error('Failed to reput work item')
  1417           logging.info('[%d-%d] %s',
  2181             self.thread_gate.DecreaseWorkers()
  1418                        item.key_start, item.key_end, StateMessage(item.state))
  2182           logger.info('%s %s',
       
  2183                       item,
       
  2184                       self.state_message(item.state))
  1419 
  2185 
  1420       finally:
  2186       finally:
  1421         self.thread_gate.FinishWork()
  2187         self.thread_gate.FinishWork()
  1422 
  2188 
  1423 
  2189 
  1424   def GetFriendlyName(self):
  2190   def GetFriendlyName(self):
  1425     """Returns a human-friendly name for this thread."""
  2191     """Returns a human-friendly name for this thread."""
  1426     return 'worker [%s]' % self.getName()
  2192     return 'worker [%s]' % self.getName()
       
  2193 
       
  2194 
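Because the base class owns the whole queue/retry loop, a subclass only supplies the three hooks. A do-nothing subclass (purely illustrative, e.g. as a test double) shows the minimal surface area:

class NoOpWorkerThread(_BulkWorkerThread):
  """Illustrative subclass: drives the loop without contacting a server."""

  def PreProcessItem(self, item):
    pass  # nothing to encode

  def TransferItem(self, item):
    return None  # pretend the transfer succeeded

  def ProcessResponse(self, item, response):
    pass  # nothing to record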
       
  2195 class BulkLoaderThread(_BulkWorkerThread):
       
  2196   """A thread which transmits entities to the server application.
       
  2197 
       
  2198   This thread will read WorkItem instances from the work_queue and upload
       
  2199   the entities to the server application. Progress information will be
       
  2200   pushed into the progress_queue as the work is being performed.
       
  2201 
       
  2202   If a BulkLoaderThread encounters a transient error, the entities will be
       
  2203   resent, if a fatal error is encoutered the BulkLoaderThread exits.
       
  2204   """
       
  2205 
       
  2206   def __init__(self,
       
  2207                work_queue,
       
  2208                throttle,
       
  2209                thread_gate,
       
  2210                request_manager,
       
  2211                num_threads,
       
  2212                batch_size,
       
  2213                get_time=time.time):
       
  2214     """Initialize the BulkLoaderThread instance.
       
  2215 
       
  2216     Args:
       
  2217       work_queue: A queue containing WorkItems for processing.
       
   2218       throttle: A Throttle to control upload bandwidth.
       
  2219       thread_gate: A ThreadGate to control number of simultaneous uploads.
       
  2220       request_manager: A RequestManager instance.
       
  2221       num_threads: The number of threads for parallel transfers.
       
  2222       batch_size: The number of entities to transfer per request.
       
  2223       get_time: Used for dependency injection.
       
  2224     """
       
  2225     _BulkWorkerThread.__init__(self,
       
  2226                                work_queue,
       
  2227                                throttle,
       
  2228                                thread_gate,
       
  2229                                request_manager,
       
  2230                                num_threads,
       
  2231                                batch_size,
       
  2232                                ImportStateMessage,
       
  2233                                get_time)
       
  2234 
       
  2235   def PreProcessItem(self, item):
       
  2236     """Performs pre transfer processing on a work item."""
       
  2237     if item and not item.content:
       
  2238       item.content = self.request_manager.EncodeContent(item.rows)
       
  2239 
       
  2240   def TransferItem(self, item):
       
  2241     """Transfers the entities associated with an item.
       
  2242 
       
  2243     Args:
       
  2244       item: An item of upload (WorkItem) work.
       
  2245 
       
  2246     Returns:
       
   2247       None; PostEntities has no return value.
       
  2248     """
       
  2249     return self.request_manager.PostEntities(item)
       
  2250 
       
  2251   def ProcessResponse(self, item, response):
       
  2252     """Processes the response from the server application."""
       
  2253     pass
       
  2254 
       
  2255 
       
  2256 class BulkExporterThread(_BulkWorkerThread):
       
   2257   """A thread which receives entities from the server application.
       
  2258 
       
  2259   This thread will read KeyRange instances from the work_queue and export
       
  2260   the entities from the server application. Progress information will be
       
  2261   pushed into the progress_queue as the work is being performed.
       
  2262 
       
   2263   If a BulkExporterThread encounters an error when trying to fetch data,
       
  2264   the thread will exit and cause the application to terminate.
       
  2265   """
       
  2266 
       
  2267   def __init__(self,
       
  2268                work_queue,
       
  2269                throttle,
       
  2270                thread_gate,
       
  2271                request_manager,
       
  2272                num_threads,
       
  2273                batch_size,
       
  2274                get_time=time.time):
       
  2275 
       
  2276     """Initialize the BulkExporterThread instance.
       
  2277 
       
  2278     Args:
       
  2279       work_queue: A queue containing KeyRanges for processing.
       
   2280       throttle: A Throttle to control upload bandwidth.
       
  2281       thread_gate: A ThreadGate to control number of simultaneous uploads.
       
  2282       request_manager: A RequestManager instance.
       
  2283       num_threads: The number of threads for parallel transfers.
       
  2284       batch_size: The number of entities to transfer per request.
       
  2285       get_time: Used for dependency injection.
       
  2286     """
       
  2287     _BulkWorkerThread.__init__(self,
       
  2288                                work_queue,
       
  2289                                throttle,
       
  2290                                thread_gate,
       
  2291                                request_manager,
       
  2292                                num_threads,
       
  2293                                batch_size,
       
  2294                                ExportStateMessage,
       
  2295                                get_time)
       
  2296 
       
  2297   def PreProcessItem(self, unused_item):
       
  2298     """Performs pre transfer processing on a work item."""
       
  2299     pass
       
  2300 
       
  2301   def TransferItem(self, item):
       
  2302     """Transfers the entities associated with an item.
       
  2303 
       
  2304     Args:
       
  2305       item: An item of download (KeyRange) work.
       
  2306 
       
  2307     Returns:
       
   2308       An ExportResult instance, which is passed on to ProcessResponse.
       
  2309     """
       
  2310     return self.request_manager.GetEntities(item)
       
  2311 
       
  2312   def ProcessResponse(self, item, export_result):
       
  2313     """Processes the response from the server application."""
       
  2314     if export_result:
       
  2315       item.Process(export_result, self.num_threads, self.batch_size,
       
  2316                    self.work_queue)
  1427 
  2317 
  1428 
  2318 
  1429 class DataSourceThread(_ThreadBase):
  2319 class DataSourceThread(_ThreadBase):
  1430   """A thread which reads WorkItems and pushes them into queue.
  2320   """A thread which reads WorkItems and pushes them into queue.
  1431 
  2321 
  1469       progress_gen = None
  2359       progress_gen = None
  1470 
  2360 
  1471     content_gen = self.workitem_generator_factory(self.progress_queue,
  2361     content_gen = self.workitem_generator_factory(self.progress_queue,
  1472                                                   progress_gen)
  2362                                                   progress_gen)
  1473 
  2363 
  1474     self.sent_count = 0
  2364     self.xfer_count = 0
  1475     self.read_count = 0
  2365     self.read_count = 0
  1476     self.read_all = False
  2366     self.read_all = False
  1477 
  2367 
  1478     for item in content_gen.Batches():
  2368     for item in content_gen.Batches():
  1479       item.MarkAsRead()
  2369       item.MarkAsRead()
  1490         break
  2380         break
  1491 
  2381 
  1492     if not self.exit_flag:
  2382     if not self.exit_flag:
  1493       self.read_all = True
  2383       self.read_all = True
  1494     self.read_count = content_gen.row_count
  2384     self.read_count = content_gen.row_count
  1495     self.sent_count = content_gen.sent_count
  2385     self.xfer_count = content_gen.xfer_count
  1496 
  2386 
  1497 
  2387 
  1498 
  2388 
  1499 def _RunningInThread(thread):
  2389 def _RunningInThread(thread):
  1500   """Return True if we are running within the specified thread."""
  2390   """Return True if we are running within the specified thread."""
  1501   return threading.currentThread().getName() == thread.getName()
  2391   return threading.currentThread().getName() == thread.getName()
  1502 
  2392 
  1503 
  2393 
  1504 class ProgressDatabase(object):
  2394 class _Database(object):
  1505   """Persistently record all progress information during an upload.
  2395   """Base class for database connections in this module.
  1506 
  2396 
  1507   This class wraps a very simple SQLite database which records each of
  2397   The table is created by a primary thread (the python main thread)
  1508   the relevant details from the WorkItem instances. If the uploader is
  2398   but all future lookups and updates are performed by a secondary
  1509   resumed, then data is replayed out of the database.
  2399   thread.
  1510   """
  2400   """
  1511 
  2401 
  1512   def __init__(self, db_filename, commit_periodicity=100):
  2402   SIGNATURE_TABLE_NAME = 'bulkloader_database_signature'
  1513     """Initialize the ProgressDatabase instance.
  2403 
  1514 
  2404   def __init__(self,
  1515     Args:
  2405                db_filename,
  1516       db_filename: The name of the SQLite database to use.
  2406                create_table,
  1517       commit_periodicity: How many operations to perform between commits.
  2407                signature,
       
  2408                index=None,
       
  2409                commit_periodicity=100):
       
  2410     """Initialize the _Database instance.
       
  2411 
       
  2412     Args:
       
  2413       db_filename: The sqlite3 file to use for the database.
       
  2414       create_table: A string containing the SQL table creation command.
       
  2415       signature: A string identifying the important invocation options,
       
  2416         used to make sure we are not using an old database.
       
  2417       index: An optional string to create an index for the database.
       
  2418       commit_periodicity: Number of operations between database commits.
  1518     """
  2419     """
  1519     self.db_filename = db_filename
  2420     self.db_filename = db_filename
  1520 
  2421 
  1521     logging.info('Using progress database: %s', db_filename)
  2422     logger.info('Opening database: %s', db_filename)
  1522     self.primary_conn = sqlite3.connect(db_filename, isolation_level=None)
  2423     self.primary_conn = sqlite3.connect(db_filename, isolation_level=None)
  1523     self.primary_thread = threading.currentThread()
  2424     self.primary_thread = threading.currentThread()
  1524 
  2425 
  1525     self.progress_conn = None
  2426     self.secondary_conn = None
  1526     self.progress_thread = None
  2427     self.secondary_thread = None
  1527 
  2428 
  1528     self.operation_count = 0
  2429     self.operation_count = 0
  1529     self.commit_periodicity = commit_periodicity
  2430     self.commit_periodicity = commit_periodicity
  1530 
  2431 
  1531     self.prior_key_end = None
       
  1532 
       
  1533     try:
  2432     try:
  1534       self.primary_conn.execute(
  2433       self.primary_conn.execute(create_table)
  1535           """create table progress (
       
  1536           id integer primary key autoincrement,
       
  1537           state integer not null,
       
  1538           key_start integer not null,
       
  1539           key_end integer not null
       
  1540           )
       
  1541           """)
       
  1542     except sqlite3.OperationalError, e:
  2434     except sqlite3.OperationalError, e:
  1543       if 'already exists' not in e.message:
  2435       if 'already exists' not in e.message:
  1544         raise
  2436         raise
  1545 
  2437 
       
  2438     if index:
       
  2439       try:
       
  2440         self.primary_conn.execute(index)
       
  2441       except sqlite3.OperationalError, e:
       
  2442         if 'already exists' not in e.message:
       
  2443           raise
       
  2444 
       
  2445     self.existing_table = False
       
  2446     signature_cursor = self.primary_conn.cursor()
       
  2447     create_signature = """
       
  2448       create table %s (
       
  2449       value TEXT not null)
       
  2450     """ % _Database.SIGNATURE_TABLE_NAME
  1546     try:
  2451     try:
  1547       self.primary_conn.execute('create index i_state on progress (state)')
  2452       self.primary_conn.execute(create_signature)
       
  2453       self.primary_conn.cursor().execute(
       
  2454           'insert into %s (value) values (?)' % _Database.SIGNATURE_TABLE_NAME,
       
  2455           (signature,))
  1548     except sqlite3.OperationalError, e:
  2456     except sqlite3.OperationalError, e:
  1549       if 'already exists' not in e.message:
  2457       if 'already exists' not in e.message:
       
  2458         logger.exception('Exception creating table:')
  1550         raise
  2459         raise
       
  2460       else:
       
  2461         self.existing_table = True
       
  2462         signature_cursor.execute(
       
  2463             'select * from %s' % _Database.SIGNATURE_TABLE_NAME)
       
  2464         (result,) = signature_cursor.fetchone()
       
  2465         if result and result != signature:
       
  2466           logger.error('Database signature mismatch:\n\n'
       
  2467                        'Found:\n'
       
  2468                        '%s\n\n'
       
  2469                        'Expecting:\n'
       
  2470                        '%s\n',
       
  2471                        result, signature)
       
  2472           raise ResumeError('Database signature mismatch: %s != %s' % (
       
  2473                             signature, result))
  1551 
  2474 
  1552   def ThreadComplete(self):
  2475   def ThreadComplete(self):
  1553     """Finalize any operations the progress thread has performed.
  2476     """Finalize any operations the secondary thread has performed.
  1554 
  2477 
  1555     The database aggregates lots of operations into a single commit, and
  2478     The database aggregates lots of operations into a single commit, and
  1556     this method is used to commit any pending operations as the thread
  2479     this method is used to commit any pending operations as the thread
  1557     is about to shut down.
  2480     is about to shut down.
  1558     """
  2481     """
  1559     if self.progress_conn:
  2482     if self.secondary_conn:
  1560       self._MaybeCommit(force_commit=True)
  2483       self._MaybeCommit(force_commit=True)
  1561 
  2484 
  1562   def _MaybeCommit(self, force_commit=False):
  2485   def _MaybeCommit(self, force_commit=False):
  1563     """Periodically commit changes into the SQLite database.
  2486     """Periodically commit changes into the SQLite database.
  1564 
  2487 
  1571       force_commit: Pass True in order for a commit to occur regardless
  2494       force_commit: Pass True in order for a commit to occur regardless
  1572         of the current operation count.
  2495         of the current operation count.
  1573     """
  2496     """
  1574     self.operation_count += 1
  2497     self.operation_count += 1
  1575     if force_commit or (self.operation_count % self.commit_periodicity) == 0:
  2498     if force_commit or (self.operation_count % self.commit_periodicity) == 0:
  1576       self.progress_conn.commit()
  2499       self.secondary_conn.commit()
  1577 
  2500 
  1578   def _OpenProgressConnection(self):
  2501   def _OpenSecondaryConnection(self):
  1579     """Possibly open a database connection for the progress tracker thread.
  2502     """Possibly open a database connection for the secondary thread.
  1580 
  2503 
  1581     If the connection is not open (for the calling thread, which is assumed
  2504     If the connection is not open (for the calling thread, which is assumed
  1582     to be the progress tracker thread), then open it. We also open a couple
  2505     to be the unique secondary thread), then open it. We also open a couple
  1583     cursors for later use (and reuse).
  2506     cursors for later use (and reuse).
  1584     """
  2507     """
  1585     if self.progress_conn:
  2508     if self.secondary_conn:
  1586       return
  2509       return
  1587 
  2510 
  1588     assert not _RunningInThread(self.primary_thread)
  2511     assert not _RunningInThread(self.primary_thread)
  1589 
  2512 
  1590     self.progress_thread = threading.currentThread()
  2513     self.secondary_thread = threading.currentThread()
  1591 
  2514 
  1592     self.progress_conn = sqlite3.connect(self.db_filename)
  2515     self.secondary_conn = sqlite3.connect(self.db_filename)
  1593 
  2516 
  1594     self.insert_cursor = self.progress_conn.cursor()
  2517     self.insert_cursor = self.secondary_conn.cursor()
  1595     self.update_cursor = self.progress_conn.cursor()
  2518     self.update_cursor = self.secondary_conn.cursor()
  1596 
  2519 
  1597   def HasUnfinishedWork(self):
  2520 
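The signature handling above reduces to "create the table; if it already existed, compare the stored value". The same pattern in a standalone sketch (the table name and signature strings are illustrative):

import sqlite3

def CheckSignature(conn, signature):
  """Store signature on first use; reject a mismatched existing database."""
  try:
    conn.execute('create table db_signature (value TEXT not null)')
    conn.execute('insert into db_signature (value) values (?)', (signature,))
  except sqlite3.OperationalError, e:
    if 'already exists' not in e.message:
      raise
    (stored,) = conn.execute('select value from db_signature').fetchone()
    if stored != signature:
      raise ValueError('signature mismatch: %s != %s' % (signature, stored))

conn = sqlite3.connect(':memory:', isolation_level=None)
CheckSignature(conn, 'app=myapp kind=Person')
CheckSignature(conn, 'app=myapp kind=Person')  # same options: accepted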
       
  2521 class ResultDatabase(_Database):
       
  2522   """Persistently record all the entities downloaded during an export.
       
  2523 
       
  2524   The entities are held in the database by their unique datastore key
       
  2525   in order to avoid duplication if an export is restarted.
       
  2526   """
       
  2527 
       
  2528   def __init__(self, db_filename, signature, commit_periodicity=1):
       
  2529     """Initialize a ResultDatabase object.
       
  2530 
       
  2531     Args:
       
  2532       db_filename: The name of the SQLite database to use.
       
  2533       signature: A string identifying the important invocation options,
       
  2534         used to make sure we are not using an old database.
       
  2535       commit_periodicity: How many operations to perform between commits.
       
  2536     """
       
  2537     self.complete = False
       
  2538     create_table = ('create table result (\n'
       
  2539                     'id TEXT primary key,\n'
       
  2540                     'value BLOB not null)')
       
  2541 
       
  2542     _Database.__init__(self,
       
  2543                        db_filename,
       
  2544                        create_table,
       
  2545                        signature,
       
  2546                        commit_periodicity=commit_periodicity)
       
  2547     if self.existing_table:
       
  2548       cursor = self.primary_conn.cursor()
       
  2549       cursor.execute('select count(*) from result')
       
  2550       self.existing_count = int(cursor.fetchone()[0])
       
  2551     else:
       
  2552       self.existing_count = 0
       
  2553     self.count = self.existing_count
       
  2554 
       
  2555   def _StoreEntity(self, entity_id, value):
       
  2556     """Store an entity in the result database.
       
  2557 
       
  2558     Args:
       
  2559       entity_id: A db.Key for the entity.
       
  2560       value: A string of the contents of the entity.
       
  2561 
       
  2562     Returns:
       
  2563       True if this entity is not already present in the result database.
       
  2564     """
       
  2565 
       
  2566     assert _RunningInThread(self.secondary_thread)
       
  2567     assert isinstance(entity_id, db.Key)
       
  2568 
       
  2569     entity_id = entity_id.id_or_name()
       
  2570     self.insert_cursor.execute(
       
  2571         'select count(*) from result where id = ?', (unicode(entity_id),))
       
  2572     already_present = self.insert_cursor.fetchone()[0]
       
  2573     result = True
       
  2574     if already_present:
       
  2575       result = False
       
  2576       self.insert_cursor.execute('delete from result where id = ?',
       
  2577                                  (unicode(entity_id),))
       
  2578     else:
       
  2579       self.count += 1
       
  2580     self.insert_cursor.execute(
       
  2581         'insert into result (id, value) values (?, ?)',
       
  2582         (unicode(entity_id), buffer(value)))
       
  2583     return result
       
  2584 
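          # A standalone sketch of the keyed store-with-dedup pattern above,
          # assuming only the sqlite3 module (table name and values are
          # illustrative; 'insert or replace' collapses duplicates by primary
          # key just like the select/delete/insert sequence, minus the
          # new-row bookkeeping):
          #
          #   import sqlite3
          #   conn = sqlite3.connect(':memory:')
          #   conn.execute('create table result '
          #                '(id TEXT primary key, value BLOB not null)')
          #   conn.execute('insert or replace into result (id, value) '
          #                'values (?, ?)', (u'key1', buffer('entity-bytes')))
          #   conn.commit()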
       
  2585   def StoreEntities(self, keys, entities):
       
  2586     """Store a group of entities in the result database.
       
  2587 
       
  2588     Args:
       
  2589       keys: A list of entity keys.
       
  2590       entities: A list of entities.
       
  2591 
       
  2592     Returns:
       
  2593       The number of new entities stored in the result database.
       
  2594     """
       
  2595     self._OpenSecondaryConnection()
       
  2596     t = time.time()
       
  2597     count = 0
       
  2598     for entity_id, value in zip(keys,
       
  2599                                 entities):
       
  2600       if self._StoreEntity(entity_id, value):
       
  2601         count += 1
       
  2602     logger.debug('%s insert: delta=%.3f',
       
  2603                  self.db_filename,
       
  2604                  time.time() - t)
       
  2605     logger.debug('Entities transferred total: %s', self.count)
       
  2606     self._MaybeCommit()
       
  2607     return count
       
  2608 
       
  2609   def ResultsComplete(self):
       
  2610     """Marks the result database as containing complete results."""
       
  2611     self.complete = True
       
  2612 
       
  2613   def AllEntities(self):
       
  2614     """Yields all entities from the result table, in key order."""
       
  2615     conn = sqlite3.connect(self.db_filename, isolation_level=None)
       
  2616     cursor = conn.cursor()
       
  2617 
       
  2618     cursor.execute(
       
  2619         'select id, value from result order by id')
       
  2620 
       
  2621     for unused_entity_id, entity in cursor:
       
  2622       yield cPickle.loads(str(entity))
       
  2623 
       
  2624 
       
  2625 class _ProgressDatabase(_Database):
       
  2626   """Persistently record all progress information during a transfer.
       
  2627 
       
  2628   This class wraps a very simple SQLite database which records each of
       
  2629   the relevant details from a chunk of work. If the loader is
       
  2630   resumed, then data is replayed out of the database.
       
  2631   """
       
  2632 
       
  2633   def __init__(self,
       
  2634                db_filename,
       
  2635                sql_type,
       
  2636                py_type,
       
  2637                signature,
       
  2638                commit_periodicity=100):
       
  2639     """Initialize the ProgressDatabase instance.
       
  2640 
       
  2641     Args:
       
  2642       db_filename: The name of the SQLite database to use.
       
  2643       sql_type: A string of the SQL type to use for entity keys.
       
  2644       py_type: The python type of entity keys.
       
  2645       signature: A string identifying the important invocation options,
       
  2646         used to make sure we are not using an old database.
       
  2647       commit_periodicity: How many operations to perform between commits.
       
  2648     """
       
  2649     self.prior_key_end = None
       
  2650 
       
  2651     create_table = ('create table progress (\n'
       
  2652                     'id integer primary key autoincrement,\n'
       
  2653                     'state integer not null,\n'
       
  2654                     'key_start %s,\n'
       
  2655                     'key_end %s)'
       
  2656                     % (sql_type, sql_type))
       
  2657     self.py_type = py_type
       
  2658 
       
  2659     index = 'create index i_state on progress (state)'
       
  2660     _Database.__init__(self,
       
  2661                        db_filename,
       
  2662                        create_table,
       
  2663                        signature,
       
  2664                        index=index,
       
  2665                        commit_periodicity=commit_periodicity)
       
  2666 
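          # For reference, the rendered DDL for the two concrete variants
          # defined later in this module (uploads use INTEGER keys via the
          # ProgressDatabase factory; downloads use TEXT keys via
          # ExportProgressDatabase):
          #
          #   create table progress (
          #       id integer primary key autoincrement,
          #       state integer not null,
          #       key_start INTEGER,   -- TEXT for exports
          #       key_end INTEGER)     -- TEXT for exports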
       
  2667   def UseProgressData(self):
  1598     """Returns True if the database has progress information.
  2668     """Returns True if the database has progress information.
  1599 
  2669 
  1600     Note there are two basic cases for progress information:
  2670     Note there are two basic cases for progress information:
  1601     1) All saved records indicate a successful upload. In this case, we
  2671     1) All saved records indicate a successful upload. In this case, we
  1602        need to skip everything transmitted so far and then send the rest.
  2672        need to skip everything transmitted so far and then send the rest.
  1603     2) Some records for incomplete transfer are present. These need to be
  2673     2) Some records for incomplete transfer are present. These need to be
  1604        sent again, and then we resume sending after all the successful
  2674        sent again, and then we resume sending after all the successful
  1605        data.
  2675        data.
  1606 
  2676 
  1607     Returns:
  2677     Returns:
  1608       True if the database has progress information, False otherwise.
  2678       True: if the database has progress information.
  1609 
  2679 
  1610     Raises:
  2680     Raises:
  1611       ResumeError: If there is an error reading the progress database.
  2681       ResumeError: if there is an error retrieving rows from the database.
  1612     """
  2682     """
  1613     assert _RunningInThread(self.primary_thread)
  2683     assert _RunningInThread(self.primary_thread)
  1614 
  2684 
  1615     cursor = self.primary_conn.cursor()
  2685     cursor = self.primary_conn.cursor()
  1616     cursor.execute('select count(*) from progress')
  2686     cursor.execute('select count(*) from progress')
  1617     row = cursor.fetchone()
  2687     row = cursor.fetchone()
  1618     if row is None:
  2688     if row is None:
  1619       raise ResumeError('Error reading progress information.')
  2689       raise ResumeError('Cannot retrieve progress information from database.')
  1620 
  2690 
  1621     return row[0] != 0
  2691     return row[0] != 0
  1622 
  2692 
  1623   def StoreKeys(self, key_start, key_end):
  2693   def StoreKeys(self, key_start, key_end):
  1624     """Record a new progress record, returning a key for later updates.
  2694     """Record a new progress record, returning a key for later updates.
  1640       key_end: The end key of the WorkItem (inclusive).
  2710       key_end: The end key of the WorkItem (inclusive).
  1641 
  2711 
  1642     Returns:
  2712     Returns:
  1643       A string to later be used as a unique key to update this state.
  2713       A string to later be used as a unique key to update this state.
  1644     """
  2714     """
  1645     self._OpenProgressConnection()
  2715     self._OpenSecondaryConnection()
  1646 
  2716 
  1647     assert _RunningInThread(self.progress_thread)
  2717     assert _RunningInThread(self.secondary_thread)
  1648     assert isinstance(key_start, int)
  2718     assert not key_start or isinstance(key_start, self.py_type)
  1649     assert isinstance(key_end, int)
  2719     assert not key_end or isinstance(key_end, self.py_type), '%s is a %s' % (
  1650     assert key_start <= key_end
  2720         key_end, key_end.__class__)
  1651 
  2721     assert KeyLEQ(key_start, key_end), '%s not less than or equal to %s' % (
  1652     if self.prior_key_end is not None:
  2722         repr(key_start), repr(key_end))
  1653       assert key_start > self.prior_key_end
       
  1654     self.prior_key_end = key_end
       
  1655 
  2723 
  1656     self.insert_cursor.execute(
  2724     self.insert_cursor.execute(
  1657         'insert into progress (state, key_start, key_end) values (?, ?, ?)',
  2725         'insert into progress (state, key_start, key_end) values (?, ?, ?)',
  1658         (STATE_READ, key_start, key_end))
  2726         (STATE_READ, unicode(key_start), unicode(key_end)))
  1659 
  2727 
  1660     progress_key = self.insert_cursor.lastrowid
  2728     progress_key = self.insert_cursor.lastrowid
  1661 
  2729 
  1662     self._MaybeCommit()
  2730     self._MaybeCommit()
  1663 
  2731 
  1668 
  2736 
  1669     Args:
  2737     Args:
  1670       key: The key for this progress record, returned from StoreKeys
  2738       key: The key for this progress record, returned from StoreKeys
  1671       new_state: The new state to associate with this progress record.
  2739       new_state: The new state to associate with this progress record.
  1672     """
  2740     """
  1673     self._OpenProgressConnection()
  2741     self._OpenSecondaryConnection()
  1674 
  2742 
  1675     assert _RunningInThread(self.progress_thread)
  2743     assert _RunningInThread(self.secondary_thread)
  1676     assert isinstance(new_state, int)
  2744     assert isinstance(new_state, int)
  1677 
  2745 
  1678     self.update_cursor.execute('update progress set state=? where id=?',
  2746     self.update_cursor.execute('update progress set state=? where id=?',
  1679                                (new_state, key))
  2747                                (new_state, key))
  1680 
  2748 
  1681     self._MaybeCommit()
  2749     self._MaybeCommit()
  1682 
  2750 
       
  2751   def DeleteKey(self, progress_key):
       
  2752     """Delete the progress row with the given key."""
       
  2753     self._OpenSecondaryConnection()
       
  2754 
       
  2755     assert _RunningInThread(self.secondary_thread)
       
  2756 
       
  2757     t = time.time()
       
  2758     self.insert_cursor.execute(
       
  2759         'delete from progress where rowid = ?', (progress_key,))
       
  2760 
       
  2761     logger.debug('delete: delta=%.3f', time.time() - t)
       
  2762 
       
  2763     self._MaybeCommit()
       
  2764 
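          # Lifecycle sketch of a progress row, as driven by the progress
          # threads later in this module (upload rows are kept and flipped to
          # STATE_SENT; export rows are deleted once their entities are
          # safely in the result database):
          #
          #   key = progress_db.StoreKeys(key_start, key_end)  # STATE_READ
          #   progress_db.UpdateState(key, STATE_SENT)         # upload done
          #   progress_db.DeleteKey(key)                       # export done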
  1683   def GetProgressStatusGenerator(self):
  2765   def GetProgressStatusGenerator(self):
  1684     """Get a generator which returns progress information.
  2766     """Get a generator which yields progress information.
  1685 
  2767 
  1686     The returned generator will yield a series of 4-tuples that specify
  2768     The returned generator will yield a series of 4-tuples that specify
  1687     progress information about a prior run of the uploader. The 4-tuples
  2769     progress information about a prior run of the uploader. The 4-tuples
  1688     have the following values:
  2770     have the following values:
  1689 
  2771 
  1704                was handled by the previous run of the uploader.
  2786                was handled by the previous run of the uploader.
  1705 
  2787 
  1706     The caller should begin uploading records which occur after key_end.
  2788     The caller should begin uploading records which occur after key_end.
  1707 
  2789 
  1708     Yields:
  2790     Yields:
  1709       Progress information as tuples (progress_key, state, key_start, key_end).
  2791       Four-tuples of (progress_key, state, key_start, key_end).
  1710     """
  2792     """
  1711     conn = sqlite3.connect(self.db_filename, isolation_level=None)
  2793     conn = sqlite3.connect(self.db_filename, isolation_level=None)
  1712     cursor = conn.cursor()
  2794     cursor = conn.cursor()
  1713 
  2795 
  1714     cursor.execute('select max(id) from progress')
  2796     cursor.execute('select max(key_end) from progress')
  1715     batch_id = cursor.fetchone()[0]
  2797 
  1716 
  2798     result = cursor.fetchone()
  1717     cursor.execute('select key_end from progress where id = ?', (batch_id,))
  2799     if result is not None and result[0] is not None:
  1718     key_end = cursor.fetchone()[0]
  2800       key_end = result[0]
       
  2801     else:
       
  2802       logger.debug('No rows in progress database.')
       
  2803       return
  1719 
  2804 
  1720     self.prior_key_end = key_end
  2805     self.prior_key_end = key_end
  1721 
  2806 
  1722     cursor.execute(
  2807     cursor.execute(
  1723         'select id, state, key_start, key_end from progress'
  2808         'select id, state, key_start, key_end from progress'
  1728     rows = cursor.fetchall()
  2813     rows = cursor.fetchall()
  1729 
  2814 
  1730     for row in rows:
  2815     for row in rows:
  1731       if row is None:
  2816       if row is None:
  1732         break
  2817         break
  1733 
  2818       progress_key, state, key_start, key_end = row
  1734       yield row
  2819 
       
  2820       yield progress_key, state, key_start, key_end
  1735 
  2821 
  1736     yield None, DATA_CONSUMED_TO_HERE, None, key_end
  2822     yield None, DATA_CONSUMED_TO_HERE, None, key_end
       
  2823 
       
  2824 
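       # A consumption sketch for the generator above (hypothetical caller;
       # progress_db is a resumed progress database and Retransmit is a
       # hypothetical helper):
       #
       #   for key, state, key_start, key_end in (
       #       progress_db.GetProgressStatusGenerator()):
       #     if state == DATA_CONSUMED_TO_HERE:
       #       break  # everything before key_end was finished last run
       #     Retransmit(key, key_start, key_end)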
       
  2825 def ProgressDatabase(db_filename, signature):
       
  2826   """Returns a database to store upload progress information."""
       
  2827   return _ProgressDatabase(db_filename, 'INTEGER', int, signature)
       
  2828 
       
  2829 
       
  2830 class ExportProgressDatabase(_ProgressDatabase):
       
  2831   """A database to store download progress information."""
       
  2832 
       
  2833   def __init__(self, db_filename, signature):
       
  2834     """Initialize an ExportProgressDatabase."""
       
  2835     _ProgressDatabase.__init__(self,
       
  2836                                db_filename,
       
  2837                                'TEXT',
       
  2838                                db.Key,
       
  2839                                signature,
       
  2840                                commit_periodicity=1)
       
  2841 
       
  2842   def UseProgressData(self):
       
  2843     """Check if the progress database contains progress data.
       
  2844 
       
  2845     Returns:
       
  2846       True: if the database contains progress data.
       
  2847     """
       
  2848     return self.existing_table
  1737 
  2849 
  1738 
  2850 
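       # An instantiation sketch (filename and signature are illustrative):
       #
       #   upload_db = ProgressDatabase('bulkloader-progress.sql3', 'sig')
       #   export_db = ExportProgressDatabase('bulkloader-progress.sql3',
       #                                      'sig')
       #
       # Upload progress keys are record positions (ints); export progress
       # keys are datastore keys, hence the INTEGER/int versus TEXT/db.Key
       # choices above.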
  1739 class StubProgressDatabase(object):
  2851 class StubProgressDatabase(object):
  1740   """A stub implementation of ProgressDatabase which does nothing."""
  2852   """A stub implementation of ProgressDatabase which does nothing."""
  1741 
  2853 
  1742   def HasUnfinishedWork(self):
  2854   def UseProgressData(self):
  1743     """Whether the stub database has progress information (it doesn't)."""
  2855     """Whether the stub database has progress information (it doesn't)."""
  1744     return False
  2856     return False
  1745 
  2857 
  1746   def StoreKeys(self, unused_key_start, unused_key_end):
  2858   def StoreKeys(self, unused_key_start, unused_key_end):
  1747     """Pretend to store a key in the stub database."""
  2859     """Pretend to store a key in the stub database."""
  1754   def ThreadComplete(self):
  2866   def ThreadComplete(self):
  1755     """Finalize operations on the stub database (i.e. do nothing)."""
  2867     """Finalize operations on the stub database (i.e. do nothing)."""
  1756     pass
  2868     pass
  1757 
  2869 
  1758 
  2870 
  1759 class ProgressTrackerThread(_ThreadBase):
  2871 class _ProgressThreadBase(_ThreadBase):
  1760   """A thread which records progress information for the upload process.
  2872   """A base class for threads which record progress information.
  1761 
  2873 
  1762   The progress information is stored into the provided progress database.
  2874   The progress information is stored into the provided progress database.
  1763   This class is not responsible for replaying a prior run's progress
  2875   This class is not responsible for replaying a prior run's progress
  1764   information out of the database. Separate mechanisms must be used to
  2876   information out of the database. Separate mechanisms must be used to
  1777     """
  2889     """
  1778     _ThreadBase.__init__(self)
  2890     _ThreadBase.__init__(self)
  1779 
  2891 
  1780     self.progress_queue = progress_queue
  2892     self.progress_queue = progress_queue
  1781     self.db = progress_db
  2893     self.db = progress_db
  1782     self.entities_sent = 0
  2894     self.entities_transferred = 0
       
  2895 
       
  2896   def EntitiesTransferred(self):
       
  2897     """Return the total number of unique entities transferred."""
       
  2898     return self.entities_transferred
       
  2899 
       
  2900   def UpdateProgress(self, item):
       
  2901     """Updates the progress information for the given item.
       
  2902 
       
  2903     Args:
       
  2904       item: A work item whose new state will be recorded.
       
  2905     """
       
  2906     raise NotImplementedError()
       
  2907 
       
  2908   def WorkFinished(self):
       
  2909     """Performs final actions after the entity transfer is complete."""
       
  2910     raise NotImplementedError()
  1783 
  2911 
  1784   def PerformWork(self):
  2912   def PerformWork(self):
  1785     """Performs the work of a ProgressTrackerThread."""
  2913     """Performs the work of a ProgressTrackerThread."""
  1786     while not self.exit_flag:
  2914     while not self.exit_flag:
  1787       try:
  2915       try:
  1793 
  2921 
  1794       if item.state == STATE_READ and item.progress_key is None:
  2922       if item.state == STATE_READ and item.progress_key is None:
  1795         item.progress_key = self.db.StoreKeys(item.key_start, item.key_end)
  2923         item.progress_key = self.db.StoreKeys(item.key_start, item.key_end)
  1796       else:
  2924       else:
  1797         assert item.progress_key is not None
  2925         assert item.progress_key is not None
  1798 
  2926         self.UpdateProgress(item)
  1799         self.db.UpdateState(item.progress_key, item.state)
       
  1800         if item.state == STATE_SENT:
       
  1801           self.entities_sent += item.count
       
  1802 
  2927 
  1803       item.progress_event.set()
  2928       item.progress_event.set()
  1804 
  2929 
  1805       self.progress_queue.task_done()
  2930       self.progress_queue.task_done()
  1806 
  2931 
  1807     self.db.ThreadComplete()
  2932     self.db.ThreadComplete()
  1808 
  2933 
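          # Producer-side handshake sketch for PerformWork above
          # (illustrative; this is the pattern a worker thread follows when
          # reporting new state):
          #
          #   item.state = STATE_SENT
          #   item.progress_event.clear()
          #   progress_queue.put(item)
          #   item.progress_event.wait()  # returns once PerformWork set()s it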
       
  2934 
       
  2935 
       
  2936 class ProgressTrackerThread(_ProgressThreadBase):
       
  2937   """A thread which records progress information for the upload process.
       
  2938 
       
  2939   The progress information is stored into the provided progress database.
       
  2940   This class is not responsible for replaying a prior run's progress
       
  2941   information out of the database. Separate mechanisms must be used to
       
  2942   resume a prior upload attempt.
       
  2943   """
       
  2944   NAME = 'progress tracking thread'
       
  2945 
       
  2946   def __init__(self, progress_queue, progress_db):
       
  2947     """Initialize the ProgressTrackerThread instance.
       
  2948 
       
  2949     Args:
       
  2950       progress_queue: A Queue used for tracking progress information.
       
  2951       progress_db: The database for tracking progress information; should
       
  2952         be an instance of ProgressDatabase.
       
  2953     """
       
  2954     _ProgressThreadBase.__init__(self, progress_queue, progress_db)
       
  2955 
       
  2956   def UpdateProgress(self, item):
       
  2957     """Update the state of the given WorkItem.
       
  2958 
       
  2959     Args:
       
  2960       item: A WorkItem instance.
       
  2961     """
       
  2962     self.db.UpdateState(item.progress_key, item.state)
       
  2963     if item.state == STATE_SENT:
       
  2964       self.entities_transferred += item.count
       
  2965 
       
  2966   def WorkFinished(self):
       
  2967     """Performs final actions after the entity transfer is complete."""
       
  2968     pass
       
  2969 
       
  2970 
       
  2971 class ExportProgressThread(_ProgressThreadBase):
       
  2972   """A thread to record progress information and write record data for exports.
       
  2973 
       
  2974   The progress information is stored into a provided progress database.
       
  2975   Exported results are stored in the result database and dumped to an output
       
  2976   file at the end of the download.
       
  2977   """
       
  2978 
       
  2979   def __init__(self, kind, progress_queue, progress_db, result_db):
       
  2980     """Initialize the ExportProgressThread instance.
       
  2981 
       
  2982     Args:
       
  2983       kind: The kind of entities being stored in the database.
       
  2984       progress_queue: A Queue used for tracking progress information.
       
  2985       progress_db: The database for tracking progress information; should
       
  2986         be an instance of ProgressDatabase.
       
  2987       result_db: The database for holding exported entities; should be an
       
  2988         instance of ResultDatabase.
       
  2989     """
       
  2990     _ProgressThreadBase.__init__(self, progress_queue, progress_db)
       
  2991 
       
  2992     self.kind = kind
       
  2993     self.existing_count = result_db.existing_count
       
  2994     self.result_db = result_db
       
  2995 
       
  2996   def EntitiesTransferred(self):
       
  2997     """Return the total number of unique entities transferred."""
       
  2998     return self.result_db.count
       
  2999 
       
  3000   def WorkFinished(self):
       
  3001     """Write the contents of the result database."""
       
  3002     exporter = Exporter.RegisteredExporter(self.kind)
       
  3003     exporter.output_entities(self.result_db.AllEntities())
       
  3004 
       
  3005   def UpdateProgress(self, item):
       
  3006     """Update the state of the given KeyRange.
       
  3007 
       
  3008     Args:
       
  3009       item: A KeyRange instance.
       
  3010     """
       
  3011     if item.state == STATE_GOT:
       
  3012       count = self.result_db.StoreEntities(item.export_result.keys,
       
  3013                                            item.export_result.entities)
       
  3014       self.db.DeleteKey(item.progress_key)
       
  3015       self.entities_transferred += count
       
  3016     else:
       
  3017       self.db.UpdateState(item.progress_key, item.state)
       
  3018 
       
  3019 
       
  3020 def ParseKey(key_string):
       
  3021   """Turn a key stored in the database into a db.Key or None.
       
  3022 
       
  3023   Args:
       
  3024     key_string: The string representation of a db.Key.
       
  3025 
       
  3026   Returns:
       
  3027     A db.Key instance or None.
       
  3028   """
       
  3029   if not key_string:
       
  3030     return None
       
  3031   if key_string == 'None':
       
  3032     return None
       
  3033   return db.Key(encoded=key_string)
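       # Behavior sketch:
       #
       #   ParseKey('') is None          # missing value
       #   ParseKey('None') is None      # serialized None
       #   ParseKey(encoded_key)         # db.Key(encoded=encoded_key),
       #                                 # raising on malformed input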
  1809 
  3034 
  1810 
  3035 
  1811 def Validate(value, typ):
  3036 def Validate(value, typ):
  1812   """Checks that value is non-empty and of the right type.
  3037   """Checks that value is non-empty and of the right type.
  1813 
  3038 
  1814   Args:
  3039   Args:
  1815     value: any value
  3040     value: any value
  1816     typ: a type or tuple of types
  3041     typ: a type or tuple of types
  1817 
  3042 
  1818   Raises:
  3043   Raises:
  1819     ValueError if value is None or empty.
  3044     ValueError: if value is None or empty.
  1820     TypeError if it's not the given type.
  3045     TypeError: if value is not of the given type.
  1821 
       
  1822   """
  3046   """
  1823   if not value:
  3047   if not value:
  1824     raise ValueError('Value should not be empty; received %s.' % value)
  3048     raise ValueError('Value should not be empty; received %s.' % value)
  1825   elif not isinstance(value, typ):
  3049   elif not isinstance(value, typ):
  1826     raise TypeError('Expected a %s, but received %s (a %s).' %
  3050     raise TypeError('Expected a %s, but received %s (a %s).' %
  1827                     (typ, value, value.__class__))
  3051                     (typ, value, value.__class__))
  1828 
  3052 
  1829 
  3053 
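       # Behavior sketch:
       #
       #   Validate('Person', basestring)   # passes
       #   Validate('', basestring)         # ValueError: empty
       #   Validate('x', int)               # TypeError: wrong type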
       
  3054 def CheckFile(filename):
       
  3055   """Check that the given file exists and can be opened for reading.
       
  3056 
       
  3057   Args:
       
  3058     filename: The name of the file.
       
  3059 
       
  3060   Raises:
       
  3061     FileNotFoundError: if the given filename is not found.
       
  3062     FileNotReadableError: if the given filename is not readable.
       
  3063   """
       
  3064   if not os.path.exists(filename):
       
  3065     raise FileNotFoundError('%s: file not found' % filename)
       
  3066   elif not os.access(filename, os.R_OK):
       
  3067     raise FileNotReadableError('%s: file not readable' % filename)
       
  3068 
       
  3069 
  1830 class Loader(object):
  3070 class Loader(object):
  1831   """A base class for creating datastore entities from input data.
  3071   """A base class for creating datastore entities from input data.
  1832 
  3072 
  1833   To add a handler for bulk loading a new entity kind into your datastore,
  3073   To add a handler for bulk loading a new entity kind into your datastore,
  1834   write a subclass of this class that calls Loader.__init__ from your
  3074   write a subclass of this class that calls Loader.__init__ from your
  1835   class's __init__.
  3075   class's __init__.
  1836 
  3076 
  1837   If you need to run extra code to convert entities from the input
  3077   If you need to run extra code to convert entities from the input
  1838   data, create new properties, or otherwise modify the entities before
  3078   data, create new properties, or otherwise modify the entities before
  1839   they're inserted, override HandleEntity.
  3079   they're inserted, override handle_entity.
  1840 
  3080 
  1841   See the CreateEntity method for the creation of entities from the
  3081   See the create_entity method for the creation of entities from the
  1842   (parsed) input data.
  3082   (parsed) input data.
  1843   """
  3083   """
  1844 
  3084 
  1845   __loaders = {}
  3085   __loaders = {}
  1846   __kind = None
  3086   kind = None
  1847   __properties = None
  3087   __properties = None
  1848 
  3088 
  1849   def __init__(self, kind, properties):
  3089   def __init__(self, kind, properties):
  1850     """Constructor.
  3090     """Constructor.
  1851 
  3091 
  1856     Args:
  3096     Args:
  1857       kind: a string containing the entity kind that this loader handles
  3097       kind: a string containing the entity kind that this loader handles
  1858 
  3098 
  1859       properties: list of (name, converter) tuples.
  3099       properties: list of (name, converter) tuples.
  1860 
  3100 
  1861         This is used to automatically convert the CSV columns into
  3101         This is used to automatically convert the input columns into
  1862         properties.  The converter should be a function that takes one
  3102         properties.  The converter should be a function that takes one
  1863         argument, a string value from the CSV file, and returns a
  3103         argument, a string value from the input file, and returns a
  1864         correctly typed property value that should be inserted. The
  3104         correctly typed property value that should be inserted. The
  1865         tuples in this list should match the columns in your CSV file,
  3105         tuples in this list should match the columns in your input file,
  1866         in order.
  3106         in order.
  1867 
  3107 
  1868         For example:
  3108         For example:
  1869           [('name', str),
  3109           [('name', str),
  1870            ('id_number', int),
  3110            ('id_number', int),
  1872            ('user', users.User),
  3112            ('user', users.User),
  1873            ('birthdate', lambda x: datetime.datetime.fromtimestamp(float(x))),
  3113            ('birthdate', lambda x: datetime.datetime.fromtimestamp(float(x))),
  1874            ('description', datastore_types.Text),
  3114            ('description', datastore_types.Text),
  1875            ]
  3115            ]
  1876     """
  3116     """
  1877     Validate(kind, basestring)
  3117     Validate(kind, (basestring, tuple))
  1878     self.__kind = kind
  3118     self.kind = kind
  1879 
  3119     self.__openfile = open
  1880     db.class_for_kind(kind)
  3120     self.__create_csv_reader = csv.reader
       
  3121 
       
  3122     GetImplementationClass(kind)
  1881 
  3123 
  1882     Validate(properties, list)
  3124     Validate(properties, list)
  1883     for name, fn in properties:
  3125     for name, fn in properties:
  1884       Validate(name, basestring)
  3126       Validate(name, basestring)
  1885       assert callable(fn), (
  3127       assert callable(fn), (
  1888     self.__properties = properties
  3130     self.__properties = properties
  1889 
  3131 
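          # A minimal subclass sketch (assumes a 'Person' db.Model with these
          # properties is importable from the --config_file module; all names
          # here are illustrative):
          #
          #   class PersonLoader(Loader):
          #     def __init__(self):
          #       # Tuple order must match the input file's column order.
          #       Loader.__init__(self, 'Person',
          #                       [('name', str),
          #                        ('email', datastore_types.Email)])
          #
          #   loaders = [PersonLoader]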
  1890   @staticmethod
  3132   @staticmethod
  1891   def RegisterLoader(loader):
  3133   def RegisterLoader(loader):
  1892 
  3134     """Register loader as the Loader instance for its kind."""
  1893     Loader.__loaders[loader.__kind] = loader
  3135     Loader.__loaders[loader.kind] = loader
  1894 
  3136 
  1895   def kind(self):
  3137   def alias_old_names(self):
  1896     """Return the entity kind that this Loader handles.
  3138     """Aliases method names so that Loaders defined with old names work."""
  1897     """
  3139     aliases = (
  1898     return self.__kind
  3140         ('CreateEntity', 'create_entity'),
  1899 
  3141         ('HandleEntity', 'handle_entity'),
  1900   def CreateEntity(self, values, key_name=None):
  3142         ('GenerateKey', 'generate_key'),
       
  3143         )
       
  3144     for old_name, new_name in aliases:
       
  3145       setattr(Loader, old_name, getattr(Loader, new_name))
       
  3146       if hasattr(self.__class__, old_name) and not (
       
  3147           getattr(self.__class__, old_name).im_func ==
       
  3148           getattr(Loader, new_name).im_func):
       
  3149         if hasattr(self.__class__, new_name) and not (
       
  3150             getattr(self.__class__, new_name).im_func ==
       
  3151             getattr(Loader, new_name).im_func):
       
  3152           raise NameClashError(old_name, new_name, self.__class__)
       
  3153         setattr(self, new_name, getattr(self, old_name))
       
  3154 
       
  3155   def create_entity(self, values, key_name=None, parent=None):
  1901     """Creates an entity from a list of property values.
  3156     """Creates an entity from a list of property values.
  1902 
  3157 
  1903     Args:
  3158     Args:
  1904       values: list/tuple of str
  3159       values: list/tuple of str
  1905       key_name: if provided, the name for the (single) resulting entity
  3160       key_name: if provided, the name for the (single) resulting entity
       
  3161       parent: A db.Key instance for the parent, or None
  1906 
  3162 
  1907     Returns:
  3163     Returns:
  1908       list of db.Model
  3164       list of db.Model
  1909 
  3165 
  1910       The returned entities are populated with the property values from the
  3166       The returned entities are populated with the property values from the
  1911       argument, converted to native types using the properties map given in
  3167       argument, converted to native types using the properties map given in
  1912       the constructor, and passed through HandleEntity. They're ready to be
  3168       the constructor, and passed through handle_entity. They're ready to be
  1913       inserted.
  3169       inserted.
  1914 
  3170 
  1915     Raises:
  3171     Raises:
  1916       AssertionError if the number of values doesn't match the number
  3172       AssertionError: if the number of values doesn't match the number
  1917         of properties in the properties map.
  3173         of properties in the properties map.
  1918       ValueError if any element of values is None or empty.
  3174       ValueError: if any element of values is None or empty.
  1919       TypeError if values is not a list or tuple.
  3175       TypeError: if values is not a list or tuple.
  1920     """
  3176     """
  1921     Validate(values, (list, tuple))
  3177     Validate(values, (list, tuple))
  1922     assert len(values) == len(self.__properties), (
  3178     assert len(values) == len(self.__properties), (
  1923       'Expected %d CSV columns, found %d.' %
  3179         'Expected %d columns, found %d.' %
  1924       (len(self.__properties), len(values)))
  3180         (len(self.__properties), len(values)))
  1925 
  3181 
  1926     model_class = db.class_for_kind(self.__kind)
  3182     model_class = GetImplementationClass(self.kind)
  1927 
  3183 
  1928     properties = {'key_name': key_name}
  3184     properties = {
       
  3185         'key_name': key_name,
       
  3186         'parent': parent,
       
  3187         }
  1929     for (name, converter), val in zip(self.__properties, values):
  3188     for (name, converter), val in zip(self.__properties, values):
  1930       if converter is bool and val.lower() in ('0', 'false', 'no'):
  3189       if converter is bool and val.lower() in ('0', 'false', 'no'):
  1931           val = False
  3190         val = False
  1932       properties[name] = converter(val)
  3191       properties[name] = converter(val)
  1933 
  3192 
  1934     entity = model_class(**properties)
  3193     entity = model_class(**properties)
  1935     entities = self.HandleEntity(entity)
  3194     entities = self.handle_entity(entity)
  1936 
  3195 
  1937     if entities:
  3196     if entities:
  1938       if not isinstance(entities, (list, tuple)):
  3197       if not isinstance(entities, (list, tuple)):
  1939         entities = [entities]
  3198         entities = [entities]
  1940 
  3199 
  1943           raise TypeError('Expected a db.Model, received %s (a %s).' %
  3202           raise TypeError('Expected a db.Model, received %s (a %s).' %
  1944                           (entity, entity.__class__))
  3203                           (entity, entity.__class__))
  1945 
  3204 
  1946     return entities
  3205     return entities
  1947 
  3206 
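          # Note on the bool special-case above: bool('0') and bool('False')
          # are both True in Python, so without the explicit check every
          # non-empty input string would convert to True.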
  1948   def GenerateKey(self, i, values):
  3207   def generate_key(self, i, values):
  1949     """Generates a key_name to be used in creating the underlying object.
  3208     """Generates a key_name to be used in creating the underlying object.
  1950 
  3209 
  1951     The default implementation returns None.
  3210     The default implementation returns None.
  1952 
  3211 
  1953     This method can be overridden to control the key generation for
  3212     This method can be overridden to control the key generation for
  1954     uploaded entities. The value returned should be None (to use a
  3213     uploaded entities. The value returned should be None (to use a
  1955     server generated numeric key), or a string which neither starts
  3214     server generated numeric key), or a string which neither starts
  1956     with a digit nor has the form __*__. (See
  3215     with a digit nor has the form __*__ (see
  1957     http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html)
  3216     http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html),
       
  3217     or a db.Key instance.
  1958 
  3218 
  1959     If you generate your own string keys, keep in mind:
  3219     If you generate your own string keys, keep in mind:
  1960 
  3220 
  1961     1. The key name for each entity must be unique.
  3221     1. The key name for each entity must be unique.
  1962     2. If an entity of the same kind and key already exists in the
  3222     2. If an entity of the same kind and key already exists in the
  1970     Returns:
  3230     Returns:
  1971       A string to be used as the key_name for an entity.
  3231       A string to be used as the key_name for an entity.
  1972     """
  3232     """
  1973     return None
  3233     return None
  1974 
  3234 
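          # An override sketch (hypothetical; values[1] is assumed to be a
          # unique, stable column in the input file):
          #
          #   def generate_key(self, i, values):
          #     # Prefix with a letter: key names may not start with a digit.
          #     return 'k%s' % values[1]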
  1975   def HandleEntity(self, entity):
  3235   def handle_entity(self, entity):
  1976     """Subclasses can override this to add custom entity conversion code.
  3236     """Subclasses can override this to add custom entity conversion code.
  1977 
  3237 
  1978     This is called for each entity, after its properties are populated from
  3238     This is called for each entity, after its properties are populated
  1979     CSV but before it is stored. Subclasses can override this to add custom
  3239     from the input but before it is stored. Subclasses can override
  1980     entity handling code.
  3240     this to add custom entity handling code.
  1981 
  3241 
  1982     The entity to be inserted should be returned. If multiple entities should
  3242     The entity to be inserted should be returned. If multiple entities
  1983     be inserted, return a list of entities. If no entities should be inserted,
  3243     should be inserted, return a list of entities. If no entities
  1984     return None or [].
  3244     should be inserted, return None or [].
  1985 
  3245 
  1986     Args:
  3246     Args:
  1987       entity: db.Model
  3247       entity: db.Model
  1988 
  3248 
  1989     Returns:
  3249     Returns:
  1990       db.Model or list of db.Model
  3250       db.Model or list of db.Model
  1991     """
  3251     """
  1992     return entity
  3252     return entity
  1993 
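          # An override sketch (hypothetical; drops some rows and fans one
          # row out into two entities; AuditRecord is a hypothetical
          # db.Model):
          #
          #   def handle_entity(self, entity):
          #     if not entity.email:
          #       return None            # skip rows without an email
          #     return [entity, AuditRecord(subject=entity.email)]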
  3253 
       
  3254   def initialize(self, filename, loader_opts):
       
  3255     """Performs initialization and validation of the input file.
       
  3256 
       
  3257     This implementation checks that the input file exists and can be
       
  3258     opened for reading.
       
  3259 
       
  3260     Args:
       
  3261       filename: The string given as the --filename flag argument.
       
  3262       loader_opts: The string given as the --loader_opts flag argument.
       
  3263     """
       
  3264     CheckFile(filename)
       
  3265 
       
  3266   def finalize(self):
       
  3267     """Performs finalization actions after the upload completes."""
       
  3268     pass
       
  3269 
       
  3270   def generate_records(self, filename):
       
  3271     """Subclasses can override this to add custom data input code.
       
  3272 
       
  3273     This method must yield fixed-length lists of strings.
       
  3274 
       
  3275     The default implementation uses csv.reader to read CSV rows
       
  3276     from filename.
       
  3277 
       
  3278     Args:
       
  3279       filename: The string input for the --filename option.
       
  3280 
       
  3281     Yields:
       
  3282       Lists of strings.
       
  3283     """
       
  3284     csv_generator = CSVGenerator(filename, openfile=self.__openfile,
       
  3285                                  create_csv_reader=self.__create_csv_reader
       
  3286                                 ).Records()
       
  3287     return csv_generator
  1994 
  3288 
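          # An override sketch for non-CSV input (hypothetical; one record
          # per line, tab-separated fields):
          #
          #   def generate_records(self, filename):
          #     for line in open(filename):
          #       yield line.rstrip('\n').split('\t')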
  1995   @staticmethod
  3289   @staticmethod
  1996   def RegisteredLoaders():
  3290   def RegisteredLoaders():
  1997     """Returns a list of the Loader instances that have been created.
  3291     """Returns a dict of the Loader instances that have been created."""
  1998     """
       
  1999     return dict(Loader.__loaders)
  3292     return dict(Loader.__loaders)
       
  3293 
       
  3294   @staticmethod
       
  3295   def RegisteredLoader(kind):
       
  3296     """Returns the loader instance for the given kind if it exists."""
       
  3297     return Loader.__loaders[kind]
       
  3298 
       
  3299 
       
  3300 class Exporter(object):
       
  3301   """A base class for serializing datastore entities.
       
  3302 
       
  3303   To add a handler for exporting an entity kind from your datastore,
       
  3304   write a subclass of this class that calls Exporter.__init__ from your
       
  3305   class's __init__.
       
  3306 
       
  3307   If you need to run extra code to convert entities from the input
       
  3308   data, create new properties, or otherwise modify the entities before
       
  3309   they're inserted, override handle_entity.
       
  3310 
       
  3311   See the output_entities method for the writing of data from entities.
       
  3312   """
       
  3313 
       
  3314   __exporters = {}
       
  3315   kind = None
       
  3316   __properties = None
       
  3317 
       
  3318   def __init__(self, kind, properties):
       
  3319     """Constructor.
       
  3320 
       
  3321     Populates this Exporters's kind and properties map. Also registers
       
  3322     it so that all you need to do is instantiate your Exporter, and
       
  3323     the bulkload handler will automatically use it.
       
  3324 
       
  3325     Args:
       
  3326       kind: a string containing the entity kind that this exporter handles
       
  3327 
       
  3328       properties: list of (name, converter, default) tuples.
       
  3329 
       
  3330       This is used to automatically convert the entities to strings.
       
  3331       The converter should be a function that takes one argument, a property
       
  3332       value of the appropriate type, and returns a str or unicode.  The default
       
  3333       is a string to be used if the property is not present, or None to fail
       
  3334       with an error if the property is missing.
       
  3335 
       
  3336       For example:
       
  3337         [('name', str, None),
       
  3338          ('id_number', str, None),
       
  3339          ('email', str, ''),
       
  3340          ('user', str, None),
       
  3341          ('birthdate',
       
  3342           lambda x: str(datetime.datetime.fromtimestamp(float(x))),
       
  3343           None),
       
  3344          ('description', str, ''),
       
  3345          ]
       
  3346     """
       
  3347     Validate(kind, basestring)
       
  3348     self.kind = kind
       
  3349 
       
  3350     GetImplementationClass(kind)
       
  3351 
       
  3352     Validate(properties, list)
       
  3353     for name, fn, default in properties:
       
  3354       Validate(name, basestring)
       
  3355       assert callable(fn), (
       
  3356           'Conversion function %s for property %s is not callable.' % (
       
  3357               fn, name))
       
  3358       if default:
       
  3359         Validate(default, basestring)
       
  3360 
       
  3361     self.__properties = properties
       
  3362 
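          # A minimal subclass sketch, mirroring the Loader example (names
          # are illustrative; email falls back to '' instead of raising
          # MissingPropertyError):
          #
          #   class PersonExporter(Exporter):
          #     def __init__(self):
          #       Exporter.__init__(self, 'Person',
          #                         [('name', str, None),
          #                          ('email', str, '')])
          #
          #   exporters = [PersonExporter]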
       
  3363   @staticmethod
       
  3364   def RegisterExporter(exporter):
       
  3365     """Register exporter as the Exporter instance for its kind."""
       
  3366     Exporter.__exporters[exporter.kind] = exporter
       
  3367 
       
  3368   def __ExtractProperties(self, entity):
       
  3369     """Converts an entity into a list of string values.
       
  3370 
       
  3371     Args:
       
  3372       entity: An entity to extract the properties from.
       
  3373 
       
  3374     Returns:
       
  3375       A list of the properties of the entity.
       
  3376 
       
  3377     Raises:
       
  3378       MissingPropertyError: if an expected field on the entity is missing.
       
  3379     """
       
  3380     encoding = []
       
  3381     for name, fn, default in self.__properties:
       
  3382       try:
       
  3383         encoding.append(fn(getattr(entity, name)))
       
  3384       except AttributeError:
       
  3385         if default is None:
       
  3386           raise MissingPropertyError(name)
       
  3387         else:
       
  3388           encoding.append(default)
       
  3389     return encoding
       
  3390 
       
  3391   def __EncodeEntity(self, entity):
       
  3392     """Convert the given entity into a CSV string.
       
  3393 
       
  3394     Args:
       
  3395       entity: The entity to encode.
       
  3396 
       
  3397     Returns:
       
  3398       A CSV string.
       
  3399     """
       
  3400     output = StringIO.StringIO()
       
  3401     writer = csv.writer(output, lineterminator='')
       
  3402     writer.writerow(self.__ExtractProperties(entity))
       
  3403     return output.getvalue()
       
  3404 
       
  3405   def __SerializeEntity(self, entity):
       
  3406     """Creates a string representation of an entity.
       
  3407 
       
  3408     Args:
       
  3409       entity: The entity to serialize.
       
  3410 
       
  3411     Returns:
       
  3412       A serialized representation of an entity.
       
  3413     """
       
  3414     encoding = self.__EncodeEntity(entity)
       
  3415     if not isinstance(encoding, unicode):
       
  3416       encoding = unicode(encoding, 'utf-8')
       
  3417     encoding = encoding.encode('utf-8')
       
  3418     return encoding
       
  3419 
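          # Note: the round-trip above normalizes whatever __EncodeEntity
          # returns (str or unicode) into UTF-8 bytes, so output_entities can
          # write every record to a byte-oriented file with a plain
          # writelines().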
       
  3420   def output_entities(self, entity_generator):
       
  3421     """Outputs the downloaded entities.
       
  3422 
       
  3423     This implementation writes CSV.
       
  3424 
       
  3425     Args:
       
  3426       entity_generator: A generator that yields the downloaded entities
       
  3427         in key order.
       
  3428     """
       
  3429     CheckOutputFile(self.output_filename)
       
  3430     output_file = open(self.output_filename, 'w')
       
  3431     logger.debug('Export complete, writing to file')
       
  3432     output_file.writelines(self.__SerializeEntity(entity) + '\n'
       
  3433                            for entity in entity_generator)
       
  3434 
       
  3435   def initialize(self, filename, exporter_opts):
       
  3436     """Performs initialization and validation of the output file.
       
  3437 
       
  3438     This implementation checks that the output file can be

  3439     opened for writing.
       
  3440 
       
  3441     Args:
       
  3442       filename: The string given as the --filename flag argument.
       
  3443       exporter_opts: The string given as the --exporter_opts flag argument.
       
  3444     """
       
  3445     CheckOutputFile(filename)
       
  3446     self.output_filename = filename
       
  3447 
       
  3448   def finalize(self):
       
  3449     """Performs finalization actions after the download completes."""
       
  3450     pass
       
  3451 
       
  3452   @staticmethod
       
  3453   def RegisteredExporters():
       
  3454     """Returns a dictionary of the exporter instances that have been created."""
       
  3455     return dict(Exporter.__exporters)
       
  3456 
       
  3457   @staticmethod
       
  3458   def RegisteredExporter(kind):
       
  3459     """Returns an exporter instance for the given kind if it exists."""
       
  3460     return Exporter.__exporters[kind]
  2000 
  3461 
  2001 
  3462 
  2002 class QueueJoinThread(threading.Thread):
  3463 class QueueJoinThread(threading.Thread):
  2003   """A thread that joins a queue and exits.
  3464   """A thread that joins a queue and exits.
  2004 
  3465 
  2043   while True:
  3504   while True:
  2044     thread.join(timeout=.5)
  3505     thread.join(timeout=.5)
  2045     if not thread.isAlive():
  3506     if not thread.isAlive():
  2046       return True
  3507       return True
  2047     if thread_local.shut_down:
  3508     if thread_local.shut_down:
  2048       logging.debug('Queue join interrupted')
  3509       logger.debug('Queue join interrupted')
  2049       return False
  3510       return False
  2050     for worker_thread in thread_gate.Threads():
  3511     for worker_thread in thread_gate.Threads():
  2051       if not worker_thread.isAlive():
  3512       if not worker_thread.isAlive():
  2052         return False
  3513         return False
  2053 
  3514 
  2058   Args:
  3519   Args:
  2059     data_source_thread: A running DataSourceThread instance.
  3520     data_source_thread: A running DataSourceThread instance.
  2060     work_queue: The work queue.
  3521     work_queue: The work queue.
  2061     thread_gate: A ThreadGate instance with workers registered.
  3522     thread_gate: A ThreadGate instance with workers registered.
  2062   """
  3523   """
  2063   logging.info('An error occurred. Shutting down...')
  3524   logger.info('An error occurred. Shutting down...')
  2064 
  3525 
  2065   data_source_thread.exit_flag = True
  3526   data_source_thread.exit_flag = True
  2066 
  3527 
  2067   for thread in thread_gate.Threads():
  3528   for thread in thread_gate.Threads():
  2068     thread.exit_flag = True
  3529     thread.exit_flag = True
  2070   for unused_thread in thread_gate.Threads():
  3531   for unused_thread in thread_gate.Threads():
  2071     thread_gate.EnableThread()
  3532     thread_gate.EnableThread()
  2072 
  3533 
  2073   data_source_thread.join(timeout=3.0)
  3534   data_source_thread.join(timeout=3.0)
  2074   if data_source_thread.isAlive():
  3535   if data_source_thread.isAlive():
  2075     logging.warn('%s hung while trying to exit',
  3536     logger.warn('%s hung while trying to exit',
  2076                  data_source_thread.GetFriendlyName())
  3537                 data_source_thread.GetFriendlyName())
  2077 
  3538 
  2078   while not work_queue.empty():
  3539   while not work_queue.empty():
  2079     try:
  3540     try:
  2080       unused_item = work_queue.get_nowait()
  3541       unused_item = work_queue.get_nowait()
  2081       work_queue.task_done()
  3542       work_queue.task_done()
  2082     except Queue.Empty:
  3543     except Queue.Empty:
  2083       pass
  3544       pass
  2084 
  3545 
  2085 
  3546 
  2086 def PerformBulkUpload(app_id,
  3547 class BulkTransporterApp(object):
  2087                       post_url,
  3548   """Class to wrap bulk transport application functionality."""
  2088                       kind,
  3549 
  2089                       workitem_generator_factory,
  3550   def __init__(self,
  2090                       num_threads,
  3551                arg_dict,
  2091                       throttle,
  3552                input_generator_factory,
  2092                       progress_db,
  3553                throttle,
  2093                       max_queue_size=DEFAULT_QUEUE_SIZE,
  3554                progress_db,
  2094                       request_manager_factory=RequestManager,
  3555                workerthread_factory,
  2095                       bulkloaderthread_factory=BulkLoaderThread,
  3556                progresstrackerthread_factory,
  2096                       progresstrackerthread_factory=ProgressTrackerThread,
  3557                max_queue_size=DEFAULT_QUEUE_SIZE,
  2097                       datasourcethread_factory=DataSourceThread,
  3558                request_manager_factory=RequestManager,
  2098                       work_queue_factory=ReQueue,
  3559                datasourcethread_factory=DataSourceThread,
  2099                       progress_queue_factory=Queue.Queue):
  3560                work_queue_factory=ReQueue,
  2100   """Uploads data into an application using a series of HTTP POSTs.
  3561                progress_queue_factory=Queue.Queue):
  2101 
  3562     """Instantiate a BulkTransporterApp.
  2102   This function will spin up a number of threads to read entities from
  3563 
  2103   the data source, pass those to a number of worker ("uploader") threads
  3564     Uploads or downloads data to or from the application using HTTP requests.
  2104   for sending to the application, and track all of the progress in a
  3565     When run, the class will spin up a number of threads to read entities
  2105   small database in case an error or pause/termination requires a
  3566     from the data source, pass those to a number of worker threads
  2106   restart/resumption of the upload process.
  3567     for sending to the application, and track all of the progress in a
  2107 
  3568     small database in case an error or pause/termination requires a
  2108   Args:
  3569     restart/resumption of the upload process.
  2109     app_id: String containing application id.
  3570 
  2110     post_url: URL to post the Entity data to.
  3571     Args:
  2111     kind: Kind of the Entity records being posted.
  3572       arg_dict: Dictionary of command line options.
  2112     workitem_generator_factory: A factory that creates a WorkItem generator.
  3573       input_generator_factory: A factory that creates a WorkItem generator.
  2113     num_threads: How many uploader threads should be created.
  3574       throttle: A Throttle instance.
  2114     throttle: A Throttle instance.
  3575       progress_db: The database to use for replaying/recording progress.
  2115     progress_db: The database to use for replaying/recording progress.
  3576       workerthread_factory: A factory for worker threads.
  2116     max_queue_size: Maximum size of the queues before they should block.
  3577       progresstrackerthread_factory: Used for dependency injection.
  2117     request_manager_factory: Used for dependency injection.
  3578       max_queue_size: Maximum size of the queues before they should block.
  2118     bulkloaderthread_factory: Used for dependency injection.
  3579       request_manager_factory: Used for dependency injection.
  2119     progresstrackerthread_factory: Used for dependency injection.
  3580       datasourcethread_factory: Used for dependency injection.
  2120     datasourcethread_factory: Used for dependency injection.
  3581       work_queue_factory: Used for dependency injection.
  2121     work_queue_factory: Used for dependency injection.
  3582       progress_queue_factory: Used for dependency injection.
  2122     progress_queue_factory: Used for dependency injection.
  3583     """
  2123 
  3584     self.app_id = arg_dict['app_id']
  2124   Raises:
  3585     self.post_url = arg_dict['url']
  2125     AuthenticationError: If authentication is required and fails.
  3586     self.kind = arg_dict['kind']
  2126   """
  3587     self.batch_size = arg_dict['batch_size']
  2127   thread_gate = ThreadGate(True)
  3588     self.input_generator_factory = input_generator_factory
  2128 
  3589     self.num_threads = arg_dict['num_threads']
  2129   (unused_scheme,
  3590     self.email = arg_dict['email']
  2130    host_port, url_path,
  3591     self.passin = arg_dict['passin']
  2131    unused_query, unused_fragment) = urlparse.urlsplit(post_url)
  3592     self.throttle = throttle
  2132 
  3593     self.progress_db = progress_db
  2133   work_queue = work_queue_factory(max_queue_size)
  3594     self.workerthread_factory = workerthread_factory
  2134   progress_queue = progress_queue_factory(max_queue_size)
  3595     self.progresstrackerthread_factory = progresstrackerthread_factory
  2135   request_manager = request_manager_factory(app_id,
  3596     self.max_queue_size = max_queue_size
  2136                                             host_port,
  3597     self.request_manager_factory = request_manager_factory
  2137                                             url_path,
  3598     self.datasourcethread_factory = datasourcethread_factory
  2138                                             kind,
  3599     self.work_queue_factory = work_queue_factory
  2139                                             throttle)
  3600     self.progress_queue_factory = progress_queue_factory
  2140 
  3601     (scheme,
  2141   throttle.Register(threading.currentThread())
  3602      self.host_port, self.url_path,
  2142   try:
  3603      unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
  2143     request_manager.Authenticate()
  3604     self.secure = (scheme == 'https')
  2144   except Exception, e:
  3605 
  2145     logging.exception(e)
  3606   def Run(self):
  2146     raise AuthenticationError('Authentication failed')
  3607     """Perform the work of the BulkTransporterApp.
  2147   if (request_manager.credentials is not None and
  3608 
  2148       not request_manager.authenticated):
  3609     Raises:
  2149     raise AuthenticationError('Authentication failed')
  3610       AuthenticationError: If authentication is required and fails.
  2150 
  3611 
  2151   for unused_idx in range(num_threads):
  3612     Returns:
  2152     thread = bulkloaderthread_factory(work_queue,
  3613       Error code suitable for sys.exit, e.g. 0 on success, 1 on failure.
  2153                                       throttle,
  3614     """
  2154                                       thread_gate,
  3615     thread_gate = ThreadGate(True)
  2155                                       request_manager)
  3616 
  2156     throttle.Register(thread)
  3617     self.throttle.Register(threading.currentThread())
  2157     thread_gate.Register(thread)
  3618     threading.currentThread().exit_flag = False
  2158 
  3619 
  2159   progress_thread = progresstrackerthread_factory(progress_queue, progress_db)
  3620     work_queue = self.work_queue_factory(self.max_queue_size)
  2160 
  3621 
  2161   if progress_db.HasUnfinishedWork():
  3622     progress_queue = self.progress_queue_factory(self.max_queue_size)
  2162     logging.debug('Restarting upload using progress database')
  3623     request_manager = self.request_manager_factory(self.app_id,
  2163     progress_generator_factory = progress_db.GetProgressStatusGenerator
  3624                                                    self.host_port,
  2164   else:
  3625                                                    self.url_path,
  2165     progress_generator_factory = None
  3626                                                    self.kind,
  2166 
  3627                                                    self.throttle,
  2167   data_source_thread = datasourcethread_factory(work_queue,
  3628                                                    self.batch_size,
  2168                                                 progress_queue,
  3629                                                    self.secure,
  2169                                                 workitem_generator_factory,
  3630                                                    self.email,
  2170                                                 progress_generator_factory)
  3631                                                    self.passin)
  2171 
  3632     try:
  2172   thread_local = threading.local()
  3633       request_manager.Authenticate()
  2173   thread_local.shut_down = False
  3634     except Exception, e:
  2174 
  3635       if not isinstance(e, urllib2.HTTPError) or (
  2175   def Interrupt(unused_signum, unused_frame):
  3636           e.code != 302 and e.code != 401):
  2176     """Shutdown gracefully in response to a signal."""
  3637         logger.exception('Exception during authentication')
  2177     thread_local.shut_down = True
  3638       raise AuthenticationError()
  2178 
  3639     if (request_manager.auth_called and
  2179   signal.signal(signal.SIGINT, Interrupt)
  3640         not request_manager.authenticated):
  2180 
  3641       raise AuthenticationError('Authentication failed')
  2181   progress_thread.start()
  3642 
  2182   data_source_thread.start()
  3643     for unused_idx in xrange(self.num_threads):
  2183   for thread in thread_gate.Threads():
  3644       thread = self.workerthread_factory(work_queue,
  2184     thread.start()
  3645                                          self.throttle,
  2185 
  3646                                          thread_gate,
  2186 
  3647                                          request_manager,
  2187   while not thread_local.shut_down:
  3648                                          self.num_threads,
  2188     data_source_thread.join(timeout=0.25)
  3649                                          self.batch_size)
  2189 
  3650       self.throttle.Register(thread)
  2190     if data_source_thread.isAlive():
  3651       thread_gate.Register(thread)
  2191       for thread in list(thread_gate.Threads()) + [progress_thread]:
  3652 
  2192         if not thread.isAlive():
  3653     self.progress_thread = self.progresstrackerthread_factory(
  2193           logging.info('Unexpected thread death: %s', thread.getName())
  3654         progress_queue, self.progress_db)
  2194           thread_local.shut_down = True
  3655 
  2195           break
  3656     if self.progress_db.UseProgressData():
       
  3657       logger.debug('Restarting upload using progress database')
       
  3658       progress_generator_factory = self.progress_db.GetProgressStatusGenerator
  2196     else:
  3659     else:
  2197       break
  3660       progress_generator_factory = None
  2198 
  3661 
  2199   if thread_local.shut_down:
  3662     self.data_source_thread = (
  2200     ShutdownThreads(data_source_thread, work_queue, thread_gate)
  3663         self.datasourcethread_factory(work_queue,
  2201 
  3664                                       progress_queue,
  2202   def _Join(ob, msg):
  3665                                       self.input_generator_factory,
  2203     logging.debug('Waiting for %s...', msg)
  3666                                       progress_generator_factory))
  2204     if isinstance(ob, threading.Thread):
  3667 
  2205       ob.join(timeout=3.0)
  3668     thread_local = threading.local()
  2206       if ob.isAlive():
  3669     thread_local.shut_down = False
  2207         logging.debug('Joining %s failed', ob.GetFriendlyName())
  3670 
       
  3671     def Interrupt(unused_signum, unused_frame):
       
  3672       """Shutdown gracefully in response to a signal."""
       
  3673       thread_local.shut_down = True
       
  3674 
       
  3675     signal.signal(signal.SIGINT, Interrupt)
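            # Note: the handler above only sets a flag; the join loop below
            # polls thread_local.shut_down every 0.25 seconds, so Ctrl-C
            # unwinds the worker threads cleanly rather than killing them
            # mid-request.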
       
  3676 
       
  3677     self.progress_thread.start()
       
  3678     self.data_source_thread.start()
       
  3679     for thread in thread_gate.Threads():
       
  3680       thread.start()
       
  3681 
       
  3682 
       
  3683     while not thread_local.shut_down:
       
  3684       self.data_source_thread.join(timeout=0.25)
       
  3685 
       
  3686       if self.data_source_thread.isAlive():
       
  3687         for thread in list(thread_gate.Threads()) + [self.progress_thread]:
       
  3688           if not thread.isAlive():
       
  3689             logger.info('Unexpected thread death: %s', thread.getName())
       
  3690             thread_local.shut_down = True
       
  3691             break
  2208       else:
  3692       else:
  2209         logging.debug('... done.')
  3693         break
  2210     elif isinstance(ob, (Queue.Queue, ReQueue)):
  3694 
  2211       if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
  3695     if thread_local.shut_down:
  2212         ShutdownThreads(data_source_thread, work_queue, thread_gate)
  3696       ShutdownThreads(self.data_source_thread, work_queue, thread_gate)
       
  3697 
       
  3698     def _Join(ob, msg):
       
  3699       logger.debug('Waiting for %s...', msg)
       
  3700       if isinstance(ob, threading.Thread):
       
  3701         ob.join(timeout=3.0)
       
  3702         if ob.isAlive():
       
  3703           logger.debug('Joining %s failed', ob.GetFriendlyName())
       
  3704         else:
       
  3705           logger.debug('... done.')
       
  3706       elif isinstance(ob, (Queue.Queue, ReQueue)):
       
  3707         if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
       
  3708           ShutdownThreads(self.data_source_thread, work_queue, thread_gate)
       
  3709       else:
       
  3710         ob.join()
       
  3711         logger.debug('... done.')
       
  3712 
       
  3713     _Join(work_queue, 'work_queue to flush')
       
  3714 
       
  3715     for unused_thread in thread_gate.Threads():
       
  3716       work_queue.put(_THREAD_SHOULD_EXIT)
       
  3717 
       
  3718     for unused_thread in thread_gate.Threads():
       
  3719       thread_gate.EnableThread()
       
  3720 
       
  3721     for thread in thread_gate.Threads():
       
  3722       _Join(thread, 'thread [%s] to terminate' % thread.getName())
       
  3723 
       
  3724       thread.CheckError()
       
  3725 
       
  3726     if self.progress_thread.isAlive():
       
  3727       _Join(progress_queue, 'progress_queue to finish')
  2213     else:
  3728     else:
  2214       ob.join()
  3729       logger.warn('Progress thread exited prematurely')
  2215       logging.debug('... done.')
  3730 
  2216 
  3731     progress_queue.put(_THREAD_SHOULD_EXIT)
  2217   _Join(work_queue, 'work_queue to flush')
  3732     _Join(self.progress_thread, 'progress_thread to terminate')
  2218 
  3733     self.progress_thread.CheckError()
  2219   for unused_thread in thread_gate.Threads():
  3734     if not thread_local.shut_down:
  2220     work_queue.put(_THREAD_SHOULD_EXIT)
  3735       self.progress_thread.WorkFinished()
  2221 
  3736 
  2222   for unused_thread in thread_gate.Threads():
  3737     self.data_source_thread.CheckError()
  2223     thread_gate.EnableThread()
  3738 
  2224 
  3739     return self.ReportStatus()
  2225   for thread in thread_gate.Threads():
  3740 
  2226     _Join(thread, 'thread [%s] to terminate' % thread.getName())
  3741   def ReportStatus(self):
  2227 
  3742     """Display a message reporting the final status of the transfer."""
  2228     thread.CheckError()
  3743     raise NotImplementedError()
  2229 
  3744 
  2230   if progress_thread.isAlive():
  3745 
  2231     _Join(progress_queue, 'progress_queue to finish')
  3746 class BulkUploaderApp(BulkTransporterApp):
  2232   else:
  3747   """Class to encapsulate bulk uploader functionality."""
  2233     logging.warn('Progress thread exited prematurely')
  3748 
  2234 
  3749   def __init__(self, *args, **kwargs):
  2235   progress_queue.put(_THREAD_SHOULD_EXIT)
  3750     BulkTransporterApp.__init__(self, *args, **kwargs)
  2236   _Join(progress_thread, 'progress_thread to terminate')
  3751 
  2237   progress_thread.CheckError()
  3752   def ReportStatus(self):
  2238 
  3753     """Display a message reporting the final status of the transfer."""
  2239   data_source_thread.CheckError()
  3754     total_up, duration = self.throttle.TotalTransferred(BANDWIDTH_UP)
  2240 
  3755     s_total_up, unused_duration = self.throttle.TotalTransferred(
  2241   total_up, duration = throttle.TotalTransferred(BANDWIDTH_UP)
  3756         HTTPS_BANDWIDTH_UP)
  2242   s_total_up, unused_duration = throttle.TotalTransferred(HTTPS_BANDWIDTH_UP)
  3757     total_up += s_total_up
  2243   total_up += s_total_up
  3758     total = total_up
  2244   logging.info('%d entities read, %d previously transferred',
  3759     logger.info('%d entities total, %d previously transferred',
  2245                data_source_thread.read_count,
  3760                 self.data_source_thread.read_count,
  2246                data_source_thread.sent_count)
  3761                 self.data_source_thread.xfer_count)
  2247   logging.info('%d entities (%d bytes) transferred in %.1f seconds',
  3762     transfer_count = self.progress_thread.EntitiesTransferred()
  2248                progress_thread.entities_sent, total_up, duration)
  3763     logger.info('%d entities (%d bytes) transferred in %.1f seconds',
  2249   if (data_source_thread.read_all and
  3764                 transfer_count, total, duration)
  2250       progress_thread.entities_sent + data_source_thread.sent_count >=
  3765     if (self.data_source_thread.read_all and
  2251       data_source_thread.read_count):
  3766         transfer_count +
  2252     logging.info('All entities successfully uploaded')
  3767         self.data_source_thread.xfer_count >=
  2253   else:
  3768         self.data_source_thread.read_count):
  2254     logging.info('Some entities not successfully uploaded')
  3769       logger.info('All entities successfully transferred')
       
  3770       return 0
       
  3771     else:
       
  3772       logger.info('Some entities not successfully transferred')
       
  3773       return 1
       
  3774 
       
  3775 
       
  3776 class BulkDownloaderApp(BulkTransporterApp):
       
  3777   """Class to encapsulate bulk downloader functionality."""
       
  3778 
       
  3779   def __init__(self, *args, **kwargs):
       
  3780     BulkTransporterApp.__init__(self, *args, **kwargs)
       
  3781 
       
  3782   def ReportStatus(self):
       
  3783     """Display a message reporting the final status of the transfer."""
       
  3784     total_down, duration = self.throttle.TotalTransferred(BANDWIDTH_DOWN)
       
  3785     s_total_down, unused_duration = self.throttle.TotalTransferred(
       
  3786         HTTPS_BANDWIDTH_DOWN)
       
  3787     total_down += s_total_down
       
  3788     total = total_down
       
  3789     existing_count = self.progress_thread.existing_count
       
  3790     xfer_count = self.progress_thread.EntitiesTransferred()
       
  3791     logger.info('Have %d entities, %d previously transferred',
       
  3792                 xfer_count + existing_count, existing_count)
       
  3793     logger.info('%d entities (%d bytes) transferred in %.1f seconds',
       
  3794                 xfer_count, total, duration)
       
  3795     return 0
  2255 
  3796 
  2256 
  3797 
  2257 def PrintUsageExit(code):
  3798 def PrintUsageExit(code):
  2258   """Prints usage information and exits with a status code.
  3799   """Prints usage information and exits with a status code.
  2259 
  3800 
  2264   sys.stdout.flush()
  3805   sys.stdout.flush()
  2265   sys.stderr.flush()
  3806   sys.stderr.flush()
  2266   sys.exit(code)
  3807   sys.exit(code)
  2267 
  3808 
  2268 
  3809 
       
  3810 REQUIRED_OPTION = object()
       
  3811 
       
  3812 
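Note: REQUIRED_OPTION is a unique sentinel object, so an identity test can
distinguish "flag never supplied" from legitimate falsy values such as '' or
0. A minimal sketch of the check, mirroring what main() does further below:

    missing = [key for (key, value) in arg_dict.iteritems()
               if value is REQUIRED_OPTION]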
       
  3813 FLAG_SPEC = ['debug',
       
  3814              'help',
       
  3815              'url=',
       
  3816              'filename=',
       
  3817              'batch_size=',
       
  3818              'kind=',
       
  3819              'num_threads=',
       
  3820              'bandwidth_limit=',
       
  3821              'rps_limit=',
       
  3822              'http_limit=',
       
  3823              'db_filename=',
       
  3824              'app_id=',
       
  3825              'config_file=',
       
  3826              'has_header',
       
  3827              'csv_has_header',
       
  3828              'auth_domain=',
       
  3829              'result_db_filename=',
       
  3830              'download',
       
  3831              'loader_opts=',
       
  3832              'exporter_opts=',
       
  3833              'log_file=',
       
  3834              'email=',
       
  3835              'passin',
       
  3836              ]
       
  3837 
       
  3838 
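For orientation, a minimal sketch of how FLAG_SPEC drives getopt in
ParseArguments below; the flag values here are hypothetical:

    import getopt
    argv = ['bulkloader.py', '--url=http://myapp.appspot.com/remote_api',
            '--filename=data.csv', '--kind=Entry', '--config_file=config.py']
    opts, unused_args = getopt.getopt(argv[1:], 'h', FLAG_SPEC)
    # opts == [('--url', 'http://myapp.appspot.com/remote_api'), ...]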
  2269 def ParseArguments(argv):
  3839 def ParseArguments(argv):
  2270   """Parses command-line arguments.
  3840   """Parses command-line arguments.
  2271 
  3841 
  2272   Prints out a help message if -h or --help is supplied.
  3842   Prints out a help message if -h or --help is supplied.
  2273 
  3843 
  2274   Args:
  3844   Args:
  2275     argv: List of command-line arguments.
  3845     argv: List of command-line arguments.
  2276 
  3846 
  2277   Returns:
  3847   Returns:
  2278     Tuple (app_id, url, filename, batch_size, kind, ...) containing the values
  3848     A dictionary containing the values of the command-line options.
  2279     from each corresponding command-line flag.
       
  2280   """
  3849   """
  2281   opts, unused_args = getopt.getopt(
  3850   opts, unused_args = getopt.getopt(
  2282       argv[1:],
  3851       argv[1:],
  2283       'h',
  3852       'h',
  2284       ['debug',
  3853       FLAG_SPEC)
  2285        'help',
  3854 
  2286        'url=',
  3855   arg_dict = {}
  2287        'filename=',
  3856 
  2288        'batch_size=',
  3857   arg_dict['url'] = REQUIRED_OPTION
  2289        'kind=',
  3858   arg_dict['filename'] = REQUIRED_OPTION
  2290        'num_threads=',
  3859   arg_dict['config_file'] = REQUIRED_OPTION
  2291        'bandwidth_limit=',
  3860   arg_dict['kind'] = REQUIRED_OPTION
  2292        'rps_limit=',
  3861 
  2293        'http_limit=',
  3862   arg_dict['batch_size'] = DEFAULT_BATCH_SIZE
  2294        'db_filename=',
  3863   arg_dict['num_threads'] = DEFAULT_THREAD_COUNT
  2295        'app_id=',
  3864   arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT
  2296        'config_file=',
  3865   arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT
  2297        'auth_domain=',
  3866   arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT
  2298       ])
  3867 
  2299 
  3868   arg_dict['db_filename'] = None
  2300   url = None
  3869   arg_dict['app_id'] = ''
  2301   filename = None
  3870   arg_dict['auth_domain'] = 'gmail.com'
  2302   batch_size = DEFAULT_BATCH_SIZE
  3871   arg_dict['has_header'] = False
  2303   kind = None
  3872   arg_dict['result_db_filename'] = None
  2304   num_threads = DEFAULT_THREAD_COUNT
  3873   arg_dict['download'] = False
  2305   bandwidth_limit = DEFAULT_BANDWIDTH_LIMIT
  3874   arg_dict['loader_opts'] = None
  2306   rps_limit = DEFAULT_RPS_LIMIT
  3875   arg_dict['exporter_opts'] = None
  2307   http_limit = DEFAULT_REQUEST_LIMIT
  3876   arg_dict['debug'] = False
  2308   db_filename = None
  3877   arg_dict['log_file'] = None
  2309   app_id = None
  3878   arg_dict['email'] = None
  2310   config_file = None
  3879   arg_dict['passin'] = False
  2311   auth_domain = 'gmail.com'
  3880 
       
  3881   def ExpandFilename(filename):
       
  3882     """Expand shell variables and ~usernames in filename."""
       
  3883     return os.path.expandvars(os.path.expanduser(filename))
  2312 
  3884 
  2313   for option, value in opts:
  3885   for option, value in opts:
  2314     if option == '--debug':
  3886     if option == '--debug':
  2315       logging.getLogger().setLevel(logging.DEBUG)
  3887       arg_dict['debug'] = True
  2316     elif option in ('-h', '--help'):
  3888     elif option in ('-h', '--help'):
  2317       PrintUsageExit(0)
  3889       PrintUsageExit(0)
  2318     elif option == '--url':
  3890     elif option == '--url':
  2319       url = value
  3891       arg_dict['url'] = value
  2320     elif option == '--filename':
  3892     elif option == '--filename':
  2321       filename = value
  3893       arg_dict['filename'] = ExpandFilename(value)
  2322     elif option == '--batch_size':
  3894     elif option == '--batch_size':
  2323       batch_size = int(value)
  3895       arg_dict['batch_size'] = int(value)
  2324     elif option == '--kind':
  3896     elif option == '--kind':
  2325       kind = value
  3897       arg_dict['kind'] = value
  2326     elif option == '--num_threads':
  3898     elif option == '--num_threads':
  2327       num_threads = int(value)
  3899       arg_dict['num_threads'] = int(value)
  2328     elif option == '--bandwidth_limit':
  3900     elif option == '--bandwidth_limit':
  2329       bandwidth_limit = int(value)
  3901       arg_dict['bandwidth_limit'] = int(value)
  2330     elif option == '--rps_limit':
  3902     elif option == '--rps_limit':
  2331       rps_limit = int(value)
  3903       arg_dict['rps_limit'] = int(value)
  2332     elif option == '--http_limit':
  3904     elif option == '--http_limit':
  2333       http_limit = int(value)
  3905       arg_dict['http_limit'] = int(value)
  2334     elif option == '--db_filename':
  3906     elif option == '--db_filename':
  2335       db_filename = value
  3907       arg_dict['db_filename'] = ExpandFilename(value)
  2336     elif option == '--app_id':
  3908     elif option == '--app_id':
  2337       app_id = value
  3909       arg_dict['app_id'] = value
  2338     elif option == '--config_file':
  3910     elif option == '--config_file':
  2339       config_file = value
  3911       arg_dict['config_file'] = ExpandFilename(value)
  2340     elif option == '--auth_domain':
  3912     elif option == '--auth_domain':
  2341       auth_domain = value
  3913       arg_dict['auth_domain'] = value
  2342 
  3914     elif option == '--has_header':
  2343   return ProcessArguments(app_id=app_id,
  3915       arg_dict['has_header'] = True
  2344                           url=url,
  3916     elif option == '--csv_has_header':
  2345                           filename=filename,
  3917       print >>sys.stderr, ('--csv_has_header is deprecated, please use '
  2346                           batch_size=batch_size,
  3918                            '--has_header.')
  2347                           kind=kind,
  3919       arg_dict['has_header'] = True
  2348                           num_threads=num_threads,
  3920     elif option == '--result_db_filename':
  2349                           bandwidth_limit=bandwidth_limit,
  3921       arg_dict['result_db_filename'] = ExpandFilename(value)
  2350                           rps_limit=rps_limit,
  3922     elif option == '--download':
  2351                           http_limit=http_limit,
  3923       arg_dict['download'] = True
  2352                           db_filename=db_filename,
  3924     elif option == '--loader_opts':
  2353                           config_file=config_file,
  3925       arg_dict['loader_opts'] = value
  2354                           auth_domain=auth_domain,
  3926     elif option == '--exporter_opts':
  2355                           die_fn=lambda: PrintUsageExit(1))
  3927       arg_dict['exporter_opts'] = value
       
  3928     elif option == '--log_file':
       
  3929       arg_dict['log_file'] = value
       
  3930     elif option == '--email':
       
  3931       arg_dict['email'] = value
       
  3932     elif option == '--passin':
       
  3933       arg_dict['passin'] = True
       
  3934 
       
  3935   return ProcessArguments(arg_dict, die_fn=lambda: PrintUsageExit(1))
  2356 
  3936 
  2357 
  3937 
  2358 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
  3938 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
       
  3939   """Return a dictionary indicating the throttle options."""
  2359   return {
  3940   return {
  2360       BANDWIDTH_UP: bandwidth_limit,
  3941       BANDWIDTH_UP: bandwidth_limit,
  2361       BANDWIDTH_DOWN: bandwidth_limit,
  3942       BANDWIDTH_DOWN: bandwidth_limit,
  2362       REQUESTS: http_limit,
  3943       REQUESTS: http_limit,
  2363       HTTPS_BANDWIDTH_UP: bandwidth_limit / 5,
  3944       HTTPS_BANDWIDTH_UP: bandwidth_limit / 5,
  2365       HTTPS_REQUESTS: http_limit / 5,
  3946       HTTPS_REQUESTS: http_limit / 5,
  2366       RECORDS: rps_limit,
  3947       RECORDS: rps_limit,
  2367   }
  3948   }
  2368 
  3949 
  2369 
  3950 
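A quick illustration, assuming the module's throttle constants are in scope:
the HTTPS buckets are budgeted at one fifth of the plain-HTTP limits (the
numbers here are hypothetical):

    layout = ThrottleLayout(bandwidth_limit=100000, http_limit=20,
                            rps_limit=500)
    assert layout[BANDWIDTH_UP] == 100000
    assert layout[HTTPS_BANDWIDTH_UP] == 100000 / 5
    assert layout[HTTPS_REQUESTS] == 20 / 5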
  2370 def LoadConfig(config_file):
  3951 def CheckOutputFile(filename):
  2371   """Loads a config file and registers any Loader classes present."""
  3952   """Check that the given file does not exist and can be opened for writing.
  2372   if config_file:
  3953 
  2373     global_dict = dict(globals())
  3954   Args:
  2374     execfile(config_file, global_dict)
  3955     filename: The name of the file.
  2375     for cls in Loader.__subclasses__():
  3956 
  2376       Loader.RegisterLoader(cls())
  3957   Raises:
  2377 
  3958     FileExistsError: if the given filename already exists.
  2378 
  3959     FileNotWritableError: if the directory of the given filename is not writable.
  2379 def _MissingArgument(arg_name, die_fn):
  3960   """
  2380   """Print error message about missing argument and die."""
  3961   if os.path.exists(filename):
  2381   print >>sys.stderr, '%s argument required' % arg_name
  3962     raise FileExistsError('%s: output file exists' % filename)
  2382   die_fn()
  3963   elif not os.access(os.path.dirname(filename), os.W_OK):
  2383 
  3964     raise FileNotWritableError(
  2384 
  3965         '%s: not writable' % os.path.dirname(filename))
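Usage sketch for the --download path (the path is hypothetical): the export
target must not exist yet and its directory must be writable:

    CheckOutputFile('exports/entries.csv')
    # Raises FileExistsError or FileNotWritableError on conflict,
    # otherwise returns None.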
  2385 def ProcessArguments(app_id=None,
  3966 
  2386                      url=None,
  3967 
  2387                      filename=None,
  3968 def LoadConfig(config_file_name, exit_fn=sys.exit):
  2388                      batch_size=DEFAULT_BATCH_SIZE,
  3969   """Loads a config file and registers any Loader classes present.
  2389                      kind=None,
  3970 
  2390                      num_threads=DEFAULT_THREAD_COUNT,
  3971   Args:
  2391                      bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
  3972     config_file_name: The name of the configuration file.
  2392                      rps_limit=DEFAULT_RPS_LIMIT,
  3973     exit_fn: Used for dependency injection.
  2393                      http_limit=DEFAULT_REQUEST_LIMIT,
  3974   """
  2394                      db_filename=None,
  3975   if config_file_name:
  2395                      config_file=None,
  3976     config_file = open(config_file_name, 'r')
  2396                      auth_domain='gmail.com',
  3977     try:
       
  3978       bulkloader_config = imp.load_module(
       
  3979           'bulkloader_config', config_file, config_file_name,
       
  3980           ('', 'r', imp.PY_SOURCE))
       
  3981       sys.modules['bulkloader_config'] = bulkloader_config
       
  3982 
       
  3983       if hasattr(bulkloader_config, 'loaders'):
       
  3984         for cls in bulkloader_config.loaders:
       
  3985           Loader.RegisterLoader(cls())
       
  3986 
       
  3987       if hasattr(bulkloader_config, 'exporters'):
       
  3988         for cls in bulkloader_config.exporters:
       
  3989           Exporter.RegisterExporter(cls())
       
  3990     except NameError, e:
       
  3991       m = re.search(r"[^']*'([^']*)'.*", str(e))
       
  3992       if m.groups() and m.group(1) == 'Loader':
       
  3993         print >>sys.stderr, """
       
  3994 The config file format has changed and you appear to be using an old-style
       
  3995 config file.  Please make the following changes:
       
  3996 
       
  3997 1. At the top of the file, add this:
       
  3998 
       
  3999 from google.appengine.tools.bulkloader import Loader
       
  4000 
       
  4001 2. For each of your Loader subclasses add the following at the end of the
       
  4002    __init__ definition:
       
  4003 
       
  4004 self.alias_old_names()
       
  4005 
       
  4006 3. At the bottom of the file, add this:
       
  4007 
       
  4008 loaders = [MyLoader1,...,MyLoaderN]
       
  4009 
       
  4010 Where MyLoader1,...,MyLoaderN are the Loader subclasses you want the bulkloader
       
  4011 to have access to.
       
  4012 """
       
  4013         exit_fn(1)
       
  4014       else:
       
  4015         raise
       
  4016     except Exception, e:
       
  4017       if isinstance(e, NameClashError) or 'bulkloader_config' in vars() and (
       
  4018           hasattr(bulkloader_config, 'bulkloader') and
       
  4019           isinstance(e, bulkloader_config.bulkloader.NameClashError)):
       
  4020         print >> sys.stderr, (
       
  4021             'Found both %s and %s while aliasing old names on %s.' %
       
  4022             (e.old_name, e.new_name, e.klass))
       
  4023         exit_fn(1)
       
  4024       else:
       
  4025         raise
       
  4026 
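A new-style config file consumable by LoadConfig might look like the sketch
below; the kind, class, and property names are hypothetical, and the Loader
constructor is assumed to take a kind name plus (property, converter) pairs:

    # config.py -- loaded by LoadConfig via imp.load_module under the
    # module name 'bulkloader_config'.
    from google.appengine.tools.bulkloader import Loader

    class EntryLoader(Loader):
      def __init__(self):
        Loader.__init__(self, 'Entry', [('title', str), ('body', str)])

    loaders = [EntryLoader]
    exporters = []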
       
  4027 def GetArgument(kwargs, name, die_fn):
       
  4028   """Get the value of the key name in kwargs, or die with die_fn.
       
  4029 
       
  4030   Args:
       
  4031     kwargs: A dictionary containing the options for the bulkloader.
       
  4032     name: The name of a bulkloader option.
       
  4033     die_fn: The function to call to exit the program.
       
  4034 
       
  4035   Returns:
       
  4036     The value of kwargs[name] if name is present in kwargs.
       
  4037   """
       
  4038   if name in kwargs:
       
  4039     return kwargs[name]
       
  4040   else:
       
  4041     print >>sys.stderr, '%s argument required' % name
       
  4042     die_fn()
       
  4043 
       
  4044 
       
  4045 def _MakeSignature(app_id=None,
       
  4046                    url=None,
       
  4047                    kind=None,
       
  4048                    db_filename=None,
       
  4049                    download=None,
       
  4050                    has_header=None,
       
  4051                    result_db_filename=None):
       
  4052   """Returns a string that identifies the important options for the database."""
       
  4053   if download:
       
  4054     result_db_line = 'result_db: %s' % result_db_filename
       
  4055   else:
       
  4056     result_db_line = ''
       
  4057   return u"""
       
  4058   app_id: %s
       
  4059   url: %s
       
  4060   kind: %s
       
  4061   download: %s
       
  4062   progress_db: %s
       
  4063   has_header: %s
       
  4064   %s
       
  4065   """ % (app_id, url, kind, download, db_filename, has_header, result_db_line)
       
  4066 
       
  4067 
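For example, the signature recorded for an upload run might be built as
follows (all values hypothetical); a progress database created under
different options can then be detected on resume:

    signature = _MakeSignature(app_id='myapp',
                               url='http://myapp.appspot.com/remote_api',
                               kind='Entry',
                               db_filename='bulkloader-progress.sql3',
                               download=False,
                               has_header=True,
                               result_db_filename=None)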
       
  4068 def ProcessArguments(arg_dict,
  2397                      die_fn=lambda: sys.exit(1)):
  4069                      die_fn=lambda: sys.exit(1)):
  2398   """Processes non command-line input arguments."""
  4070   """Processes non command-line input arguments.
       
  4071 
       
  4072   Args:
       
  4073     arg_dict: Dictionary containing the values of bulkloader options.
       
  4074     die_fn: Function to call in case of an error during argument processing.
       
  4075 
       
  4076   Returns:
       
  4077     A dictionary of bulkloader options.
       
  4078   """
       
  4079   app_id = GetArgument(arg_dict, 'app_id', die_fn)
       
  4080   url = GetArgument(arg_dict, 'url', die_fn)
       
  4081   filename = GetArgument(arg_dict, 'filename', die_fn)
       
  4082   batch_size = GetArgument(arg_dict, 'batch_size', die_fn)
       
  4083   kind = GetArgument(arg_dict, 'kind', die_fn)
       
  4084   db_filename = GetArgument(arg_dict, 'db_filename', die_fn)
       
  4085   config_file = GetArgument(arg_dict, 'config_file', die_fn)
       
  4086   result_db_filename = GetArgument(arg_dict, 'result_db_filename', die_fn)
       
  4087   download = GetArgument(arg_dict, 'download', die_fn)
       
  4088   log_file = GetArgument(arg_dict, 'log_file', die_fn)
       
  4089 
       
  4090   unused_passin = GetArgument(arg_dict, 'passin', die_fn)
       
  4091   unused_email = GetArgument(arg_dict, 'email', die_fn)
       
  4092   unused_debug = GetArgument(arg_dict, 'debug', die_fn)
       
  4093   unused_num_threads = GetArgument(arg_dict, 'num_threads', die_fn)
       
  4094   unused_bandwidth_limit = GetArgument(arg_dict, 'bandwidth_limit', die_fn)
       
  4095   unused_rps_limit = GetArgument(arg_dict, 'rps_limit', die_fn)
       
  4096   unused_http_limit = GetArgument(arg_dict, 'http_limit', die_fn)
       
  4097   unused_auth_domain = GetArgument(arg_dict, 'auth_domain', die_fn)
       
  4098   unused_has_headers = GetArgument(arg_dict, 'has_header', die_fn)
       
  4099   unused_loader_opts = GetArgument(arg_dict, 'loader_opts', die_fn)
       
  4100   unused_exporter_opts = GetArgument(arg_dict, 'exporter_opts', die_fn)
       
  4101 
       
  4102   errors = []
       
  4103 
  2399   if db_filename is None:
  4104   if db_filename is None:
  2400     db_filename = time.strftime('bulkloader-progress-%Y%m%d.%H%M%S.sql3')
  4105     arg_dict['db_filename'] = time.strftime(
       
  4106         'bulkloader-progress-%Y%m%d.%H%M%S.sql3')
       
  4107 
       
  4108   if result_db_filename is None:
       
  4109     arg_dict['result_db_filename'] = time.strftime(
       
  4110         'bulkloader-results-%Y%m%d.%H%M%S.sql3')
       
  4111 
       
  4112   if log_file is None:
       
  4113     arg_dict['log_file'] = time.strftime('bulkloader-log-%Y%m%d.%H%M%S')
  2401 
  4114 
  2402   if batch_size <= 0:
  4115   if batch_size <= 0:
  2403     print >>sys.stderr, 'batch_size must be 1 or larger'
  4116     errors.append('batch_size must be at least 1')
  2404     die_fn()
  4117 
  2405 
  4118   required = '%s argument required'
  2406   if url is None:
  4119 
  2407     _MissingArgument('url', die_fn)
  4120   if url is REQUIRED_OPTION:
  2408 
  4121     errors.append(required % 'url')
  2409   if filename is None:
  4122 
  2410     _MissingArgument('filename', die_fn)
  4123   if filename is REQUIRED_OPTION:
  2411 
  4124     errors.append(required % 'filename')
  2412   if kind is None:
  4125 
  2413     _MissingArgument('kind', die_fn)
  4126   if kind is REQUIRED_OPTION:
  2414 
  4127     errors.append(required % 'kind')
  2415   if config_file is None:
  4128 
  2416     _MissingArgument('config_file', die_fn)
  4129   if config_file is REQUIRED_OPTION:
  2417 
  4130     errors.append(required % 'config_file')
  2418   if app_id is None:
  4131 
       
  4132   if download:
       
  4133     if result_db_filename is REQUIRED_OPTION:
       
  4134       errors.append(required % 'result_db_filename')
       
  4135 
       
  4136   if not app_id:
  2419     (unused_scheme, host_port, unused_url_path,
  4137     (unused_scheme, host_port, unused_url_path,
  2420      unused_query, unused_fragment) = urlparse.urlsplit(url)
  4138      unused_query, unused_fragment) = urlparse.urlsplit(url)
  2421     suffix_idx = host_port.find('.appspot.com')
  4139     suffix_idx = host_port.find('.appspot.com')
  2422     if suffix_idx > -1:
  4140     if suffix_idx > -1:
  2423       app_id = host_port[:suffix_idx]
  4141       arg_dict['app_id'] = host_port[:suffix_idx]
  2424     elif host_port.split(':')[0].endswith('google.com'):
  4142     elif host_port.split(':')[0].endswith('google.com'):
  2425       app_id = host_port.split('.')[0]
  4143       arg_dict['app_id'] = host_port.split('.')[0]
  2426     else:
  4144     else:
  2427       print >>sys.stderr, 'app_id required for non appspot.com domains'
  4145       errors.append('app_id argument required for non appspot.com domains')
  2428       die_fn()
  4146 
  2429 
  4147   if errors:
  2430   return (app_id, url, filename, batch_size, kind, num_threads,
  4148     print >>sys.stderr, '\n'.join(errors)
  2431           bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
  4149     die_fn()
  2432           auth_domain)
  4150 
  2433 
  4151   return arg_dict
  2434 
  4152 
  2435 def _PerformBulkload(app_id=None,
  4153 
  2436                      url=None,
  4154 def ParseKind(kind):
  2437                      filename=None,
  4155   if kind and kind[0] == '(' and kind[-1] == ')':
  2438                      batch_size=DEFAULT_BATCH_SIZE,
  4156     return tuple(kind[1:-1].split(','))
  2439                      kind=None,
  4157   else:
  2440                      num_threads=DEFAULT_THREAD_COUNT,
  4158     return kind
  2441                      bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
  4159 
  2442                      rps_limit=DEFAULT_RPS_LIMIT,
  4160 
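ParseKind in two lines: a parenthesized, comma-separated value becomes a
tuple of kinds, and anything else passes through unchanged:

    assert ParseKind('(Parent,Child)') == ('Parent', 'Child')
    assert ParseKind('Entry') == 'Entry'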
  2443                      http_limit=DEFAULT_REQUEST_LIMIT,
  4161 def _PerformBulkload(arg_dict,
  2444                      db_filename=None,
  4162                      check_file=CheckFile,
  2445                      config_file=None,
  4163                      check_output_file=CheckOutputFile):
  2446                      auth_domain='gmail.com'):
  4164   """Runs the bulkloader, given the command line options.
  2447   """Runs the bulkloader, given the options as keyword arguments.
       
  2448 
  4165 
  2449   Args:
  4166   Args:
  2450     app_id: The application id.
  4167     arg_dict: Dictionary of bulkloader options.
  2451     url: The url of the remote_api endpoint.
  4168     check_file: Used for dependency injection.
  2452     filename: The name of the file containing the CSV data.
  4169     check_output_file: Used for dependency injection.
  2453     batch_size: The number of records to send per request.
  4170 
  2454     kind: The kind of entity to transfer.
  4171   Returns:
  2455     num_threads: The number of threads to use to transfer data.
  4172     An exit code.
  2456     bandwidth_limit: Maximum bytes/second to transfers.
  4173 
  2457     rps_limit: Maximum records/second to transfer.
  4174   Raises:
  2458     http_limit: Maximum requests/second for transfers.
  4175     ConfigurationError: if inconsistent options are passed.
  2459     db_filename: The name of the SQLite3 progress database file.
  4176   """
  2460     config_file: The name of the configuration file.
  4177   app_id = arg_dict['app_id']
  2461     auth_domain: The auth domain to use for logins and UserProperty.
  4178   url = arg_dict['url']
       
  4179   filename = arg_dict['filename']
       
  4180   batch_size = arg_dict['batch_size']
       
  4181   kind = arg_dict['kind']
       
  4182   num_threads = arg_dict['num_threads']
       
  4183   bandwidth_limit = arg_dict['bandwidth_limit']
       
  4184   rps_limit = arg_dict['rps_limit']
       
  4185   http_limit = arg_dict['http_limit']
       
  4186   db_filename = arg_dict['db_filename']
       
  4187   config_file = arg_dict['config_file']
       
  4188   auth_domain = arg_dict['auth_domain']
       
  4189   has_header = arg_dict['has_header']
       
  4190   download = arg_dict['download']
       
  4191   result_db_filename = arg_dict['result_db_filename']
       
  4192   loader_opts = arg_dict['loader_opts']
       
  4193   exporter_opts = arg_dict['exporter_opts']
       
  4194   email = arg_dict['email']
       
  4195   passin = arg_dict['passin']
       
  4196 
       
  4197   os.environ['AUTH_DOMAIN'] = auth_domain
       
  4198 
       
  4199   kind = ParseKind(kind)
       
  4200 
       
  4201   check_file(config_file)
       
  4202   if not download:
       
  4203     check_file(filename)
       
  4204   else:
       
  4205     check_output_file(filename)
       
  4206 
       
  4207   LoadConfig(config_file)
       
  4208 
       
  4209   os.environ['APPLICATION_ID'] = app_id
       
  4210 
       
  4211   throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
       
  4212 
       
  4213   throttle = Throttle(layout=throttle_layout)
       
  4214   signature = _MakeSignature(app_id=app_id,
       
  4215                              url=url,
       
  4216                              kind=kind,
       
  4217                              db_filename=db_filename,
       
  4218                              download=download,
       
  4219                              has_header=has_header,
       
  4220                              result_db_filename=result_db_filename)
       
  4221 
       
  4222 
       
  4223   max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5)
       
  4224 
       
  4225   if db_filename == 'skip':
       
  4226     progress_db = StubProgressDatabase()
       
  4227   elif not download:
       
  4228     progress_db = ProgressDatabase(db_filename, signature)
       
  4229   else:
       
  4230     progress_db = ExportProgressDatabase(db_filename, signature)
       
  4231 
       
  4232   if download:
       
  4233     result_db = ResultDatabase(result_db_filename, signature)
       
  4234 
       
  4235   return_code = 1
       
  4236 
       
  4237   if not download:
       
  4238     loader = Loader.RegisteredLoader(kind)
       
  4239     try:
       
  4240       loader.initialize(filename, loader_opts)
       
  4241       workitem_generator_factory = GetCSVGeneratorFactory(
       
  4242           kind, filename, batch_size, has_header)
       
  4243 
       
  4244       app = BulkUploaderApp(arg_dict,
       
  4245                             workitem_generator_factory,
       
  4246                             throttle,
       
  4247                             progress_db,
       
  4248                             BulkLoaderThread,
       
  4249                             ProgressTrackerThread,
       
  4250                             max_queue_size,
       
  4251                             RequestManager,
       
  4252                             DataSourceThread,
       
  4253                             ReQueue,
       
  4254                             Queue.Queue)
       
  4255       try:
       
  4256         return_code = app.Run()
       
  4257       except AuthenticationError:
       
  4258         logger.info('Authentication Failed')
       
  4259     finally:
       
  4260       loader.finalize()
       
  4261   else:
       
  4262     exporter = Exporter.RegisteredExporter(kind)
       
  4263     try:
       
  4264       exporter.initialize(filename, exporter_opts)
       
  4265 
       
  4266       def KeyRangeGeneratorFactory(progress_queue, progress_gen):
       
  4267         return KeyRangeGenerator(kind, progress_queue, progress_gen)
       
  4268 
       
  4269       def ExportProgressThreadFactory(progress_queue, progress_db):
       
  4270         return ExportProgressThread(kind,
       
  4271                                     progress_queue,
       
  4272                                     progress_db,
       
  4273                                     result_db)
       
  4274       app = BulkDownloaderApp(arg_dict,
       
  4275                               KeyRangeGeneratorFactory,
       
  4276                               throttle,
       
  4277                               progress_db,
       
  4278                               BulkExporterThread,
       
  4279                               ExportProgressThreadFactory,
       
  4280                               0,
       
  4281                               RequestManager,
       
  4282                               DataSourceThread,
       
  4283                               ReQueue,
       
  4284                               Queue.Queue)
       
  4285       try:
       
  4286         return_code = app.Run()
       
  4287       except AuthenticationError:
       
  4288         logger.info('Authentication Failed')
       
  4289     finally:
       
  4290       exporter.finalize()
       
  4291   return return_code
       
  4292 
       
  4293 
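Since check_file and check_output_file exist only for dependency injection,
a test can stub them out; a sketch, where options stands in for an arg_dict
already validated by ProcessArguments:

    return_code = _PerformBulkload(options,
                                   check_file=lambda path: None,
                                   check_output_file=lambda path: None)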
       
  4294 def SetupLogging(arg_dict):
       
  4295   """Sets up logging for the bulkloader.
       
  4296 
       
  4297   Args:
       
  4298     arg_dict: Dictionary mapping flag names to their arguments.
       
  4299   """
       
  4300   format = '[%(levelname)-8s %(asctime)s %(filename)s] %(message)s'
       
  4301   debug = arg_dict['debug']
       
  4302   log_file = arg_dict['log_file']
       
  4303 
       
  4304   logger.setLevel(logging.DEBUG)
       
  4305 
       
  4306   logger.propagate = False
       
  4307 
       
  4308   file_handler = logging.FileHandler(log_file, 'w')
       
  4309   file_handler.setLevel(logging.DEBUG)
       
  4310   file_formatter = logging.Formatter(format)
       
  4311   file_handler.setFormatter(file_formatter)
       
  4312   logger.addHandler(file_handler)
       
  4313 
       
  4314   console = logging.StreamHandler()
       
  4315   level = logging.INFO
       
  4316   if debug:
       
  4317     level = logging.DEBUG
       
  4318   console.setLevel(level)
       
  4319   console_format = '[%(levelname)-8s] %(message)s'
       
  4320   formatter = logging.Formatter(console_format)
       
  4321   console.setFormatter(formatter)
       
  4322   logger.addHandler(console)
       
  4323 
       
  4324   logger.info('Logging to %s', log_file)
       
  4325 
       
  4326   appengine_rpc.logger.setLevel(logging.WARN)
       
  4327 
       
  4328 
       
  4329 def Run(arg_dict):
       
  4330   """Sets up and runs the bulkloader, given the options as keyword arguments.
       
  4331 
       
  4332   Args:
       
  4333     arg_dict: Dictionary of bulkloader options
  2462 
  4334 
  2463   Returns:
  4335   Returns:
  2464     An exit code.
  4336     An exit code.
  2465   """
  4337   """
  2466   os.environ['AUTH_DOMAIN'] = auth_domain
  4338   arg_dict = ProcessArguments(arg_dict)
  2467   LoadConfig(config_file)
  4339 
  2468 
  4340   SetupLogging(arg_dict)
  2469   throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
  4341 
  2470 
  4342   return _PerformBulkload(arg_dict)
  2471   throttle = Throttle(layout=throttle_layout)
       
  2472 
       
  2473 
       
  2474   workitem_generator_factory = GetCSVGeneratorFactory(filename, batch_size)
       
  2475 
       
  2476   if db_filename == 'skip':
       
  2477     progress_db = StubProgressDatabase()
       
  2478   else:
       
  2479     progress_db = ProgressDatabase(db_filename)
       
  2480 
       
  2481 
       
  2482   max_queue_size = max(DEFAULT_QUEUE_SIZE, 2 * num_threads + 5)
       
  2483 
       
  2484   PerformBulkUpload(app_id,
       
  2485                     url,
       
  2486                     kind,
       
  2487                     workitem_generator_factory,
       
  2488                     num_threads,
       
  2489                     throttle,
       
  2490                     progress_db,
       
  2491                     max_queue_size=max_queue_size)
       
  2492 
       
  2493   return 0
       
  2494 
       
  2495 
       
  2496 def Run(app_id=None,
       
  2497         url=None,
       
  2498         filename=None,
       
  2499         batch_size=DEFAULT_BATCH_SIZE,
       
  2500         kind=None,
       
  2501         num_threads=DEFAULT_THREAD_COUNT,
       
  2502         bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
       
  2503         rps_limit=DEFAULT_RPS_LIMIT,
       
  2504         http_limit=DEFAULT_REQUEST_LIMIT,
       
  2505         db_filename=None,
       
  2506         auth_domain='gmail.com',
       
  2507         config_file=None):
       
  2508   """Sets up and runs the bulkloader, given the options as keyword arguments.
       
  2509 
       
  2510   Args:
       
  2511     app_id: The application id.
       
  2512     url: The url of the remote_api endpoint.
       
  2513     filename: The name of the file containing the CSV data.
       
  2514     batch_size: The number of records to send per request.
       
  2515     kind: The kind of entity to transfer.
       
  2516     num_threads: The number of threads to use to transfer data.
       
  2517     bandwidth_limit: Maximum bytes/second to transfers.
       
  2518     rps_limit: Maximum records/second to transfer.
       
  2519     http_limit: Maximum requests/second for transfers.
       
  2520     db_filename: The name of the SQLite3 progress database file.
       
  2521     config_file: The name of the configuration file.
       
  2522     auth_domain: The auth domain to use for logins and UserProperty.
       
  2523 
       
  2524   Returns:
       
  2525     An exit code.
       
  2526   """
       
  2527   logging.basicConfig(
       
  2528       format='[%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
       
  2529   args = ProcessArguments(app_id=app_id,
       
  2530                           url=url,
       
  2531                           filename=filename,
       
  2532                           batch_size=batch_size,
       
  2533                           kind=kind,
       
  2534                           num_threads=num_threads,
       
  2535                           bandwidth_limit=bandwidth_limit,
       
  2536                           rps_limit=rps_limit,
       
  2537                           http_limit=http_limit,
       
  2538                           db_filename=db_filename,
       
  2539                           config_file=config_file)
       
  2540 
       
  2541   (app_id, url, filename, batch_size, kind, num_threads, bandwidth_limit,
       
  2542    rps_limit, http_limit, db_filename, config_file, auth_domain) = args
       
  2543 
       
  2544   return _PerformBulkload(app_id=app_id,
       
  2545                           url=url,
       
  2546                           filename=filename,
       
  2547                           batch_size=batch_size,
       
  2548                           kind=kind,
       
  2549                           num_threads=num_threads,
       
  2550                           bandwidth_limit=bandwidth_limit,
       
  2551                           rps_limit=rps_limit,
       
  2552                           http_limit=http_limit,
       
  2553                           db_filename=db_filename,
       
  2554                           config_file=config_file,
       
  2555                           auth_domain=auth_domain)
       
  2556 
  4343 
  2557 
  4344 
  2558 def main(argv):
  4345 def main(argv):
  2559   """Runs the importer from the command line."""
  4346   """Runs the importer from the command line."""
  2560   logging.basicConfig(
  4347 
  2561       level=logging.INFO,
  4348   arg_dict = ParseArguments(argv)
  2562       format='[%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
  4349 
  2563 
  4350   errors = ['%s argument required' % key
  2564   args = ParseArguments(argv)
  4351             for (key, value) in arg_dict.iteritems()
  2565   if None in args:
  4352             if value is REQUIRED_OPTION]
  2566     print >>sys.stderr, 'Invalid arguments'
  4353   if errors:
       
  4354     print >>sys.stderr, '\n'.join(errors)
  2567     PrintUsageExit(1)
  4355     PrintUsageExit(1)
  2568 
  4356 
  2569   (app_id, url, filename, batch_size, kind, num_threads,
  4357   SetupLogging(arg_dict)
  2570    bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
  4358   return _PerformBulkload(arg_dict)
  2571    auth_domain) = args
       
  2572 
       
  2573   return _PerformBulkload(app_id=app_id,
       
  2574                           url=url,
       
  2575                           filename=filename,
       
  2576                           batch_size=batch_size,
       
  2577                           kind=kind,
       
  2578                           num_threads=num_threads,
       
  2579                           bandwidth_limit=bandwidth_limit,
       
  2580                           rps_limit=rps_limit,
       
  2581                           http_limit=http_limit,
       
  2582                           db_filename=db_filename,
       
  2583                           config_file=config_file,
       
  2584                           auth_domain=auth_domain)
       
  2585 
  4359 
  2586 
  4360 
  2587 if __name__ == '__main__':
  4361 if __name__ == '__main__':
  2588   sys.exit(main(sys.argv))
  4362   sys.exit(main(sys.argv))
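
For completeness, a sketch of driving the loader programmatically rather
than through main(); the keys mirror those read by ProcessArguments and
_PerformBulkload, and every value below is hypothetical:

    options = {
        'app_id': '', 'url': 'http://myapp.appspot.com/remote_api',
        'filename': 'data.csv', 'kind': 'Entry', 'config_file': 'config.py',
        'batch_size': 10, 'num_threads': 10, 'bandwidth_limit': 250000,
        'rps_limit': 20, 'http_limit': 8, 'db_filename': None,
        'auth_domain': 'gmail.com', 'has_header': True, 'download': False,
        'result_db_filename': None, 'loader_opts': None,
        'exporter_opts': None, 'debug': False, 'log_file': None,
        'email': None, 'passin': False,
    }
    sys.exit(Run(options))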