thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 2864 2e0b0af889be
parent 2413 d0b7dac5325c
child 3031 7678f72140e6
2862:27971a13089f 2864:2e0b0af889be
    12 # distributed under the License is distributed on an "AS IS" BASIS,
    12 # distributed under the License is distributed on an "AS IS" BASIS,
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14 # See the License for the specific language governing permissions and
    14 # See the License for the specific language governing permissions and
    15 # limitations under the License.
    15 # limitations under the License.
    16 #
    16 #
    17 
       
    18 """Imports data over HTTP.
    17 """Imports data over HTTP.
    19 
    18 
    20 Usage:
    19 Usage:
    21   %(arg0)s [flags]
    20   %(arg0)s [flags]
    22 
    21 
    31                             restricted to this rate. (Default 250000)
    30                             restricted to this rate. (Default 250000)
    32     --batch_size=<int>      Number of Entity objects to include in each post to
    31     --batch_size=<int>      Number of Entity objects to include in each post to
    33                             the URL endpoint. The more data per row/Entity, the
    32                             the URL endpoint. The more data per row/Entity, the
    34                             smaller the batch size should be. (Default 10)
    33                             smaller the batch size should be. (Default 10)
    35     --config_file=<path>    File containing Model and Loader definitions.
    34     --config_file=<path>    File containing Model and Loader definitions.
    36                             (Required)
    35                             (Required unless --dump or --restore are used)
    37     --db_filename=<path>    Specific progress database to write to, or to
    36     --db_filename=<path>    Specific progress database to write to, or to
    38                             resume from. If not supplied, then a new database
    37                             resume from. If not supplied, then a new database
    39                             will be started, named:
    38                             will be started, named:
    40                             bulkloader-progress-TIMESTAMP.
    39                             bulkloader-progress-TIMESTAMP.
    41                             The special filename "skip" may be used to simply
    40                             The special filename "skip" may be used to simply
    42                             skip reading/writing any progress information.
    41                             skip reading/writing any progress information.
    43     --download              Export entities to a file.
    42     --download              Export entities to a file.
       
    43     --dry_run               Do not execute any remote_api calls.
       
    44     --dump                  Use zero-configuration dump format.
    44     --email=<string>        The username to use. Will prompt if omitted.
    45     --email=<string>        The username to use. Will prompt if omitted.
    45     --exporter_opts=<string>
    46     --exporter_opts=<string>
    46                             A string to pass to the Exporter.initialize method.
    47                             A string to pass to the Exporter.initialize method.
    47     --filename=<path>       Path to the file to import. (Required)
    48     --filename=<path>       Path to the file to import. (Required)
    48     --has_header            Skip the first row of the input.
    49     --has_header            Skip the first row of the input.
    52                             datastore. (Required)
    53                             datastore. (Required)
    53     --loader_opts=<string>  A string to pass to the Loader.initialize method.
    54     --loader_opts=<string>  A string to pass to the Loader.initialize method.
    54     --log_file=<path>       File to write bulkloader logs.  If not supplied
    55     --log_file=<path>       File to write bulkloader logs.  If not supplied
    55                             then a new log file will be created, named:
    56                             then a new log file will be created, named:
    56                             bulkloader-log-TIMESTAMP.
    57                             bulkloader-log-TIMESTAMP.
       
    58     --map                   Map an action across datastore entities.
       
    59     --mapper_opts=<string>  A string to pass to the Mapper.initialize method.
    57     --num_threads=<int>     Number of threads to use for uploading entities
    60     --num_threads=<int>     Number of threads to use for uploading entities
    58                             (Default 10)
    61                             (Default 10)
    59     --passin                Read the login password from stdin.
    62     --passin                Read the login password from stdin.
       
    63     --restore               Restore from zero-configuration dump format.
    60     --result_db_filename=<path>
    64     --result_db_filename=<path>
    61                             Result database to write to for downloads.
    65                             Result database to write to for downloads.
    62     --rps_limit=<int>       The maximum number of records per second to
    66     --rps_limit=<int>       The maximum number of records per second to
    63                             transfer to the server. (Default: 20)
    67                             transfer to the server. (Default: 20)
    64     --url=<string>          URL endpoint to post to for importing data.
    68     --url=<string>          URL endpoint to post to for importing data.
    76 
    80 
    77 """
    81 """
    78 
    82 
    79 
    83 
    80 
    84 
    81 import cPickle
       
    82 import csv
    85 import csv
    83 import errno
    86 import errno
    84 import getopt
    87 import getopt
    85 import getpass
    88 import getpass
    86 import imp
    89 import imp
    87 import logging
    90 import logging
    88 import os
    91 import os
    89 import Queue
    92 import Queue
    90 import re
    93 import re
       
    94 import shutil
    91 import signal
    95 import signal
    92 import StringIO
    96 import StringIO
    93 import sys
    97 import sys
    94 import threading
    98 import threading
    95 import time
    99 import time
       
   100 import traceback
    96 import urllib2
   101 import urllib2
    97 import urlparse
   102 import urlparse
    98 
   103 
       
   104 from google.appengine.datastore import entity_pb
       
   105 
       
   106 from google.appengine.api import apiproxy_stub_map
       
   107 from google.appengine.api import datastore
    99 from google.appengine.api import datastore_errors
   108 from google.appengine.api import datastore_errors
       
   109 from google.appengine.datastore import datastore_pb
   100 from google.appengine.ext import db
   110 from google.appengine.ext import db
       
   111 from google.appengine.ext import key_range as key_range_module
   101 from google.appengine.ext.db import polymodel
   112 from google.appengine.ext.db import polymodel
   102 from google.appengine.ext.remote_api import remote_api_stub
   113 from google.appengine.ext.remote_api import remote_api_stub
       
   114 from google.appengine.ext.remote_api import throttle as remote_api_throttle
   103 from google.appengine.runtime import apiproxy_errors
   115 from google.appengine.runtime import apiproxy_errors
       
   116 from google.appengine.tools import adaptive_thread_pool
   104 from google.appengine.tools import appengine_rpc
   117 from google.appengine.tools import appengine_rpc
       
   118 from google.appengine.tools.requeue import ReQueue
   105 
   119 
   106 try:
   120 try:
   107   import sqlite3
   121   import sqlite3
   108 except ImportError:
   122 except ImportError:
   109   pass
   123   pass
   110 
   124 
   111 logger = logging.getLogger('google.appengine.tools.bulkloader')
   125 logger = logging.getLogger('google.appengine.tools.bulkloader')
   112 
   126 
       
   127 KeyRange = key_range_module.KeyRange
       
   128 
   113 DEFAULT_THREAD_COUNT = 10
   129 DEFAULT_THREAD_COUNT = 10
   114 
   130 
   115 DEFAULT_BATCH_SIZE = 10
   131 DEFAULT_BATCH_SIZE = 10
       
   132 
       
   133 DEFAULT_DOWNLOAD_BATCH_SIZE = 100
   116 
   134 
   117 DEFAULT_QUEUE_SIZE = DEFAULT_THREAD_COUNT * 10
   135 DEFAULT_QUEUE_SIZE = DEFAULT_THREAD_COUNT * 10
   118 
   136 
   119 _THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'
   137 _THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'
   120 
   138 
   123 STATE_SENT = 2
   141 STATE_SENT = 2
   124 STATE_NOT_SENT = 3
   142 STATE_NOT_SENT = 3
   125 
   143 
   126 STATE_GETTING = 1
   144 STATE_GETTING = 1
   127 STATE_GOT = 2
   145 STATE_GOT = 2
   128 STATE_NOT_GOT = 3
   146 STATE_ERROR = 3
   129 
       
   130 MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
       
   131 
   147 
   132 DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'
   148 DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'
   133 
   149 
   134 INITIAL_BACKOFF = 1.0
   150 INITIAL_BACKOFF = 1.0
   135 
   151 
   140 
   156 
   141 DEFAULT_RPS_LIMIT = 20
   157 DEFAULT_RPS_LIMIT = 20
   142 
   158 
   143 DEFAULT_REQUEST_LIMIT = 8
   159 DEFAULT_REQUEST_LIMIT = 8
   144 
   160 
   145 BANDWIDTH_UP = 'http-bandwidth-up'
   161 MAXIMUM_INCREASE_DURATION = 5.0
   146 BANDWIDTH_DOWN = 'http-bandwidth-down'
   162 MAXIMUM_HOLD_DURATION = 12.0
   147 REQUESTS = 'http-requests'
       
   148 HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
       
   149 HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
       
   150 HTTPS_REQUESTS = 'https-requests'
       
   151 RECORDS = 'records'
       
   152 
       
   153 MAXIMUM_INCREASE_DURATION = 8.0
       
   154 MAXIMUM_HOLD_DURATION = 10.0
       
   155 
   163 
   156 
   164 
   157 def ImportStateMessage(state):
   165 def ImportStateMessage(state):
   158   """Converts a numeric state identifier to a status message."""
   166   """Converts a numeric state identifier to a status message."""
   159   return ({
   167   return ({
   168   """Converts a numeric state identifier to a status message."""
   176   """Converts a numeric state identifier to a status message."""
   169   return ({
   177   return ({
   170       STATE_READ: 'Batch read from file.',
   178       STATE_READ: 'Batch read from file.',
   171       STATE_GETTING: 'Fetching batch from server',
   179       STATE_GETTING: 'Fetching batch from server',
   172       STATE_GOT: 'Batch successfully fetched.',
   180       STATE_GOT: 'Batch successfully fetched.',
   173       STATE_NOT_GOT: 'Error while fetching batch'
   181       STATE_ERROR: 'Error while fetching batch'
       
   182   }[state])
       
   183 
       
   184 
       
   185 def MapStateMessage(state):
       
   186   """Converts a numeric state identifier to a status message."""
       
   187   return ({
       
   188       STATE_READ: 'Batch read from file.',
       
   189       STATE_GETTING: 'Querying for batch from server',
       
   190       STATE_GOT: 'Batch successfully fetched.',
       
   191       STATE_ERROR: 'Error while fetching or mapping.'
   174   }[state])
   192   }[state])
   175 
   193 
   176 
   194 
   177 def ExportStateName(state):
   195 def ExportStateName(state):
   178   """Converts a numeric state identifier to a string."""
   196   """Converts a numeric state identifier to a string."""
   179   return ({
   197   return ({
   180       STATE_READ: 'READ',
   198       STATE_READ: 'READ',
   181       STATE_GETTING: 'GETTING',
   199       STATE_GETTING: 'GETTING',
   182       STATE_GOT: 'GOT',
   200       STATE_GOT: 'GOT',
   183       STATE_NOT_GOT: 'NOT_GOT'
   201       STATE_ERROR: 'NOT_GOT'
   184   }[state])
   202   }[state])
   185 
   203 
   186 
   204 
   187 def ImportStateName(state):
   205 def ImportStateName(state):
   188   """Converts a numeric state identifier to a string."""
   206   """Converts a numeric state identifier to a string."""
   189   return ({
   207   return ({
   190       STATE_READ: 'READ',
   208       STATE_READ: 'READ',
   191       STATE_GETTING: 'SENDING',
   209       STATE_GETTING: 'SENDING',
   192       STATE_GOT: 'SENT',
   210       STATE_GOT: 'SENT',
   193       STATE_NOT_GOT: 'NOT_SENT'
   211       STATE_NOT_SENT: 'NOT_SENT'
   194   }[state])
   212   }[state])
   195 
   213 
   196 
   214 
   197 class Error(Exception):
   215 class Error(Exception):
   198   """Base-class for exceptions in this module."""
   216   """Base-class for exceptions in this module."""
   232 
   250 
   233 class FileNotWritableError(Error):
   251 class FileNotWritableError(Error):
   234   """A filename passed in by the user refers to a non-writable output file."""
   252   """A filename passed in by the user refers to a non-writable output file."""
   235 
   253 
   236 
   254 
   237 class KeyRangeError(Error):
       
   238   """Error while trying to generate a KeyRange."""
       
   239 
       
   240 
       
   241 class BadStateError(Error):
   255 class BadStateError(Error):
   242   """A work item in an unexpected state was encountered."""
   256   """A work item in an unexpected state was encountered."""
   243 
   257 
   244 
   258 
       
   259 class KeyRangeError(Error):
       
   260   """An error during construction of a KeyRangeItem."""
       
   261 
       
   262 
       
   263 class FieldSizeLimitError(Error):
       
   264   """The csv module tried to read a field larger than the size limit."""
       
   265 
       
   266   def __init__(self, limit):
       
   267     self.message = """
       
   268 A field in your CSV input file has exceeded the current limit of %d.
       
   269 
       
   270 You can raise this limit by adding the following lines to your config file:
       
   271 
       
   272 import csv
       
   273 csv.field_size_limit(new_limit)
       
   274 
       
   275 where new_limit is a number larger than the size in bytes of the largest
       
   276 field in your CSV.
       
   277 """ % limit
       
   278     Error.__init__(self, self.message)
       
   279 
       
   280 
   245 class NameClashError(Error):
   281 class NameClashError(Error):
   246   """A name clash occurred while trying to alias old method names."""
   282   """A name clash occurred while trying to alias old method names."""
       
   283 
   247   def __init__(self, old_name, new_name, klass):
   284   def __init__(self, old_name, new_name, klass):
   248     Error.__init__(self, old_name, new_name, klass)
   285     Error.__init__(self, old_name, new_name, klass)
   249     self.old_name = old_name
   286     self.old_name = old_name
   250     self.new_name = new_name
   287     self.new_name = new_name
   251     self.klass = klass
   288     self.klass = klass
   252 
   289 
   253 
   290 
   254 def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
   291 def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
   255                            openfile=open, create_csv_reader=csv.reader):
   292                            openfile=open, create_csv_reader=csv.reader):
   256   """Return a factory that creates a CSV-based WorkItem generator.
   293   """Return a factory that creates a CSV-based UploadWorkItem generator.
   257 
   294 
   258   Args:
   295   Args:
   259     kind: The kind of the entities being uploaded.
   296     kind: The kind of the entities being uploaded.
   260     csv_filename: File on disk containing CSV data.
   297     csv_filename: File on disk containing CSV data.
   261     batch_size: Maximum number of CSV rows to stash into a WorkItem.
   298     batch_size: Maximum number of CSV rows to stash into an UploadWorkItem.
   262     csv_has_header: Whether to skip the first row of the CSV.
   299     csv_has_header: Whether to skip the first row of the CSV.
   263     openfile: Used for dependency injection.
   300     openfile: Used for dependency injection.
   264     create_csv_reader: Used for dependency injection.
   301     create_csv_reader: Used for dependency injection.
   265 
   302 
   266   Returns:
   303   Returns:
   267     A callable (accepting the Progress Queue and Progress Generators
   304     A callable (accepting the Progress Queue and Progress Generators
   268     as input) which creates the WorkItem generator.
   305     as input) which creates the UploadWorkItem generator.
   269   """
   306   """
   270   loader = Loader.RegisteredLoader(kind)
   307   loader = Loader.RegisteredLoader(kind)
   271   loader._Loader__openfile = openfile
   308   loader._Loader__openfile = openfile
   272   loader._Loader__create_csv_reader = create_csv_reader
   309   loader._Loader__create_csv_reader = create_csv_reader
   273   record_generator = loader.generate_records(csv_filename)
   310   record_generator = loader.generate_records(csv_filename)
   274 
   311 
   275   def CreateGenerator(progress_queue, progress_generator):
   312   def CreateGenerator(request_manager, progress_queue, progress_generator):
   276     """Initialize a WorkItem generator linked to a progress generator and queue.
   313     """Initialize a UploadWorkItem generator.
   277 
   314 
   278     Args:
   315     Args:
       
   316       request_manager: A RequestManager instance.
   279       progress_queue: A ProgressQueue instance to send progress information.
   317       progress_queue: A ProgressQueue instance to send progress information.
   280       progress_generator: A generator of progress information or None.
   318       progress_generator: A generator of progress information or None.
   281 
   319 
   282     Returns:
   320     Returns:
   283       A WorkItemGenerator instance.
   321       An UploadWorkItemGenerator instance.
   284     """
   322     """
   285     return WorkItemGenerator(progress_queue,
   323     return UploadWorkItemGenerator(request_manager,
   286                              progress_generator,
   324                                    progress_queue,
   287                              record_generator,
   325                                    progress_generator,
   288                              csv_has_header,
   326                                    record_generator,
   289                              batch_size)
   327                                    csv_has_header,
       
   328                                    batch_size)
   290 
   329 
   291   return CreateGenerator
   330   return CreateGenerator
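# Illustrative wiring of the factory above (argument values hypothetical):
# the returned callable is invoked later with the request manager and the
# progress machinery, and yields UploadWorkItems via Batches().
#
#   factory = GetCSVGeneratorFactory('Album', 'albums.csv',
#                                    batch_size=10, csv_has_header=True)
#   generator = factory(request_manager, progress_queue, progress_generator)
#   for work_item in generator.Batches():
#     ...  # hand each UploadWorkItem to the worker threads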
   292 
   331 
   293 
   332 
   294 class WorkItemGenerator(object):
   333 class UploadWorkItemGenerator(object):
   295   """Reads rows from a row generator and generates WorkItems of batches."""
   334   """Reads rows from a row generator and generates UploadWorkItems."""
   296 
   335 
   297   def __init__(self,
   336   def __init__(self,
       
   337                request_manager,
   298                progress_queue,
   338                progress_queue,
   299                progress_generator,
   339                progress_generator,
   300                record_generator,
   340                record_generator,
   301                skip_first,
   341                skip_first,
   302                batch_size):
   342                batch_size):
   303     """Initialize a WorkItemGenerator.
   343     """Initialize a WorkItemGenerator.
   304 
   344 
   305     Args:
   345     Args:
       
   346       request_manager: A RequestManager instance with which to associate
       
   347         WorkItems.
   306       progress_queue: A progress queue with which to associate WorkItems.
   348       progress_queue: A progress queue with which to associate WorkItems.
   307       progress_generator: A generator of progress information.
   349       progress_generator: A generator of progress information.
   308       record_generator: A generator of data records.
   350       record_generator: A generator of data records.
   309       skip_first: Whether to skip the first data record.
   351       skip_first: Whether to skip the first data record.
   310       batch_size: The number of data records per WorkItem.
   352       batch_size: The number of data records per WorkItem.
   311     """
   353     """
       
   354     self.request_manager = request_manager
   312     self.progress_queue = progress_queue
   355     self.progress_queue = progress_queue
   313     self.progress_generator = progress_generator
   356     self.progress_generator = progress_generator
   314     self.reader = record_generator
   357     self.reader = record_generator
   315     self.skip_first = skip_first
   358     self.skip_first = skip_first
   316     self.batch_size = batch_size
   359     self.batch_size = batch_size
   358                             (self.column_count, str(row)))
   401                             (self.column_count, str(row)))
   359       self.read_rows.append((self.line_number, row))
   402       self.read_rows.append((self.line_number, row))
   360       self.line_number += 1
   403       self.line_number += 1
   361 
   404 
   362   def _MakeItem(self, key_start, key_end, rows, progress_key=None):
   405   def _MakeItem(self, key_start, key_end, rows, progress_key=None):
   363     """Makes a WorkItem containing the given rows, with the given keys.
   406     """Makes a UploadWorkItem containing the given rows, with the given keys.
   364 
   407 
   365     Args:
   408     Args:
   366       key_start: The start key for the WorkItem.
   409       key_start: The start key for the UploadWorkItem.
   367       key_end: The end key for the WorkItem.
   410       key_end: The end key for the UploadWorkItem.
   368       rows: A list of the rows for the WorkItem.
   411       rows: A list of the rows for the UploadWorkItem.
   369       progress_key: The progress key for the WorkItem
   412       progress_key: The progress key for the UploadWorkItem
   370 
   413 
   371     Returns:
   414     Returns:
   372       A WorkItem instance for the given batch.
   415       An UploadWorkItem instance for the given batch.
   373     """
   416     """
   374     assert rows
   417     assert rows
   375 
   418 
   376     item = WorkItem(self.progress_queue, rows,
   419     item = UploadWorkItem(self.request_manager, self.progress_queue, rows,
   377                     key_start, key_end,
   420                           key_start, key_end, progress_key=progress_key)
   378                     progress_key=progress_key)
       
   379 
   421 
   380     return item
   422     return item
   381 
   423 
   382   def Batches(self):
   424   def Batches(self):
   383     """Reads from the record_generator and generates WorkItems.
   425     """Reads from the record_generator and generates UploadWorkItems.
   384 
   426 
   385     Yields:
   427     Yields:
   386       Instances of class WorkItem
   428       Instances of class UploadWorkItem
   387 
   429 
   388     Raises:
   430     Raises:
   389       ResumeError: If the progress database and data file indicate a different
   431       ResumeError: If the progress database and data file indicate a different
   390         number of rows.
   432         number of rows.
   391     """
   433     """
   466       ResumeError: If the progress database and data file indicate a different
   508       ResumeError: If the progress database and data file indicate a different
   467         number of rows.
   509         number of rows.
   468     """
   510     """
   469     csv_file = self.openfile(self.csv_filename, 'rb')
   511     csv_file = self.openfile(self.csv_filename, 'rb')
   470     reader = self.create_csv_reader(csv_file, skipinitialspace=True)
   512     reader = self.create_csv_reader(csv_file, skipinitialspace=True)
   471     return reader
   513     try:
   472 
   514       for record in reader:
   473 
   515         yield record
   474 class KeyRangeGenerator(object):
   516     except csv.Error, e:
       
   517       if e.args and e.args[0].startswith('field larger than field limit'):
       
   518         limit = e.args[1]
       
   519         raise FieldSizeLimitError(limit)
       
   520       else:
       
   521         raise
       
   522 
       
   523 
       
   524 class KeyRangeItemGenerator(object):
   475   """Generates ranges of keys to download.
   525   """Generates ranges of keys to download.
   476 
   526 
   477   Reads progress information from the progress database and creates
   527   Reads progress information from the progress database and creates
   478   KeyRange objects corresponding to incompletely downloaded parts of an
   528   KeyRangeItem objects corresponding to incompletely downloaded parts of an
   479   export.
   529   export.
   480   """
   530   """
   481 
   531 
   482   def __init__(self, kind, progress_queue, progress_generator):
   532   def __init__(self, request_manager, kind, progress_queue, progress_generator,
   483     """Initialize the KeyRangeGenerator.
   533                key_range_item_factory):
   484 
   534     """Initialize the KeyRangeItemGenerator.
   485     Args:
   535 
       
   536     Args:
       
   537       request_manager: A RequestManager instance.
   486       kind: The kind of entities being transferred.
   538       kind: The kind of entities being transferred.
   487       progress_queue: A queue used for tracking progress information.
   539       progress_queue: A queue used for tracking progress information.
   488       progress_generator: A generator of prior progress information, or None
   540       progress_generator: A generator of prior progress information, or None
   489         if there is no prior status.
   541         if there is no prior status.
   490     """
   542       key_range_item_factory: A factory to produce KeyRangeItems.
       
   543     """
       
   544     self.request_manager = request_manager
   491     self.kind = kind
   545     self.kind = kind
   492     self.row_count = 0
   546     self.row_count = 0
   493     self.xfer_count = 0
   547     self.xfer_count = 0
   494     self.progress_queue = progress_queue
   548     self.progress_queue = progress_queue
   495     self.progress_generator = progress_generator
   549     self.progress_generator = progress_generator
       
   550     self.key_range_item_factory = key_range_item_factory
   496 
   551 
   497   def Batches(self):
   552   def Batches(self):
   498     """Iterate through saved progress information.
   553     """Iterate through saved progress information.
   499 
   554 
   500     Yields:
   555     Yields:
   501       KeyRange instances corresponding to undownloaded key ranges.
   556       KeyRangeItem instances corresponding to undownloaded key ranges.
   502     """
   557     """
   503     if self.progress_generator is not None:
   558     if self.progress_generator is not None:
   504       for progress_key, state, key_start, key_end in self.progress_generator:
   559       for progress_key, state, key_start, key_end in self.progress_generator:
   505         if state is not None and state != STATE_GOT and key_start is not None:
   560         if state is not None and state != STATE_GOT and key_start is not None:
   506           key_start = ParseKey(key_start)
   561           key_start = ParseKey(key_start)
   507           key_end = ParseKey(key_end)
   562           key_end = ParseKey(key_end)
   508 
   563 
   509           result = KeyRange(self.progress_queue,
   564           key_range = KeyRange(key_start=key_start,
   510                             self.kind,
   565                                key_end=key_end)
   511                             key_start=key_start,
   566 
   512                             key_end=key_end,
   567           result = self.key_range_item_factory(self.request_manager,
   513                             progress_key=progress_key,
   568                                                self.progress_queue,
   514                             direction=KeyRange.ASC,
   569                                                self.kind,
   515                             state=STATE_READ)
   570                                                key_range,
       
   571                                                progress_key=progress_key,
       
   572                                                state=STATE_READ)
   516           yield result
   573           yield result
   517     else:
   574     else:
   518 
   575       key_range = KeyRange()
   519       yield KeyRange(
   576 
   520           self.progress_queue, self.kind,
   577       yield self.key_range_item_factory(self.request_manager,
   521           key_start=None,
   578                                         self.progress_queue,
   522           key_end=None,
   579                                         self.kind,
   523           direction=KeyRange.DESC)
   580                                         key_range)
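# Resume semantics of Batches() above: when prior progress exists, only key
# ranges with a recorded state other than STATE_GOT (and a concrete start
# key) are re-yielded, reset to STATE_READ; with no prior progress a single
# unbounded KeyRange() item covers the entire kind.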
   524 
   581 
   525 
   582 
   526 class ReQueue(object):
   583 class DownloadResult(object):
   527   """A special thread-safe queue.
   584   """Holds the result of an entity download."""
   528 
       
   529   A ReQueue allows unfinished work items to be returned with a call to
       
   530   reput().  When an item is reput, task_done() should *not* be called
       
   531   in addition, getting an item that has been reput does not increase
       
   532   the number of outstanding tasks.
       
   533 
       
   534   This class shares an interface with Queue.Queue and provides the
       
   535   additional reput method.
       
   536   """
       
   537 
       
   538   def __init__(self,
       
   539                queue_capacity,
       
   540                requeue_capacity=None,
       
   541                queue_factory=Queue.Queue,
       
   542                get_time=time.time):
       
   543     """Initialize a ReQueue instance.
       
   544 
       
   545     Args:
       
   546       queue_capacity: The number of items that can be put in the ReQueue.
       
   547       requeue_capacity: The number of items that can be reput in the ReQueue.
       
   548       queue_factory: Used for dependency injection.
       
   549       get_time: Used for dependency injection.
       
   550     """
       
   551     if requeue_capacity is None:
       
   552       requeue_capacity = queue_capacity
       
   553 
       
   554     self.get_time = get_time
       
   555     self.queue = queue_factory(queue_capacity)
       
   556     self.requeue = queue_factory(requeue_capacity)
       
   557     self.lock = threading.Lock()
       
   558     self.put_cond = threading.Condition(self.lock)
       
   559     self.get_cond = threading.Condition(self.lock)
       
   560 
       
   561   def _DoWithTimeout(self,
       
   562                      action,
       
   563                      exc,
       
   564                      wait_cond,
       
   565                      done_cond,
       
   566                      lock,
       
   567                      timeout=None,
       
   568                      block=True):
       
   569     """Performs the given action with a timeout.
       
   570 
       
   571     The action must be non-blocking, and raise an instance of exc on a
       
   572     recoverable failure.  If the action fails with an instance of exc,
       
   573     we wait on wait_cond before trying again.  Failure after the
       
   574     timeout is reached is propagated as an exception.  Success is
       
   575     signalled by notifying on done_cond and returning the result of
       
   576     the action.  If action raises any exception besides an instance of
       
   577     exc, it is immediately propagated.
       
   578 
       
   579     Args:
       
   580       action: A callable that performs a non-blocking action.
       
   581       exc: An exception type that is thrown by the action to indicate
       
   582         a recoverable error.
       
   583       wait_cond: A condition variable which should be waited on when
       
   584         action throws exc.
       
   585       done_cond: A condition variable to signal if the action returns.
       
   586       lock: The lock used by wait_cond and done_cond.
       
   587       timeout: A non-negative float indicating the maximum time to wait.
       
   588       block: Whether to block if the action cannot complete immediately.
       
   589 
       
   590     Returns:
       
   591       The result of the action, if it is successful.
       
   592 
       
   593     Raises:
       
   594       ValueError: If the timeout argument is negative.
       
   595     """
       
   596     if timeout is not None and timeout < 0.0:
       
   597       raise ValueError('\'timeout\' must not be a negative number')
       
   598     if not block:
       
   599       timeout = 0.0
       
   600     result = None
       
   601     success = False
       
   602     start_time = self.get_time()
       
   603     lock.acquire()
       
   604     try:
       
   605       while not success:
       
   606         try:
       
   607           result = action()
       
   608           success = True
       
   609         except Exception, e:
       
   610           if not isinstance(e, exc):
       
   611             raise e
       
   612           if timeout is not None:
       
   613             elapsed_time = self.get_time() - start_time
       
   614             timeout -= elapsed_time
       
   615             if timeout <= 0.0:
       
   616               raise e
       
   617           wait_cond.wait(timeout)
       
   618     finally:
       
   619       if success:
       
   620         done_cond.notify()
       
   621       lock.release()
       
   622     return result
       
   623 
       
   624   def put(self, item, block=True, timeout=None):
       
   625     """Put an item into the requeue.
       
   626 
       
   627     Args:
       
   628       item: An item to add to the requeue.
       
   629       block: Whether to block if the requeue is full.
       
   630       timeout: Maximum time to wait until the queue is non-full.
       
   631 
       
   632     Raises:
       
   633       Queue.Full if the queue is full and the timeout expires.
       
   634     """
       
   635     def PutAction():
       
   636       self.queue.put(item, block=False)
       
   637     self._DoWithTimeout(PutAction,
       
   638                         Queue.Full,
       
   639                         self.get_cond,
       
   640                         self.put_cond,
       
   641                         self.lock,
       
   642                         timeout=timeout,
       
   643                         block=block)
       
   644 
       
   645   def reput(self, item, block=True, timeout=None):
       
   646     """Re-put an item back into the requeue.
       
   647 
       
   648     Re-putting an item does not increase the number of outstanding
       
   649     tasks, so the reput item should be uniquely associated with an
       
   650     item that was previously removed from the requeue and for which
       
   651     task_done() has not been called.
       
   652 
       
   653     Args:
       
   654       item: An item to add to the requeue.
       
   655       block: Whether to block if the requeue is full.
       
   656       timeout: Maximum time to wait until the queue is non-full.
       
   657 
       
   658     Raises:
       
   659       Queue.Full if the queue is full and the timeout expires.
       
   660     """
       
   661     def ReputAction():
       
   662       self.requeue.put(item, block=False)
       
   663     self._DoWithTimeout(ReputAction,
       
   664                         Queue.Full,
       
   665                         self.get_cond,
       
   666                         self.put_cond,
       
   667                         self.lock,
       
   668                         timeout=timeout,
       
   669                         block=block)
       
   670 
       
   671   def get(self, block=True, timeout=None):
       
   672     """Get an item from the requeue.
       
   673 
       
   674     Args:
       
   675       block: Whether to block if the requeue is empty.
       
   676       timeout: Maximum time to wait until the requeue is non-empty.
       
   677 
       
   678     Returns:
       
   679       An item from the requeue.
       
   680 
       
   681     Raises:
       
   682       Queue.Empty if the queue is empty and the timeout expires.
       
   683     """
       
   684     def GetAction():
       
   685       try:
       
   686         result = self.requeue.get(block=False)
       
   687         self.requeue.task_done()
       
   688       except Queue.Empty:
       
   689         result = self.queue.get(block=False)
       
   690       return result
       
   691     return self._DoWithTimeout(GetAction,
       
   692                                Queue.Empty,
       
   693                                self.put_cond,
       
   694                                self.get_cond,
       
   695                                self.lock,
       
   696                                timeout=timeout,
       
   697                                block=block)
       
   698 
       
   699   def join(self):
       
   700     """Blocks until all of the items in the requeue have been processed."""
       
   701     self.queue.join()
       
   702 
       
   703   def task_done(self):
       
   704     """Indicate that a previously enqueued item has been fully processed."""
       
   705     self.queue.task_done()
       
   706 
       
   707   def empty(self):
       
   708     """Returns true if the requeue is empty."""
       
   709     return self.queue.empty() and self.requeue.empty()
       
   710 
       
   711   def get_nowait(self):
       
   712     """Try to get an item from the queue without blocking."""
       
   713     return self.get(block=False)
       
   714 
       
   715   def qsize(self):
       
   716     return self.queue.qsize() + self.requeue.qsize()
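# ReQueue now lives in google.appengine.tools.requeue (imported above) with
# the same interface. A minimal usage sketch (worker names hypothetical):
#
#   work_queue = ReQueue(queue_capacity=10)
#   work_queue.put('item-1')
#   item = work_queue.get()
#   try:
#     ProcessItem(item)        # hypothetical worker function
#   except RecoverableError:   # hypothetical transient failure
#     work_queue.reput(item)   # retry later; do *not* call task_done()
#   else:
#     work_queue.task_done()
#   work_queue.join()          # returns once every put() item is processed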
       
   717 
       
   718 
       
   719 class ThrottleHandler(urllib2.BaseHandler):
       
   720   """A urllib2 handler for http and https requests that adds to a throttle."""
       
   721 
       
   722   def __init__(self, throttle):
       
   723     """Initialize a ThrottleHandler.
       
   724 
       
   725     Args:
       
   726       throttle: A Throttle instance to call for bandwidth and http/https request
       
   727         throttling.
       
   728     """
       
   729     self.throttle = throttle
       
   730 
       
   731   def AddRequest(self, throttle_name, req):
       
   732     """Add to bandwidth throttle for given request.
       
   733 
       
   734     Args:
       
   735       throttle_name: The name of the bandwidth throttle to add to.
       
   736       req: The request whose size will be added to the throttle.
       
   737     """
       
   738     size = 0
       
   739     for key, value in req.headers.iteritems():
       
   740       size += len('%s: %s\n' % (key, value))
       
   741     for key, value in req.unredirected_hdrs.iteritems():
       
   742       size += len('%s: %s\n' % (key, value))
       
   743     (unused_scheme,
       
   744      unused_host_port, url_path,
       
   745      unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
       
   746     size += len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
       
   747     data = req.get_data()
       
   748     if data:
       
   749       size += len(data)
       
   750     self.throttle.AddTransfer(throttle_name, size)
       
   751 
       
   752   def AddResponse(self, throttle_name, res):
       
   753     """Add to bandwidth throttle for given response.
       
   754 
       
   755     Args:
       
   756       throttle_name: The name of the bandwidth throttle to add to.
       
   757       res: The response whose size will be added to the throttle.
       
   758     """
       
   759     content = res.read()
       
   760     def ReturnContent():
       
   761       return content
       
   762     res.read = ReturnContent
       
   763     size = len(content)
       
   764     headers = res.info()
       
   765     for key, value in headers.items():
       
   766       size += len('%s: %s\n' % (key, value))
       
   767     self.throttle.AddTransfer(throttle_name, size)
       
   768 
       
   769   def http_request(self, req):
       
   770     """Process an HTTP request.
       
   771 
       
   772     If the throttle is over quota, sleep first.  Then add request size to
       
   773     throttle before returning it to be sent.
       
   774 
       
   775     Args:
       
   776       req: A urllib2.Request object.
       
   777 
       
   778     Returns:
       
   779       The request passed in.
       
   780     """
       
   781     self.throttle.Sleep()
       
   782     self.AddRequest(BANDWIDTH_UP, req)
       
   783     return req
       
   784 
       
   785   def https_request(self, req):
       
   786     """Process an HTTPS request.
       
   787 
       
   788     If the throttle is over quota, sleep first.  Then add request size to
       
   789     throttle before returning it to be sent.
       
   790 
       
   791     Args:
       
   792       req: A urllib2.Request object.
       
   793 
       
   794     Returns:
       
   795       The request passed in.
       
   796     """
       
   797     self.throttle.Sleep()
       
   798     self.AddRequest(HTTPS_BANDWIDTH_UP, req)
       
   799     return req
       
   800 
       
   801   def http_response(self, unused_req, res):
       
   802     """Process an HTTP response.
       
   803 
       
   804     The size of the response is added to the bandwidth throttle and the request
       
   805     throttle is incremented by one.
       
   806 
       
   807     Args:
       
   808       unused_req: The urllib2 request for this response.
       
   809       res: A urllib2 response object.
       
   810 
       
   811     Returns:
       
   812       The response passed in.
       
   813     """
       
   814     self.AddResponse(BANDWIDTH_DOWN, res)
       
   815     self.throttle.AddTransfer(REQUESTS, 1)
       
   816     return res
       
   817 
       
   818   def https_response(self, unused_req, res):
       
   819     """Process an HTTPS response.
       
   820 
       
   821     The size of the response is added to the bandwidth throttle and the request
       
   822     throttle is incremented by one.
       
   823 
       
   824     Args:
       
   825       unused_req: The urllib2 request for this response.
       
   826       res: A urllib2 response object.
       
   827 
       
   828     Returns:
       
   829       The response passed in.
       
   830     """
       
   831     self.AddResponse(HTTPS_BANDWIDTH_DOWN, res)
       
   832     self.throttle.AddTransfer(HTTPS_REQUESTS, 1)
       
   833     return res
       
   834 
       
   835 
       
   836 class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
       
   837   """Provides a simplified RPC-style interface for HTTP requests.
       
   838 
       
   839   This RPC server uses a Throttle to prevent exceeding quotas.
       
   840   """
       
   841 
       
   842   def __init__(self, throttle, request_manager, *args, **kwargs):
       
   843     """Initialize a ThrottledHttpRpcServer.
       
   844 
       
   845     Also sets request_manager.rpc_server to the ThrottledHttpRpcServer instance.
       
   846 
       
   847     Args:
       
   848       throttle: A Throttles instance.
       
   849       request_manager: A RequestManager instance.
       
   850       args: Positional arguments to pass through to
       
   851         appengine_rpc.HttpRpcServer.__init__
       
   852       kwargs: Keyword arguments to pass through to
       
   853         appengine_rpc.HttpRpcServer.__init__
       
   854     """
       
   855     self.throttle = throttle
       
   856     appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)
       
   857     request_manager.rpc_server = self
       
   858 
       
   859   def _GetOpener(self):
       
   860     """Returns an OpenerDirector that supports cookies and ignores redirects.
       
   861 
       
   862     Returns:
       
   863       A urllib2.OpenerDirector object.
       
   864     """
       
   865     opener = appengine_rpc.HttpRpcServer._GetOpener(self)
       
   866     opener.add_handler(ThrottleHandler(self.throttle))
       
   867 
       
   868     return opener
       
   869 
       
   870 
       
   871 def ThrottledHttpRpcServerFactory(throttle, request_manager):
       
   872   """Create a factory to produce ThrottledHttpRpcServer for a given throttle.
       
   873 
       
   874   Args:
       
   875     throttle: A Throttle instance to use for the ThrottledHttpRpcServer.
       
   876     request_manager: A RequestManager instance.
       
   877 
       
   878   Returns:
       
   879     A factory to produce a ThrottledHttpRpcServer.
       
   880   """
       
   881 
       
   882   def MakeRpcServer(*args, **kwargs):
       
   883     """Factory to produce a ThrottledHttpRpcServer.
       
   884 
       
   885     Args:
       
   886       args: Positional args to pass to ThrottledHttpRpcServer.
       
   887       kwargs: Keyword args to pass to ThrottledHttpRpcServer.
       
   888 
       
   889     Returns:
       
   890       A ThrottledHttpRpcServer instance.
       
   891     """
       
   892     kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
       
   893     kwargs['save_cookies'] = True
       
   894     return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)
       
   895   return MakeRpcServer
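# Sketch of how the factory was typically wired up; the positional arguments
# follow appengine_rpc.HttpRpcServer (host, auth_function, user_agent,
# source) and the values here are illustrative only:
#
#   rpc_server_factory = ThrottledHttpRpcServerFactory(throttle,
#                                                      request_manager)
#   rpc_server = rpc_server_factory('myapp.appspot.com', GetUserCredentials,
#                                   'bulkloader', 'bulkloader')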
       
   896 
       
   897 
       
   898 class ExportResult(object):
       
   899   """Holds the decoded content for the result of an export requests."""
       
   900 
   585 
   901   def __init__(self, continued, direction, keys, entities):
   586   def __init__(self, continued, direction, keys, entities):
   902     self.continued = continued
   587     self.continued = continued
   903     self.direction = direction
   588     self.direction = direction
   904     self.keys = keys
   589     self.keys = keys
   905     self.entities = entities
   590     self.entities = entities
   906     self.count = len(keys)
   591     self.count = len(keys)
   907     assert self.count == len(entities)
   592     assert self.count == len(entities)
   908     assert direction in (KeyRange.ASC, KeyRange.DESC)
   593     assert direction in (key_range_module.KeyRange.ASC,
       
   594                          key_range_module.KeyRange.DESC)
   909     if self.count > 0:
   595     if self.count > 0:
   910       if direction == KeyRange.ASC:
   596       if direction == key_range_module.KeyRange.ASC:
   911         self.key_start = keys[0]
   597         self.key_start = keys[0]
   912         self.key_end = keys[-1]
   598         self.key_end = keys[-1]
   913       else:
   599       else:
   914         self.key_start = keys[-1]
   600         self.key_start = keys[-1]
   915         self.key_end = keys[0]
   601         self.key_end = keys[0]
   916 
   602 
       
   603   def Entities(self):
       
   604     """Returns the list of entities for this result in key order."""
       
   605     if self.direction == key_range_module.KeyRange.ASC:
       
   606       return list(self.entities)
       
   607     else:
       
   608       result = list(self.entities)
       
   609       result.reverse()
       
   610       return result
       
   611 
   917   def __str__(self):
   612   def __str__(self):
   918     return 'continued = %s\n%s' % (
   613     return 'continued = %s\n%s' % (
   919         str(self.continued), '\n'.join(self.entities))
   614         str(self.continued), '\n'.join(str(entity) for entity in self.entities))
   920 
   615 
   921 
   616 
   922 class _WorkItem(object):
   617 class _WorkItem(adaptive_thread_pool.WorkItem):
   923   """Holds a description of a unit of upload or download work."""
   618   """Holds a description of a unit of upload or download work."""
   924 
   619 
   925   def __init__(self, progress_queue, key_start, key_end, state_namer,
   620   def __init__(self, progress_queue, key_start, key_end, state_namer,
   926                state=STATE_READ, progress_key=None):
   621                state=STATE_READ, progress_key=None):
   927     """Initialize the _WorkItem instance.
   622     """Initialize the _WorkItem instance.
   928 
   623 
   929     Args:
   624     Args:
   930       progress_queue: A queue used for tracking progress information.
   625       progress_queue: A queue used for tracking progress information.
   931       key_start: The starting key, inclusive.
   626       key_start: The start key of the work item.
   932       key_end: The ending key, inclusive.
   627       key_end: The end key of the work item.
   933       state_namer: Function to describe work item states.
   628       state_namer: Function to describe work item states.
   934       state: The initial state of the work item.
   629       state: The initial state of the work item.
   935       progress_key: If this WorkItem represents state from a prior run,
   630       progress_key: If this WorkItem represents state from a prior run,
   936         then this will be the key within the progress database.
   631         then this will be the key within the progress database.
   937     """
   632     """
       
   633     adaptive_thread_pool.WorkItem.__init__(self,
       
   634                                            '[%s-%s]' % (key_start, key_end))
   938     self.progress_queue = progress_queue
   635     self.progress_queue = progress_queue
   939     self.key_start = key_start
       
   940     self.key_end = key_end
       
   941     self.state_namer = state_namer
   636     self.state_namer = state_namer
   942     self.state = state
   637     self.state = state
   943     self.progress_key = progress_key
   638     self.progress_key = progress_key
   944     self.progress_event = threading.Event()
   639     self.progress_event = threading.Event()
       
   640     self.key_start = key_start
       
   641     self.key_end = key_end
       
   642     self.error = None
       
   643     self.traceback = None
       
   644 
       
   645   def _TransferItem(self, thread_pool):
       
   646     raise NotImplementedError()
       
   647 
       
   648   def SetError(self):
       
   649     """Sets the error and traceback information for this thread.
       
   650 
       
   651     This must be called from an exception handler.
       
   652     """
       
   653     if not self.error:
       
   654       exc_info = sys.exc_info()
       
   655       self.error = exc_info[1]
       
   656       self.traceback = exc_info[2]
       
   657 
       
   658   def PerformWork(self, thread_pool):
       
   659     """Perform the work of this work item and report the results.
       
   660 
       
   661     Args:
       
   662       thread_pool: An AdaptiveThreadPool instance.
       
   663 
       
   664     Returns:
       
   665       A tuple (status, instruction) of the work status and an instruction
       
   666       for the ThreadGate.
       
   667     """
       
   668     status = adaptive_thread_pool.WorkItem.FAILURE
       
   669     instruction = adaptive_thread_pool.ThreadGate.DECREASE
       
   670 
       
   671     try:
       
   672       self.MarkAsTransferring()
       
   673 
       
   674       try:
       
   675         transfer_time = self._TransferItem(thread_pool)
       
   676         if transfer_time is None:
       
   677           status = adaptive_thread_pool.WorkItem.RETRY
       
   678           instruction = adaptive_thread_pool.ThreadGate.HOLD
       
   679         else:
       
   680           logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
       
   681                        threading.currentThread().getName(), self, self.count,
       
   682                        transfer_time)
       
   683           sys.stdout.write('.')
       
   684           sys.stdout.flush()
       
   685           status = adaptive_thread_pool.WorkItem.SUCCESS
       
   686           if transfer_time <= MAXIMUM_INCREASE_DURATION:
       
   687             instruction = adaptive_thread_pool.ThreadGate.INCREASE
       
   688           elif transfer_time <= MAXIMUM_HOLD_DURATION:
       
   689             instruction = adaptive_thread_pool.ThreadGate.HOLD
       
   690       except (db.InternalError, db.NotSavedError, db.Timeout,
       
   691               db.TransactionFailedError,
       
   692               apiproxy_errors.OverQuotaError,
       
   693               apiproxy_errors.DeadlineExceededError,
       
   694               apiproxy_errors.ApplicationError), e:
       
   695         status = adaptive_thread_pool.WorkItem.RETRY
       
   696         logger.exception('Retrying on non-fatal datastore error: %s', e)
       
   697       except urllib2.HTTPError, e:
       
   698         http_status = e.code
       
   699         if http_status == 403 or (http_status >= 500 and http_status < 600):
       
   700           status = adaptive_thread_pool.WorkItem.RETRY
       
   701           logger.exception('Retrying on non-fatal HTTP error: %d %s',
       
   702                            http_status, e.msg)
       
   703         else:
       
   704           self.SetError()
       
   705           status = adaptive_thread_pool.WorkItem.FAILURE
       
   706       except urllib2.URLError, e:
       
   707         if IsURLErrorFatal(e):
       
   708           self.SetError()
       
   709           status = adaptive_thread_pool.WorkItem.FAILURE
       
   710         else:
       
   711           status = adaptive_thread_pool.WorkItem.RETRY
       
   712           logger.exception('Retrying on non-fatal URL error: %s', e.reason)
       
   713 
       
   714     finally:
       
   715       if status == adaptive_thread_pool.WorkItem.SUCCESS:
       
   716         self.MarkAsTransferred()
       
   717       else:
       
   718         self.MarkAsError()
       
   719 
       
   720     return (status, instruction)
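# How PerformWork() above maps a successful transfer time to a ThreadGate
# instruction, using the constants defined in this revision:
#
#   transfer_time <= MAXIMUM_INCREASE_DURATION (5.0s)  -> INCREASE
#   transfer_time <= MAXIMUM_HOLD_DURATION (12.0s)     -> HOLD
#   otherwise                                          -> DECREASE (default)
#
# A transfer_time of None means the item did not complete and is retried
# with a HOLD instruction.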
   945 
   721 
   946   def _AssertInState(self, *states):
   722   def _AssertInState(self, *states):
   947     """Raises an Error if the state of this range is not in states."""
   723     """Raises an Error if the state of this range is not in states."""
   948     if not self.state in states:
   724     if not self.state in states:
   949       raise BadStateError('%s:%s not in %s' %
   725       raise BadStateError('%s:%s not in %s' %
   961     self._AssertInState(STATE_READ)
   737     self._AssertInState(STATE_READ)
   962     self._StateTransition(STATE_READ, blocking=True)
   738     self._StateTransition(STATE_READ, blocking=True)
   963 
   739 
   964   def MarkAsTransferring(self):
   740   def MarkAsTransferring(self):
   965     """Mark this _WorkItem as transferring, updating the progress database."""
   741     """Mark this _WorkItem as transferring, updating the progress database."""
   966     self._AssertInState(STATE_READ, STATE_NOT_GOT)
   742     self._AssertInState(STATE_READ, STATE_ERROR)
   967     self._AssertProgressKey()
   743     self._AssertProgressKey()
   968     self._StateTransition(STATE_GETTING, blocking=True)
   744     self._StateTransition(STATE_GETTING, blocking=True)
   969 
   745 
   970   def MarkAsTransferred(self):
   746   def MarkAsTransferred(self):
   971     """Mark this _WorkItem as transferred, updating the progress database."""
   747     """Mark this _WorkItem as transferred, updating the progress database."""
   973 
   749 
   974   def MarkAsError(self):
   750   def MarkAsError(self):
   975     """Mark this _WorkItem as failed, updating the progress database."""
   751     """Mark this _WorkItem as failed, updating the progress database."""
   976     self._AssertInState(STATE_GETTING)
   752     self._AssertInState(STATE_GETTING)
   977     self._AssertProgressKey()
   753     self._AssertProgressKey()
   978     self._StateTransition(STATE_NOT_GOT, blocking=True)
   754     self._StateTransition(STATE_ERROR, blocking=True)
   979 
   755 
   980   def _StateTransition(self, new_state, blocking=False):
   756   def _StateTransition(self, new_state, blocking=False):
   981     """Transition the work item to a new state, storing progress information.
   757     """Transition the work item to a new state, storing progress information.
   982 
   758 
   983     Args:
   759     Args:
   996 
   772 
   997       self.progress_event.clear()
   773       self.progress_event.clear()
   998 
   774 
   999 
   775 
  1000 
   776 
  1001 class WorkItem(_WorkItem):
   777 class UploadWorkItem(_WorkItem):
  1002   """Holds a unit of uploading work.
   778   """Holds a unit of uploading work.
  1003 
   779 
  1004   A WorkItem represents a number of entities that need to be uploaded to
    780   An UploadWorkItem represents a number of entities that need to be uploaded to
  1005   Google App Engine. These entities are encoded in the "content" field of
   781   Google App Engine. These entities are encoded in the "content" field of
  1006   the WorkItem, and will be POST'd as-is to the server.
   782   the UploadWorkItem, and will be POST'd as-is to the server.
  1007 
   783 
  1008   The entities are identified by a range of numeric keys, inclusively. In
   784   The entities are identified by a range of numeric keys, inclusively. In
  1009   the case of a resumption of an upload, or a replay to correct errors,
   785   the case of a resumption of an upload, or a replay to correct errors,
  1010   these keys must be able to identify the same set of entities.
   786   these keys must be able to identify the same set of entities.
  1011 
   787 
  1012   Note that keys specify a range. The entities do not have to sequentially
   788   Note that keys specify a range. The entities do not have to sequentially
   1013   fill the entire range; they must simply bound a range of valid keys.
    789   fill the entire range; they must simply bound a range of valid keys.
  1014   """
   790   """
  1015 
   791 
  1016   def __init__(self, progress_queue, rows, key_start, key_end,
   792   def __init__(self, request_manager, progress_queue, rows, key_start, key_end,
  1017                progress_key=None):
   793                progress_key=None):
  1018     """Initialize the WorkItem instance.
   794     """Initialize the UploadWorkItem instance.
  1019 
   795 
  1020     Args:
   796     Args:
       
   797       request_manager: A RequestManager instance.
  1021       progress_queue: A queue used for tracking progress information.
   798       progress_queue: A queue used for tracking progress information.
  1022       rows: A list of pairs of a line number and a list of column values
   799       rows: A list of pairs of a line number and a list of column values
  1023       key_start: The (numeric) starting key, inclusive.
   800       key_start: The (numeric) starting key, inclusive.
  1024       key_end: The (numeric) ending key, inclusive.
   801       key_end: The (numeric) ending key, inclusive.
  1025       progress_key: If this WorkItem represents state from a prior run,
   802       progress_key: If this UploadWorkItem represents state from a prior run,
  1026         then this will be the key within the progress database.
   803         then this will be the key within the progress database.
  1027     """
   804     """
  1028     _WorkItem.__init__(self, progress_queue, key_start, key_end,
   805     _WorkItem.__init__(self, progress_queue, key_start, key_end,
  1029                        ImportStateName, state=STATE_READ,
   806                        ImportStateName, state=STATE_READ,
  1030                        progress_key=progress_key)
   807                        progress_key=progress_key)
  1031 
   808 
  1032     assert isinstance(key_start, (int, long))
   809     assert isinstance(key_start, (int, long))
  1033     assert isinstance(key_end, (int, long))
   810     assert isinstance(key_end, (int, long))
  1034     assert key_start <= key_end
   811     assert key_start <= key_end
  1035 
   812 
       
   813     self.request_manager = request_manager
  1036     self.rows = rows
   814     self.rows = rows
  1037     self.content = None
   815     self.content = None
  1038     self.count = len(rows)
   816     self.count = len(rows)
  1039 
   817 
  1040   def __str__(self):
   818   def __str__(self):
  1041     return '[%s-%s]' % (self.key_start, self.key_end)
   819     return '[%s-%s]' % (self.key_start, self.key_end)
  1042 
   820 
       
   821   def _TransferItem(self, thread_pool, get_time=time.time):
       
   822     """Transfers the entities associated with an item.
       
   823 
       
   824     Args:
       
   825       thread_pool: An AdaptiveThreadPool instance.
       
   826       get_time: Used for dependency injection.
       
   827     """
       
   828     t = get_time()
       
   829     if not self.content:
       
   830       self.content = self.request_manager.EncodeContent(self.rows)
       
   831     try:
       
   832       self.request_manager.PostEntities(self.content)
       
   833     except:
       
   834       raise
       
   835     return get_time() - t
       
   836 
  1043   def MarkAsTransferred(self):
   837   def MarkAsTransferred(self):
   1044     """Mark this WorkItem as successfully sent to the server."""
    838     """Mark this UploadWorkItem as successfully sent to the server."""
  1045 
   839 
  1046     self._AssertInState(STATE_SENDING)
   840     self._AssertInState(STATE_SENDING)
  1047     self._AssertProgressKey()
   841     self._AssertProgressKey()
  1048 
   842 
  1049     self._StateTransition(STATE_SENT, blocking=False)
   843     self._StateTransition(STATE_SENT, blocking=False)
  1066                          kind_or_class_key)
   860                          kind_or_class_key)
  1067   else:
   861   else:
  1068     implementation_class = db.class_for_kind(kind_or_class_key)
   862     implementation_class = db.class_for_kind(kind_or_class_key)
  1069   return implementation_class
   863   return implementation_class
  1070 
   864 
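
GetImplementationClass resolves a kind name (or a polymorphic class key) back
to the registered model class. A usage sketch, assuming a hypothetical Person
model has been registered in the usual way:

    from google.appengine.ext import db

    class Person(db.Model):
      name = db.StringProperty()

    # For a plain kind name, resolution goes through db.class_for_kind.
    assert GetImplementationClass('Person') is Person
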
  1071 class EmptyQuery(db.Query):
       
  1072   def get(self):
       
  1073     return None
       
  1074 
       
  1075   def fetch(self, limit=1000, offset=0):
       
  1076     return []
       
  1077 
       
  1078   def count(self, limit=1000):
       
  1079     return 0
       
  1080 
       
  1081 
   865 
  1082 def KeyLEQ(key1, key2):
   866 def KeyLEQ(key1, key2):
  1083   """Compare two keys for less-than-or-equal-to.
   867   """Compare two keys for less-than-or-equal-to.
  1084 
   868 
  1085   All keys with numeric ids come before all keys with names.
   869   All keys with numeric ids come before all keys with names. None represents
       
   870   an unbounded end-point so it is both greater and less than any other key.
  1086 
   871 
  1087   Args:
   872   Args:
  1088     key1: An int or db.Key instance.
   873     key1: An int or datastore.Key instance.
  1089     key2: An int or db.Key instance.
   874     key2: An int or datastore.Key instance.
  1090 
   875 
  1091   Returns:
   876   Returns:
  1092     True if key1 <= key2
   877     True if key1 <= key2
  1093   """
   878   """
  1094   if isinstance(key1, int) and isinstance(key2, int):
       
  1095     return key1 <= key2
       
  1096   if key1 is None or key2 is None:
   879   if key1 is None or key2 is None:
  1097     return True
   880     return True
  1098   if key1.id() and not key2.id():
   881   return key1 <= key2
  1099     return True
   882 
  1100   return key1.id_or_name() <= key2.id_or_name()
   883 
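
A usage sketch of KeyLEQ under the ordering described above; the kind and key
values are illustrative:

    from google.appengine.api import datastore

    k_id = datastore.Key.from_path('Person', 1)
    k_name = datastore.Key.from_path('Person', 'alice')

    assert KeyLEQ(k_id, k_name)   # numeric ids come before names
    assert KeyLEQ(None, k_name)   # None is an unbounded end-point
    assert KeyLEQ(k_id, None)
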
  1101 
   884 class KeyRangeItem(_WorkItem):
  1102 
   885   """Represents an item of work that scans over a key range.
  1103 class KeyRange(_WorkItem):
   886 
  1104   """Represents an item of download work.
    887   A KeyRangeItem object holds a KeyRange
  1105 
   888   and has an associated state: STATE_READ, STATE_GETTING, STATE_GOT,
  1106   A KeyRange object represents a key range (key_start, key_end) and a
   889   and STATE_ERROR.
  1107   scan direction (KeyRange.DESC or KeyRange.ASC).  The KeyRange object
       
  1108   has an associated state: STATE_READ, STATE_GETTING, STATE_GOT, and
       
  1109   STATE_ERROR.
       
  1110 
   890 
  1111   - STATE_READ indicates the range ready to be downloaded by a worker thread.
   891   - STATE_READ indicates the range ready to be downloaded by a worker thread.
  1112   - STATE_GETTING indicates the range is currently being downloaded.
   892   - STATE_GETTING indicates the range is currently being downloaded.
  1113   - STATE_GOT indicates that the range was successfully downloaded
   893   - STATE_GOT indicates that the range was successfully downloaded
  1114   - STATE_ERROR indicates that an error occurred during the last download
   894   - STATE_ERROR indicates that an error occurred during the last download
  1115     attempt
   895     attempt
  1116 
   896 
  1117   KeyRanges not in the STATE_GOT state are stored in the progress database.
   897   KeyRangeItems not in the STATE_GOT state are stored in the progress database.
  1118   When a piece of KeyRange work is downloaded, the download may cover only
   898   When a piece of KeyRangeItem work is downloaded, the download may cover only
  1119   a portion of the range.  In this case, the old KeyRange is removed from
   899   a portion of the range.  In this case, the old KeyRangeItem is removed from
  1120   the progress database and ranges covering the undownloaded range are
   900   the progress database and ranges covering the undownloaded range are
  1121   generated and stored as STATE_READ in the export progress database.
   901   generated and stored as STATE_READ in the export progress database.
  1122   """
   902   """
  1123 
   903 
  1124   DESC = 0
       
  1125   ASC = 1
       
  1126 
       
  1127   MAX_KEY_LEN = 500
       
  1128 
       
  1129   def __init__(self,
   904   def __init__(self,
       
   905                request_manager,
  1130                progress_queue,
   906                progress_queue,
  1131                kind,
   907                kind,
  1132                direction,
   908                key_range,
  1133                key_start=None,
       
  1134                key_end=None,
       
  1135                include_start=True,
       
  1136                include_end=True,
       
  1137                progress_key=None,
   909                progress_key=None,
  1138                state=STATE_READ):
   910                state=STATE_READ):
  1139     """Initialize a KeyRange object.
   911     """Initialize a KeyRangeItem object.
  1140 
   912 
  1141     Args:
   913     Args:
       
   914       request_manager: A RequestManager instance.
  1142       progress_queue: A queue used for tracking progress information.
   915       progress_queue: A queue used for tracking progress information.
  1143       kind: The kind of entities for this range.
   916       kind: The kind of entities for this range.
  1144       direction: The direction of the query for this range.
   917       key_range: A KeyRange instance for this work item.
  1145       key_start: The starting key for this range.
       
  1146       key_end: The ending key for this range.
       
  1147       include_start: Whether the start key should be included in the range.
       
  1148       include_end: Whether the end key should be included in the range.
       
  1149       progress_key: The key for this range within the progress database.
   918       progress_key: The key for this range within the progress database.
  1150       state: The initial state of this range.
   919       state: The initial state of this range.
  1151 
   920     """
  1152     Raises:
   921     _WorkItem.__init__(self, progress_queue, key_range.key_start,
  1153       KeyRangeError: if key_start is None.
   922                        key_range.key_end, ExportStateName, state=state,
  1154     """
   923                        progress_key=progress_key)
  1155     assert direction in (KeyRange.ASC, KeyRange.DESC)
   924     self.request_manager = request_manager
  1156     _WorkItem.__init__(self, progress_queue, key_start, key_end,
       
  1157                        ExportStateName, state=state, progress_key=progress_key)
       
  1158     self.kind = kind
   925     self.kind = kind
  1159     self.direction = direction
   926     self.key_range = key_range
  1160     self.export_result = None
   927     self.download_result = None
  1161     self.count = 0
   928     self.count = 0
  1162     self.include_start = include_start
   929     self.key_start = key_range.key_start
  1163     self.include_end = include_end
   930     self.key_end = key_range.key_end
  1164     self.SPLIT_KEY = db.Key.from_path(self.kind, unichr(0))
       
  1165 
   931 
  1166   def __str__(self):
   932   def __str__(self):
  1167     return '[%s-%s]' % (PrettyKey(self.key_start), PrettyKey(self.key_end))
   933     return str(self.key_range)
  1168 
   934 
  1169   def __repr__(self):
   935   def __repr__(self):
  1170     return self.__str__()
   936     return self.__str__()
  1171 
   937 
  1172   def MarkAsTransferred(self):
   938   def MarkAsTransferred(self):
  1173     """Mark this KeyRange as transferred, updating the progress database."""
   939     """Mark this KeyRangeItem as transferred, updating the progress database."""
  1174     pass
   940     pass
  1175 
   941 
  1176   def Process(self, export_result, num_threads, batch_size, work_queue):
   942   def Process(self, download_result, thread_pool, batch_size,
  1177     """Mark this KeyRange as success, updating the progress database.
   943               new_state=STATE_GOT):
  1178 
   944     """Mark this KeyRangeItem as success, updating the progress database.
  1179     Process will split this KeyRange based on the content of export_result and
   945 
   1180     add the unfinished ranges to the work queue.
   946     Process will split this KeyRangeItem based on the content of
  1181 
    947     download_result and add the unfinished ranges to the work queue.
  1182     Args:
   948 
  1183       export_result: An ExportResult instance.
   949     Args:
  1184       num_threads: The number of threads for parallel transfers.
   950       download_result: A DownloadResult instance.
       
   951       thread_pool: An AdaptiveThreadPool instance.
  1185       batch_size: The number of entities to transfer per request.
   952       batch_size: The number of entities to transfer per request.
  1186       work_queue: The work queue to add unfinished ranges to.
   953       new_state: The state to transition the completed range to.
  1187 
       
  1188     Returns:
       
   1189       None. Undownloaded sub-ranges are added back to the work queue.
       
  1190     """
   954     """
  1191     self._AssertInState(STATE_GETTING)
   955     self._AssertInState(STATE_GETTING)
  1192     self._AssertProgressKey()
   956     self._AssertProgressKey()
  1193 
   957 
  1194     self.export_result = export_result
   958     self.download_result = download_result
  1195     self.count = len(export_result.keys)
   959     self.count = len(download_result.keys)
  1196     if export_result.continued:
   960     if download_result.continued:
  1197       self._FinishedRange()._StateTransition(STATE_GOT, blocking=True)
   961       self._FinishedRange()._StateTransition(new_state, blocking=True)
  1198       self._AddUnfinishedRanges(num_threads, batch_size, work_queue)
   962       self._AddUnfinishedRanges(thread_pool, batch_size)
  1199     else:
   963     else:
  1200       self._StateTransition(STATE_GOT, blocking=True)
   964       self._StateTransition(new_state, blocking=True)
  1201 
   965 
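
To make the state machine concrete, a single download pass over one item
looks roughly like this (a sketch of the calling sequence, not code from
this module):

    item.MarkAsTransferring()       # STATE_READ -> STATE_GETTING
    result = request_manager.GetEntities(item)
    # Process moves the finished part of the range to STATE_GOT; if the
    # fetch was truncated (result.continued), the remainder is re-queued
    # as fresh STATE_READ items via _AddUnfinishedRanges.
    item.Process(result, thread_pool, request_manager.batch_size)
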
  1202   def _FinishedRange(self):
   966   def _FinishedRange(self):
  1203     """Returns the range completed by the export_result.
   967     """Returns the range completed by the download_result.
  1204 
   968 
  1205     Returns:
   969     Returns:
  1206       A KeyRange representing a completed range.
   970       A KeyRangeItem representing a completed range.
  1207     """
   971     """
  1208     assert self.export_result is not None
   972     assert self.download_result is not None
  1209 
   973 
  1210     if self.direction == KeyRange.ASC:
   974     if self.key_range.direction == key_range_module.KeyRange.ASC:
  1211       key_start = self.key_start
   975       key_start = self.key_range.key_start
  1212       if self.export_result.continued:
   976       if self.download_result.continued:
  1213         key_end = self.export_result.key_end
   977         key_end = self.download_result.key_end
  1214       else:
   978       else:
  1215         key_end = self.key_end
   979         key_end = self.key_range.key_end
  1216     else:
   980     else:
  1217       key_end = self.key_end
   981       key_end = self.key_range.key_end
  1218       if self.export_result.continued:
   982       if self.download_result.continued:
  1219         key_start = self.export_result.key_start
   983         key_start = self.download_result.key_start
  1220       else:
   984       else:
  1221         key_start = self.key_start
   985         key_start = self.key_range.key_start
  1222 
   986 
  1223     result = KeyRange(self.progress_queue,
   987     key_range = KeyRange(key_start=key_start,
  1224                       self.kind,
   988                          key_end=key_end,
  1225                       key_start=key_start,
   989                          direction=self.key_range.direction)
  1226                       key_end=key_end,
   990 
  1227                       direction=self.direction)
   991     result = self.__class__(self.request_manager,
  1228 
   992                             self.progress_queue,
  1229     result.progress_key = self.progress_key
   993                             self.kind,
  1230     result.export_result = self.export_result
   994                             key_range,
  1231     result.state = self.state
   995                             progress_key=self.progress_key,
       
   996                             state=self.state)
       
   997 
       
   998     result.download_result = self.download_result
  1232     result.count = self.count
   999     result.count = self.count
  1233     return result
  1000     return result
  1234 
  1001 
  1235   def FilterQuery(self, query):
  1002   def _SplitAndAddRanges(self, thread_pool, batch_size):
  1236     """Add query filter to restrict to this key range.
  1003     """Split the key range [key_start, key_end] into a list of ranges."""
  1237 
  1004     if self.download_result.direction == key_range_module.KeyRange.ASC:
  1238     Args:
  1005       key_range = KeyRange(
  1239       query: A db.Query instance.
  1006           key_start=self.download_result.key_end,
  1240     """
  1007           key_end=self.key_range.key_end,
  1241     if self.key_start == self.key_end and not (
  1008           include_start=False)
  1242         self.include_start or self.include_end):
       
  1243       return EmptyQuery()
       
  1244     if self.include_start:
       
  1245       start_comparator = '>='
       
  1246     else:
  1009     else:
  1247       start_comparator = '>'
  1010       key_range = KeyRange(
  1248     if self.include_end:
  1011           key_start=self.key_range.key_start,
  1249       end_comparator = '<='
  1012           key_end=self.download_result.key_start,
       
  1013           include_end=False)
       
  1014 
       
  1015     if thread_pool.QueuedItemCount() > 2 * thread_pool.num_threads():
       
  1016       ranges = [key_range]
  1250     else:
  1017     else:
  1251       end_comparator = '<'
  1018       ranges = key_range.split_range(batch_size=batch_size)
  1252     if self.key_start and self.key_end:
       
  1253       query.filter('__key__ %s' % start_comparator, self.key_start)
       
  1254       query.filter('__key__ %s' % end_comparator, self.key_end)
       
  1255     elif self.key_start:
       
  1256       query.filter('__key__ %s' % start_comparator, self.key_start)
       
  1257     elif self.key_end:
       
  1258       query.filter('__key__ %s' % end_comparator, self.key_end)
       
  1259 
       
  1260     return query
       
  1261 
       
  1262   def MakeParallelQuery(self):
       
  1263     """Construct a query for this key range, for parallel downloading.
       
  1264 
       
  1265     Returns:
       
  1266       A db.Query instance.
       
  1267 
       
  1268     Raises:
       
  1269       KeyRangeError: if self.direction is not one of
       
  1270         KeyRange.ASC, KeyRange.DESC
       
  1271     """
       
  1272     if self.direction == KeyRange.ASC:
       
  1273       direction = ''
       
  1274     elif self.direction == KeyRange.DESC:
       
  1275       direction = '-'
       
  1276     else:
       
   1277       raise KeyRangeError('KeyRange direction unexpected: %s' % self.direction)
       
  1278     query = db.Query(GetImplementationClass(self.kind))
       
  1279     query.order('%s__key__' % direction)
       
  1280 
       
  1281     return self.FilterQuery(query)
       
  1282 
       
  1283   def MakeSerialQuery(self):
       
  1284     """Construct a query for this key range without descending __key__ scan.
       
  1285 
       
  1286     Returns:
       
  1287       A db.Query instance.
       
  1288     """
       
  1289     query = db.Query(GetImplementationClass(self.kind))
       
  1290     query.order('__key__')
       
  1291 
       
  1292     return self.FilterQuery(query)
       
  1293 
       
  1294   def _BisectStringRange(self, start, end):
       
  1295     if start == end:
       
  1296       return (start, start, end)
       
  1297     start += '\0'
       
  1298     end += '\0'
       
  1299     midpoint = []
       
  1300     expected_max = 127
       
  1301     for i in xrange(min(len(start), len(end))):
       
  1302       if start[i] == end[i]:
       
  1303         midpoint.append(start[i])
       
  1304       else:
       
  1305         ord_sum = ord(start[i]) + ord(end[i])
       
  1306         midpoint.append(unichr(ord_sum / 2))
       
  1307         if ord_sum % 2:
       
  1308           if len(start) > i + 1:
       
  1309             ord_start = ord(start[i+1])
       
  1310           else:
       
  1311             ord_start = 0
       
  1312           if ord_start < expected_max:
       
  1313             ord_split = (expected_max + ord_start) / 2
       
  1314           else:
       
  1315             ord_split = (0xFFFF + ord_start) / 2
       
  1316           midpoint.append(unichr(ord_split))
       
  1317         break
       
  1318     return (start[:-1], ''.join(midpoint), end[:-1])
       
  1319 
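
The deleted _BisectStringRange helper picks an approximate lexicographic
midpoint so a range of key names can be cut in two. Worked examples, given a
KeyRange instance kr from the old code:

    kr._BisectStringRange('a', 'c')    # -> ('a', 'b', 'c')
    kr._BisectStringRange('ab', 'ad')  # -> ('ab', 'ac', 'ad')
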
       
  1320   def SplitRange(self, key_start, include_start, key_end, include_end,
       
  1321                  export_result, num_threads, batch_size, work_queue):
       
  1322     """Split the key range [key_start, key_end] into a list of ranges."""
       
  1323     if export_result.direction == KeyRange.ASC:
       
  1324       key_start = export_result.key_end
       
  1325       include_start = False
       
  1326     else:
       
  1327       key_end = export_result.key_start
       
  1328       include_end = False
       
  1329     key_pairs = []
       
  1330     if not key_start:
       
  1331       key_pairs.append((key_start, include_start, key_end, include_end,
       
  1332                         KeyRange.ASC))
       
  1333     elif not key_end:
       
  1334       key_pairs.append((key_start, include_start, key_end, include_end,
       
  1335                         KeyRange.DESC))
       
  1336     elif work_queue.qsize() > 2 * num_threads:
       
  1337       key_pairs.append((key_start, include_start, key_end, include_end,
       
  1338                         KeyRange.ASC))
       
  1339     elif key_start.id() and key_end.id():
       
  1340       if key_end.id() - key_start.id() > batch_size:
       
  1341         key_half = db.Key.from_path(self.kind,
       
  1342                                     (key_start.id() + key_end.id()) / 2)
       
  1343         key_pairs.append((key_start, include_start,
       
  1344                           key_half, True,
       
  1345                           KeyRange.DESC))
       
  1346         key_pairs.append((key_half, False,
       
  1347                           key_end, include_end,
       
  1348                           KeyRange.ASC))
       
  1349       else:
       
  1350         key_pairs.append((key_start, include_start, key_end, include_end,
       
  1351                           KeyRange.ASC))
       
  1352     elif key_start.name() and key_end.name():
       
  1353       (start, middle, end) = self._BisectStringRange(key_start.name(),
       
  1354                                                      key_end.name())
       
  1355       key_pairs.append((key_start, include_start,
       
  1356                         db.Key.from_path(self.kind, middle), True,
       
  1357                         KeyRange.DESC))
       
  1358       key_pairs.append((db.Key.from_path(self.kind, middle), False,
       
  1359                         key_end, include_end,
       
  1360                         KeyRange.ASC))
       
  1361     else:
       
  1362       assert key_start.id() and key_end.name()
       
  1363       key_pairs.append((key_start, include_start,
       
  1364                         self.SPLIT_KEY, False,
       
  1365                         KeyRange.DESC))
       
  1366       key_pairs.append((self.SPLIT_KEY, True,
       
  1367                         key_end, include_end,
       
  1368                         KeyRange.ASC))
       
  1369 
       
  1370     ranges = [KeyRange(self.progress_queue,
       
  1371                        self.kind,
       
  1372                        key_start=start,
       
  1373                        include_start=include_start,
       
  1374                        key_end=end,
       
  1375                        include_end=include_end,
       
  1376                        direction=direction)
       
  1377               for (start, include_start, end, include_end, direction)
       
  1378               in key_pairs]
       
  1379 
  1019 
  1380     for key_range in ranges:
  1020     for key_range in ranges:
  1381       key_range.MarkAsRead()
  1021       key_range_item = self.__class__(self.request_manager,
  1382       work_queue.put(key_range, block=True)
  1022                                       self.progress_queue,
  1383 
  1023                                       self.kind,
  1384   def _AddUnfinishedRanges(self, num_threads, batch_size, work_queue):
  1024                                       key_range)
  1385     """Adds incomplete KeyRanges to the work_queue.
  1025       key_range_item.MarkAsRead()
  1386 
  1026       thread_pool.SubmitItem(key_range_item, block=True)
  1387     Args:
  1027 
  1388       num_threads: The number of threads for parallel transfers.
  1028   def _AddUnfinishedRanges(self, thread_pool, batch_size):
       
  1029     """Adds incomplete KeyRanges to the thread_pool.
       
  1030 
       
  1031     Args:
       
  1032       thread_pool: An AdaptiveThreadPool instance.
  1389       batch_size: The number of entities to transfer per request.
  1033       batch_size: The number of entities to transfer per request.
  1390       work_queue: The work queue to add unfinished ranges to.
       
  1391 
  1034 
  1392     Returns:
  1035     Returns:
   1393       None. Incomplete sub-ranges are re-queued for transfer instead.
   1036       None. Incomplete sub-ranges are re-queued for transfer instead.
  1394 
  1037 
  1395     Raises:
  1038     Raises:
  1396       KeyRangeError: if this key range has already been completely transferred.
  1039       KeyRangeError: if this key range has already been completely transferred.
  1397     """
  1040     """
  1398     assert self.export_result is not None
  1041     assert self.download_result is not None
  1399     if self.export_result.continued:
  1042     if self.download_result.continued:
  1400       self.SplitRange(self.key_start, self.include_start, self.key_end,
  1043       self._SplitAndAddRanges(thread_pool, batch_size)
  1401                       self.include_end, self.export_result,
       
  1402                       num_threads, batch_size, work_queue)
       
  1403     else:
  1044     else:
  1404       raise KeyRangeError('No unfinished part of key range.')
  1045       raise KeyRangeError('No unfinished part of key range.')
       
  1046 
       
  1047 
       
  1048 class DownloadItem(KeyRangeItem):
       
  1049   """A KeyRangeItem for downloading key ranges."""
       
  1050 
       
  1051   def _TransferItem(self, thread_pool, get_time=time.time):
       
  1052     """Transfers the entities associated with an item."""
       
  1053     t = get_time()
       
  1054     download_result = self.request_manager.GetEntities(self)
       
  1055     transfer_time = get_time() - t
       
  1056     self.Process(download_result, thread_pool,
       
  1057                  self.request_manager.batch_size)
       
  1058     return transfer_time
       
  1059 
       
  1060 
       
  1061 class MapperItem(KeyRangeItem):
       
  1062   """A KeyRangeItem for mapping over key ranges."""
       
  1063 
       
  1064   def _TransferItem(self, thread_pool, get_time=time.time):
       
  1065     t = get_time()
       
  1066     download_result = self.request_manager.GetEntities(self)
       
  1067     transfer_time = get_time() - t
       
  1068     mapper = self.request_manager.GetMapper()
       
  1069     try:
       
  1070       mapper.batch_apply(download_result.Entities())
       
  1071     except MapperRetry:
       
  1072       return None
       
  1073     self.Process(download_result, thread_pool,
       
  1074                  self.request_manager.batch_size)
       
  1075     return transfer_time
  1405 
  1076 
  1406 
  1077 
  1407 class RequestManager(object):
  1078 class RequestManager(object):
  1408   """A class which wraps a connection to the server."""
  1079   """A class which wraps a connection to the server."""
  1409 
  1080 
  1414                kind,
  1085                kind,
  1415                throttle,
  1086                throttle,
  1416                batch_size,
  1087                batch_size,
  1417                secure,
  1088                secure,
  1418                email,
  1089                email,
  1419                passin):
  1090                passin,
       
  1091                dry_run=False):
  1420     """Initialize a RequestManager object.
  1092     """Initialize a RequestManager object.
  1421 
  1093 
  1422     Args:
  1094     Args:
  1423       app_id: String containing the application id for requests.
  1095       app_id: String containing the application id for requests.
  1424       host_port: String containing the "host:port" pair; the port is optional.
  1096       host_port: String containing the "host:port" pair; the port is optional.
  1443     self.authenticated = False
  1115     self.authenticated = False
  1444     self.auth_called = False
  1116     self.auth_called = False
  1445     self.parallel_download = True
  1117     self.parallel_download = True
  1446     self.email = email
  1118     self.email = email
  1447     self.passin = passin
  1119     self.passin = passin
  1448     throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
  1120     self.mapper = None
  1449         self.throttle, self)
  1121     self.dry_run = dry_run
       
  1122 
       
  1123     if self.dry_run:
       
  1124       logger.info('Running in dry run mode, skipping remote_api setup')
       
  1125       return
       
  1126 
  1450     logger.debug('Configuring remote_api. url_path = %s, '
  1127     logger.debug('Configuring remote_api. url_path = %s, '
  1451                  'servername = %s' % (url_path, host_port))
  1128                  'servername = %s' % (url_path, host_port))
       
  1129 
       
  1130     def CookieHttpRpcServer(*args, **kwargs):
       
  1131       kwargs['save_cookies'] = True
       
  1132       kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
       
  1133       return appengine_rpc.HttpRpcServer(*args, **kwargs)
       
  1134 
  1452     remote_api_stub.ConfigureRemoteDatastore(
  1135     remote_api_stub.ConfigureRemoteDatastore(
  1453         app_id,
  1136         app_id,
  1454         url_path,
  1137         url_path,
  1455         self.AuthFunction,
  1138         self.AuthFunction,
  1456         servername=host_port,
  1139         servername=host_port,
  1457         rpc_server_factory=throttled_rpc_server_factory,
  1140         rpc_server_factory=CookieHttpRpcServer,
  1458         secure=self.secure)
  1141         secure=self.secure)
       
  1142     remote_api_throttle.ThrottleRemoteDatastore(self.throttle)
  1459     logger.debug('Bulkloader using app_id: %s', os.environ['APPLICATION_ID'])
  1143     logger.debug('Bulkloader using app_id: %s', os.environ['APPLICATION_ID'])
  1460 
  1144 
  1461   def Authenticate(self):
  1145   def Authenticate(self):
  1462     """Invoke authentication if necessary."""
  1146     """Invoke authentication if necessary."""
  1463     logger.info('Connecting to %s', self.url_path)
  1147     logger.info('Connecting to %s%s', self.host_port, self.url_path)
  1464     self.rpc_server.Send(self.url_path, payload=None)
  1148     if self.dry_run:
       
  1149       self.authenticated = True
       
  1150       return
       
  1151 
       
  1152     remote_api_stub.MaybeInvokeAuthentication()
  1465     self.authenticated = True
  1153     self.authenticated = True
  1466 
  1154 
  1467   def AuthFunction(self,
  1155   def AuthFunction(self,
  1468                    raw_input_fn=raw_input,
  1156                    raw_input_fn=raw_input,
  1469                    password_input_fn=getpass.getpass):
  1157                    password_input_fn=getpass.getpass):
  1504     Args:
  1192     Args:
  1505       rows: A list of pairs of a line number and a list of column values.
  1193       rows: A list of pairs of a line number and a list of column values.
  1506       loader: Used for dependency injection.
  1194       loader: Used for dependency injection.
  1507 
  1195 
  1508     Returns:
  1196     Returns:
  1509       A list of db.Model instances.
  1197       A list of datastore.Entity instances.
  1510 
  1198 
  1511     Raises:
  1199     Raises:
  1512       ConfigurationError: if no loader is defined for self.kind
  1200       ConfigurationError: if no loader is defined for self.kind
  1513     """
  1201     """
  1514     if not loader:
  1202     if not loader:
  1518         logger.error('No Loader defined for kind %s.' % self.kind)
  1206         logger.error('No Loader defined for kind %s.' % self.kind)
  1519         raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
  1207         raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
  1520     entities = []
  1208     entities = []
  1521     for line_number, values in rows:
  1209     for line_number, values in rows:
  1522       key = loader.generate_key(line_number, values)
  1210       key = loader.generate_key(line_number, values)
  1523       if isinstance(key, db.Key):
  1211       if isinstance(key, datastore.Key):
  1524         parent = key.parent()
  1212         parent = key.parent()
  1525         key = key.name()
  1213         key = key.name()
  1526       else:
  1214       else:
  1527         parent = None
  1215         parent = None
  1528       entity = loader.create_entity(values, key_name=key, parent=parent)
  1216       entity = loader.create_entity(values, key_name=key, parent=parent)
       
  1217 
       
  1218       def ToEntity(entity):
       
  1219         if isinstance(entity, db.Model):
       
  1220           return entity._populate_entity()
       
  1221         else:
       
  1222           return entity
       
  1223 
  1529       if isinstance(entity, list):
  1224       if isinstance(entity, list):
  1530         entities.extend(entity)
  1225         entities.extend(map(ToEntity, entity))
  1531       elif entity:
  1226       elif entity:
  1532         entities.append(entity)
  1227         entities.append(ToEntity(entity))
  1533 
  1228 
  1534     return entities
  1229     return entities
  1535 
  1230 
  1536   def PostEntities(self, item):
  1231   def PostEntities(self, entities):
  1537     """Posts Entity records to a remote endpoint over HTTP.
  1232     """Posts Entity records to a remote endpoint over HTTP.
  1538 
  1233 
  1539     Args:
  1234     Args:
  1540       item: A workitem containing the entities to post.
  1235       entities: A list of datastore entities.
       
  1236     """
       
  1237     if self.dry_run:
       
  1238       return
       
  1239     datastore.Put(entities)
       
  1240 
       
  1241   def _QueryForPbs(self, query):
       
  1242     """Perform the given query and return a list of entity_pb's."""
       
  1243     try:
       
  1244       query_pb = query._ToPb(limit=self.batch_size)
       
  1245       result_pb = datastore_pb.QueryResult()
       
  1246       apiproxy_stub_map.MakeSyncCall('datastore_v3', 'RunQuery', query_pb,
       
  1247                                      result_pb)
       
  1248       next_pb = datastore_pb.NextRequest()
       
  1249       next_pb.set_count(self.batch_size)
       
  1250       next_pb.mutable_cursor().CopyFrom(result_pb.cursor())
       
  1251       result_pb = datastore_pb.QueryResult()
       
  1252       apiproxy_stub_map.MakeSyncCall('datastore_v3', 'Next', next_pb, result_pb)
       
  1253       return result_pb.result_list()
       
  1254     except apiproxy_errors.ApplicationError, e:
       
  1255       raise datastore._ToDatastoreError(e)
       
  1256 
       
  1257   def GetEntities(self, key_range_item, key_factory=datastore.Key):
       
  1258     """Gets Entity records from a remote endpoint over HTTP.
       
  1259 
       
  1260     Args:
       
  1261      key_range_item: Range of keys to get.
       
  1262      key_factory: Used for dependency injection.
  1541 
  1263 
  1542     Returns:
  1264     Returns:
   1543       None. The entities are written with db.put; no response payload is
  1265       A DownloadResult instance.
   1544         returned to the caller.
       
  1545     """
       
  1546     entities = item.content
       
  1547     db.put(entities)
       
  1548 
       
  1549   def GetEntities(self, key_range):
       
  1550     """Gets Entity records from a remote endpoint over HTTP.
       
  1551 
       
  1552     Args:
       
  1553      key_range: Range of keys to get.
       
  1554 
       
  1555     Returns:
       
  1556       An ExportResult instance.
       
  1557 
  1266 
  1558     Raises:
  1267     Raises:
  1559       ConfigurationError: if no Exporter is defined for self.kind
  1268       ConfigurationError: if no Exporter is defined for self.kind
  1560     """
  1269     """
  1561     try:
       
  1562       Exporter.RegisteredExporter(self.kind)
       
  1563     except KeyError:
       
  1564       raise ConfigurationError('No Exporter defined for kind %s.' % self.kind)
       
  1565 
       
  1566     keys = []
  1270     keys = []
  1567     entities = []
  1271     entities = []
  1568 
  1272 
  1569     if self.parallel_download:
  1273     if self.parallel_download:
  1570       query = key_range.MakeParallelQuery()
  1274       query = key_range_item.key_range.make_directed_datastore_query(self.kind)
  1571       try:
  1275       try:
  1572         results = query.fetch(self.batch_size)
  1276         results = self._QueryForPbs(query)
  1573       except datastore_errors.NeedIndexError:
  1277       except datastore_errors.NeedIndexError:
  1574         logger.info('%s: No descending index on __key__, '
  1278         logger.info('%s: No descending index on __key__, '
  1575                     'performing serial download', self.kind)
  1279                     'performing serial download', self.kind)
  1576         self.parallel_download = False
  1280         self.parallel_download = False
  1577 
  1281 
  1578     if not self.parallel_download:
  1282     if not self.parallel_download:
  1579       key_range.direction = KeyRange.ASC
  1283       key_range_item.key_range.direction = key_range_module.KeyRange.ASC
  1580       query = key_range.MakeSerialQuery()
  1284       query = key_range_item.key_range.make_ascending_datastore_query(self.kind)
  1581       results = query.fetch(self.batch_size)
  1285       results = self._QueryForPbs(query)
  1582 
  1286 
  1583     size = len(results)
  1287     size = len(results)
  1584 
  1288 
  1585     for model in results:
  1289     for entity in results:
  1586       key = model.key()
  1290       key = key_factory()
  1587       entities.append(cPickle.dumps(model))
  1291       key._Key__reference = entity.key()
       
  1292       entities.append(entity)
  1588       keys.append(key)
  1293       keys.append(key)
  1589 
  1294 
  1590     continued = (size == self.batch_size)
  1295     continued = (size == self.batch_size)
  1591     key_range.count = size
  1296     key_range_item.count = size
  1592 
  1297 
  1593     return ExportResult(continued, key_range.direction, keys, entities)
  1298     return DownloadResult(continued, key_range_item.key_range.direction,
       
  1299                           keys, entities)
       
  1300 
       
  1301   def GetMapper(self):
       
  1302     """Returns a mapper for the registered kind.
       
  1303 
       
  1304     Returns:
       
  1305       A Mapper instance.
       
  1306 
       
  1307     Raises:
       
  1308       ConfigurationError: if no Mapper is defined for self.kind
       
  1309     """
       
  1310     if not self.mapper:
       
  1311       try:
       
  1312         self.mapper = Mapper.RegisteredMapper(self.kind)
       
  1313       except KeyError:
       
  1314         logger.error('No Mapper defined for kind %s.' % self.kind)
       
  1315         raise ConfigurationError('No Mapper defined for kind %s.' % self.kind)
       
  1316     return self.mapper
  1594 
  1317 
  1595 
  1318 
  1596 def InterruptibleSleep(sleep_time):
  1319 def InterruptibleSleep(sleep_time):
   1597   """Puts thread to sleep, checking this thread's exit_flag twice a second.
   1320   """Puts thread to sleep, checking this thread's exit_flag twice a second.
  1598 
  1321 
  1609     slept += this_sleep_time
  1332     slept += this_sleep_time
  1610     if thread.exit_flag:
  1333     if thread.exit_flag:
  1611       return
  1334       return
  1612 
  1335 
  1613 
  1336 
  1614 class ThreadGate(object):
       
  1615   """Manage the number of active worker threads.
       
  1616 
       
  1617   The ThreadGate limits the number of threads that are simultaneously
       
  1618   uploading batches of records in order to implement adaptive rate
       
  1619   control.  The number of simultaneous upload threads that it takes to
       
   1620   start causing timeouts varies widely over the course of the day, so
       
  1621   adaptive rate control allows the uploader to do many uploads while
       
  1622   reducing the error rate and thus increasing the throughput.
       
  1623 
       
  1624   Initially the ThreadGate allows only one uploader thread to be active.
       
  1625   For each successful upload, another thread is activated and for each
       
  1626   failed upload, the number of active threads is reduced by one.
       
  1627   """
       
  1628 
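
The resulting policy is additive: one worker to start, plus one per fast
transfer, minus one per slow transfer or failure. A behavioral sketch with
illustrative durations (the real cut-offs are MAXIMUM_INCREASE_DURATION and
MAXIMUM_HOLD_DURATION):

    gate = ThreadGate(enabled=True)
    gate.TransferSuccess(0.5)     # fast: enable one more worker thread
    gate.TransferSuccess(60.0)    # slow (> threshhold2): decrease workers
    gate.DecreaseWorkers()        # failure: decrease, possibly backing off
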
       
  1629   def __init__(self, enabled,
       
  1630                threshhold1=MAXIMUM_INCREASE_DURATION,
       
  1631                threshhold2=MAXIMUM_HOLD_DURATION,
       
  1632                sleep=InterruptibleSleep):
       
  1633     """Constructor for ThreadGate instances.
       
  1634 
       
  1635     Args:
       
  1636       enabled: Whether the thread gate is enabled
       
  1637       threshhold1: Maximum duration (in seconds) for a transfer to increase
       
  1638         the number of active threads.
       
   1639       threshhold2: Maximum duration (in seconds) a transfer may take without
       
   1640         decreasing the number of active threads.
       
  1641     """
       
  1642     self.enabled = enabled
       
  1643     self.enabled_count = 1
       
  1644     self.lock = threading.Lock()
       
  1645     self.thread_semaphore = threading.Semaphore(self.enabled_count)
       
  1646     self._threads = []
       
  1647     self.backoff_time = 0
       
  1648     self.sleep = sleep
       
  1649     self.threshhold1 = threshhold1
       
  1650     self.threshhold2 = threshhold2
       
  1651 
       
  1652   def Register(self, thread):
       
  1653     """Register a thread with the thread gate."""
       
  1654     self._threads.append(thread)
       
  1655 
       
  1656   def Threads(self):
       
  1657     """Yields the registered threads."""
       
  1658     for thread in self._threads:
       
  1659       yield thread
       
  1660 
       
  1661   def EnableThread(self):
       
  1662     """Enable one more worker thread."""
       
  1663     self.lock.acquire()
       
  1664     try:
       
  1665       self.enabled_count += 1
       
  1666     finally:
       
  1667       self.lock.release()
       
  1668     self.thread_semaphore.release()
       
  1669 
       
  1670   def EnableAllThreads(self):
       
  1671     """Enable all worker threads."""
       
  1672     for unused_idx in xrange(len(self._threads) - self.enabled_count):
       
  1673       self.EnableThread()
       
  1674 
       
  1675   def StartWork(self):
       
  1676     """Starts a critical section in which the number of workers is limited.
       
  1677 
       
  1678     If thread throttling is enabled then this method starts a critical
       
  1679     section which allows self.enabled_count simultaneously operating
       
  1680     threads. The critical section is ended by calling self.FinishWork().
       
  1681     """
       
  1682     if self.enabled:
       
  1683       self.thread_semaphore.acquire()
       
  1684       if self.backoff_time > 0.0:
       
  1685         if not threading.currentThread().exit_flag:
       
  1686           logger.info('Backing off: %.1f seconds',
       
  1687                       self.backoff_time)
       
  1688         self.sleep(self.backoff_time)
       
  1689 
       
  1690   def FinishWork(self):
       
  1691     """Ends a critical section started with self.StartWork()."""
       
  1692     if self.enabled:
       
  1693       self.thread_semaphore.release()
       
  1694 
       
  1695   def TransferSuccess(self, duration):
       
  1696     """Informs the throttler that an item was successfully sent.
       
  1697 
       
  1698     If thread throttling is enabled and the duration is low enough, this
       
  1699     method will cause an additional thread to run in the critical section.
       
  1700 
       
  1701     Args:
       
  1702       duration: The duration of the transfer in seconds.
       
  1703     """
       
  1704     if duration > self.threshhold2:
       
  1705       logger.debug('Transfer took %s, decreasing workers.', duration)
       
  1706       self.DecreaseWorkers(backoff=False)
       
  1707       return
       
  1708     elif duration > self.threshhold1:
       
  1709       logger.debug('Transfer took %s, not increasing workers.', duration)
       
  1710       return
       
  1711     elif self.enabled:
       
  1712       if self.backoff_time > 0.0:
       
  1713         logger.info('Resetting backoff to 0.0')
       
  1714         self.backoff_time = 0.0
       
  1715       do_enable = False
       
  1716       self.lock.acquire()
       
  1717       try:
       
  1718         if self.enabled and len(self._threads) > self.enabled_count:
       
  1719           do_enable = True
       
  1720           self.enabled_count += 1
       
  1721       finally:
       
  1722         self.lock.release()
       
  1723       if do_enable:
       
  1724         logger.debug('Increasing active thread count to %d',
       
  1725                      self.enabled_count)
       
  1726         self.thread_semaphore.release()
       
  1727 
       
  1728   def DecreaseWorkers(self, backoff=True):
       
  1729     """Informs the thread_gate that an item failed to send.
       
  1730 
       
  1731     If thread throttling is enabled, this method will cause the
       
  1732     throttler to allow one fewer thread in the critical section. If
       
  1733     there is only one thread remaining, failures will result in
       
  1734     exponential backoff until there is a success.
       
  1735 
       
  1736     Args:
       
  1737       backoff: Whether to increase exponential backoff if there is only
       
  1738         one thread enabled.
       
  1739     """
       
  1740     if self.enabled:
       
  1741       do_disable = False
       
  1742       self.lock.acquire()
       
  1743       try:
       
  1744         if self.enabled:
       
  1745           if self.enabled_count > 1:
       
  1746             do_disable = True
       
  1747             self.enabled_count -= 1
       
  1748           elif backoff:
       
  1749             if self.backoff_time == 0.0:
       
  1750               self.backoff_time = INITIAL_BACKOFF
       
  1751             else:
       
  1752               self.backoff_time *= BACKOFF_FACTOR
       
  1753       finally:
       
  1754         self.lock.release()
       
  1755       if do_disable:
       
  1756         logger.debug('Decreasing the number of active threads to %d',
       
  1757                      self.enabled_count)
       
  1758         self.thread_semaphore.acquire()
       
  1759 
       
  1760 
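
Once only one worker remains, each further failure grows the backoff
geometrically until a success resets it to zero. Assuming, purely for
illustration, INITIAL_BACKOFF = 1.0 and BACKOFF_FACTOR = 2.0 (the actual
constants are defined earlier in this file):

    backoff = 0.0
    for _ in xrange(4):
      if backoff == 0.0:
        backoff = INITIAL_BACKOFF   # 1.0
      else:
        backoff *= BACKOFF_FACTOR   # 2.0, then 4.0, then 8.0
    # StartWork() then sleeps for backoff seconds before admitting work.
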
       
  1761 class Throttle(object):
       
  1762   """A base class for upload rate throttling.
       
  1763 
       
   1764   Transferring a large number of records too quickly to an application
       
  1765   could trigger quota limits and cause the transfer process to halt.
       
  1766   In order to stay within the application's quota, we throttle the
       
  1767   data transfer to a specified limit (across all transfer threads).
       
  1768   This limit defaults to about half of the Google App Engine default
       
   1769   for an application, but can be manually adjusted up or down as
       
  1770   appropriate.
       
  1771 
       
  1772   This class tracks a moving average of some aspect of the transfer
       
  1773   rate (bandwidth, records per second, http connections per
       
  1774   second). It keeps two windows of counts of bytes transferred, on a
       
  1775   per-thread basis. One block is the "current" block, and the other is
       
  1776   the "prior" block. It will rotate the counts from current to prior
       
  1777   when ROTATE_PERIOD has passed.  Thus, the current block will
       
  1778   represent from 0 seconds to ROTATE_PERIOD seconds of activity
       
  1779   (determined by: time.time() - self.last_rotate).  The prior block
       
  1780   will always represent a full ROTATE_PERIOD.
       
  1781 
       
  1782   Sleeping is performed just before a transfer of another block, and is
       
  1783   based on the counts transferred *before* the next transfer. It really
       
  1784   does not matter how much will be transferred, but only that for all the
       
   1785   data transferred SO FAR we have interspersed enough pauses to
       
  1786   ensure the aggregate transfer rate is within the specified limit.
       
  1787 
       
  1788   These counts are maintained on a per-thread basis, so we do not require
       
  1789   any interlocks around incrementing the counts. There IS an interlock on
       
  1790   the rotation of the counts because we do not want multiple threads to
       
  1791   multiply-rotate the counts.
       
  1792 
       
  1793   There are various race conditions in the computation and collection
       
  1794   of these counts. We do not require precise values, but simply to
       
  1795   keep the overall transfer within the bandwidth limits. If a given
       
  1796   pause is a little short, or a little long, then the aggregate delays
       
  1797   will be correct.
       
  1798   """
       
  1799 
       
  1800   ROTATE_PERIOD = 600
       
  1801 
       
  1802   def __init__(self,
       
  1803                get_time=time.time,
       
  1804                thread_sleep=InterruptibleSleep,
       
  1805                layout=None):
       
  1806     self.get_time = get_time
       
  1807     self.thread_sleep = thread_sleep
       
  1808 
       
  1809     self.start_time = get_time()
       
  1810     self.transferred = {}
       
  1811     self.prior_block = {}
       
  1812     self.totals = {}
       
  1813     self.throttles = {}
       
  1814 
       
  1815     self.last_rotate = {}
       
  1816     self.rotate_mutex = {}
       
  1817     if layout:
       
  1818       self.AddThrottles(layout)
       
  1819 
       
  1820   def AddThrottle(self, name, limit):
       
  1821     self.throttles[name] = limit
       
  1822     self.transferred[name] = {}
       
  1823     self.prior_block[name] = {}
       
  1824     self.totals[name] = {}
       
  1825     self.last_rotate[name] = self.get_time()
       
  1826     self.rotate_mutex[name] = threading.Lock()
       
  1827 
       
  1828   def AddThrottles(self, layout):
       
  1829     for key, value in layout.iteritems():
       
  1830       self.AddThrottle(key, value)
       
  1831 
       
  1832   def Register(self, thread):
       
  1833     """Register this thread with the throttler."""
       
  1834     thread_name = thread.getName()
       
  1835     for throttle_name in self.throttles.iterkeys():
       
  1836       self.transferred[throttle_name][thread_name] = 0
       
  1837       self.prior_block[throttle_name][thread_name] = 0
       
  1838       self.totals[throttle_name][thread_name] = 0
       
  1839 
       
  1840   def VerifyName(self, throttle_name):
       
  1841     if throttle_name not in self.throttles:
       
  1842       raise AssertionError('%s is not a registered throttle' % throttle_name)
       
  1843 
       
  1844   def AddTransfer(self, throttle_name, token_count):
       
  1845     """Add a count to the amount this thread has transferred.
       
  1846 
       
  1847     Each time a thread transfers some data, it should call this method to
       
  1848     note the amount sent. The counts may be rotated if sufficient time
       
  1849     has passed since the last rotation.
       
  1850 
       
  1851     Note: this method should only be called by the BulkLoaderThread
       
  1852     instances. The token count is allocated towards the
       
  1853     "current thread".
       
  1854 
       
  1855     Args:
       
  1856       throttle_name: The name of the throttle to add to.
       
  1857       token_count: The number to add to the throttle counter.
       
  1858     """
       
  1859     self.VerifyName(throttle_name)
       
  1860     transferred = self.transferred[throttle_name]
       
  1861     transferred[threading.currentThread().getName()] += token_count
       
  1862 
       
  1863     if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
       
  1864       self._RotateCounts(throttle_name)
       
  1865 
       
  1866   def Sleep(self, throttle_name=None):
       
  1867     """Possibly sleep in order to limit the transfer rate.
       
  1868 
       
  1869     Note that we sleep based on *prior* transfers rather than what we
       
  1870     may be about to transfer. The next transfer could put us under/over
       
  1871     and that will be rectified *after* that transfer. The net result is that
       
  1872     the average transfer rate will remain within bounds. Spiky behavior
       
  1873     or uneven rates among the threads could possibly bring the transfer
       
  1874     rate above the requested limit for short durations.
       
  1875 
       
  1876     Args:
       
  1877       throttle_name: The name of the throttle to sleep on.  If None or
       
  1878         omitted, then sleep on all throttles.
       
  1879     """
       
  1880     if throttle_name is None:
       
  1881       for throttle_name in self.throttles:
       
  1882         self.Sleep(throttle_name=throttle_name)
       
  1883       return
       
  1884 
       
  1885     self.VerifyName(throttle_name)
       
  1886 
       
  1887     thread = threading.currentThread()
       
  1888 
       
  1889     while True:
       
  1890       duration = self.get_time() - self.last_rotate[throttle_name]
       
  1891 
       
  1892       total = 0
       
  1893       for count in self.prior_block[throttle_name].values():
       
  1894         total += count
       
  1895 
       
  1896       if total:
       
  1897         duration += self.ROTATE_PERIOD
       
  1898 
       
  1899       for count in self.transferred[throttle_name].values():
       
  1900         total += count
       
  1901 
       
  1902       sleep_time = (float(total) / self.throttles[throttle_name]) - duration
       
  1903 
       
  1904       if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
       
  1905         break
       
  1906 
       
  1907       logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
       
  1908                    '(duration=%.1f ms, total=%d)',
       
  1909                    thread.getName(), throttle_name,
       
  1910                    sleep_time * 1000, duration * 1000, total)
       
  1911       self.thread_sleep(sleep_time)
       
  1912       if thread.exit_flag:
       
  1913         break
       
  1914       self._RotateCounts(throttle_name)
       
  1915 
       
  1916   def _RotateCounts(self, throttle_name):
       
  1917     """Rotate the transfer counters.
       
  1918 
       
  1919     If sufficient time has passed, then rotate the counters from active to
       
  1920     the prior-block of counts.
       
  1921 
       
  1922     This rotation is interlocked to ensure that multiple threads do not
       
  1923     over-rotate the counts.
       
  1924 
       
  1925     Args:
       
  1926       throttle_name: The name of the throttle to rotate.
       
  1927     """
       
  1928     self.VerifyName(throttle_name)
       
  1929     self.rotate_mutex[throttle_name].acquire()
       
  1930     try:
       
  1931       next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
       
  1932       if next_rotate_time >= self.get_time():
       
  1933         return
       
  1934 
       
  1935       for name, count in self.transferred[throttle_name].items():
       
  1936 
       
  1937 
       
  1938         self.prior_block[throttle_name][name] = count
       
  1939         self.transferred[throttle_name][name] = 0
       
  1940 
       
  1941         self.totals[throttle_name][name] += count
       
  1942 
       
  1943       self.last_rotate[throttle_name] = self.get_time()
       
  1944 
       
  1945     finally:
       
  1946       self.rotate_mutex[throttle_name].release()
       
  1947 
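  # Editor's note -- an illustrative reduction, not part of the changeset:
  # _RotateCounts above is a check / lock / re-check pattern. AddTransfer
  # performs a cheap unlocked test of last_rotate; whichever thread then wins
  # rotate_mutex re-checks the time under the lock, so concurrent callers
  # cannot rotate the counters twice for the same period:
  #
  #   if last_rotate + ROTATE_PERIOD < now():      # unlocked fast path
  #     with rotate_mutex:
  #       if last_rotate + ROTATE_PERIOD < now():  # re-check under the lock
  #         rotate_counters()
  #         last_rotate = now()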
       
  1948   def TotalTransferred(self, throttle_name):
       
  1949     """Return the total transferred, and over what period.
       
  1950 
       
  1951     Args:
       
  1952       throttle_name: The name of the throttle to total.
       
  1953 
       
  1954     Returns:
       
  1955       A tuple of the total count and running time for the given throttle name.
       
  1956     """
       
  1957     total = 0
       
  1958     for count in self.totals[throttle_name].values():
       
  1959       total += count
       
  1960     for count in self.transferred[throttle_name].values():
       
  1961       total += count
       
  1962     return total, self.get_time() - self.start_time
       
  1963 
       
  1964 
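# Editor's note -- an illustrative sketch, not part of the changeset. The
# removed Throttle.Sleep above enforces its limit with one piece of
# arithmetic: if `total` tokens were counted over `duration` seconds against
# a limit of `limit` tokens/second, a thread must wait another
# (total / limit) - duration seconds before transferring again. A minimal
# single-threaded version of that calculation, with no count rotation:

import time

class MiniThrottle(object):
  """Sleep-based rate limiter using the same math as Throttle.Sleep."""

  def __init__(self, limit_per_sec):
    self.limit = float(limit_per_sec)
    self.start = time.time()
    self.total = 0

  def AddTransfer(self, tokens):
    self.total += tokens

  def Sleep(self):
    # Time the current total *should* have taken at the limit, minus the
    # time it actually took; a positive difference means we are ahead.
    sleep_time = (self.total / self.limit) - (time.time() - self.start)
    if sleep_time > 0:
      time.sleep(sleep_time)

# Usage: throttle = MiniThrottle(250000)  # cf. the --bandwidth_limit default
#        throttle.AddTransfer(len(payload)); throttle.Sleep()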
       
  1965 class _ThreadBase(threading.Thread):
  1337 class _ThreadBase(threading.Thread):
  1966   """Provide some basic features for the threads used in the uploader.
  1338   """Provide some basic features for the threads used in the uploader.
  1967 
  1339 
  1968   This abstract base class is used to provide some common features:
  1340   This abstract base class is used to provide some common features:
  1969 
  1341 
  1991 
  1363 
  1992     self.setDaemon(True)
  1364     self.setDaemon(True)
  1993 
  1365 
  1994     self.exit_flag = False
  1366     self.exit_flag = False
  1995     self.error = None
  1367     self.error = None
       
  1368     self.traceback = None
  1996 
  1369 
  1997   def run(self):
  1370   def run(self):
  1998     """Perform the work of the thread."""
  1371     """Perform the work of the thread."""
  1999     logger.info('[%s] %s: started', self.getName(), self.__class__.__name__)
  1372     logger.debug('[%s] %s: started', self.getName(), self.__class__.__name__)
  2000 
  1373 
  2001     try:
  1374     try:
  2002       self.PerformWork()
  1375       self.PerformWork()
  2003     except:
  1376     except:
  2004       self.error = sys.exc_info()[1]
  1377       self.SetError()
  2005       logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
  1378       logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
  2006 
  1379 
  2007     logger.info('[%s] %s: exiting', self.getName(), self.__class__.__name__)
  1380     logger.debug('[%s] %s: exiting', self.getName(), self.__class__.__name__)
       
  1381 
       
  1382   def SetError(self):
       
  1383     """Sets the error and traceback information for this thread.
       
  1384 
       
  1385     This must be called from an exception handler.
       
  1386     """
       
  1387     if not self.error:
       
  1388       exc_info = sys.exc_info()
       
  1389       self.error = exc_info[1]
       
  1390       self.traceback = exc_info[2]
  2008 
  1391 
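  # Editor's note -- illustrative, not part of the changeset: SetError keeps
  # both halves of sys.exc_info() so that CheckError, which may run on the
  # main thread long after this worker died, can still log the full stack:
  #
  #   try:
  #     work()
  #   except:
  #     error, tb = sys.exc_info()[1:]          # exception value, traceback
  #   ...
  #   logger.debug(''.join(traceback.format_exception(
  #       error.__class__, error, tb)))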
  2009   def PerformWork(self):
  1392   def PerformWork(self):
  2010     """Perform the thread-specific work."""
  1393     """Perform the thread-specific work."""
  2011     raise NotImplementedError()
  1394     raise NotImplementedError()
  2012 
  1395 
  2013   def CheckError(self):
  1396   def CheckError(self):
  2014     """If an error is present, then log it."""
  1397     """If an error is present, then log it."""
  2015     if self.error:
  1398     if self.error:
  2016       logger.error('Error in %s: %s', self.GetFriendlyName(), self.error)
  1399       logger.error('Error in %s: %s', self.GetFriendlyName(), self.error)
       
  1400       if self.traceback:
       
  1401         logger.debug(''.join(traceback.format_exception(self.error.__class__,
       
  1402                                                         self.error,
       
  1403                                                         self.traceback)))
  2017 
  1404 
  2018   def GetFriendlyName(self):
  1405   def GetFriendlyName(self):
  2019     """Returns a human-friendly description of the thread."""
  1406     """Returns a human-friendly description of the thread."""
  2020     if hasattr(self, 'NAME'):
  1407     if hasattr(self, 'NAME'):
  2021       return self.NAME
  1408       return self.NAME
  2042   if not isinstance(error.reason[0], int):
  1429   if not isinstance(error.reason[0], int):
  2043     return True
  1430     return True
  2044   return error.reason[0] not in non_fatal_error_codes
  1431   return error.reason[0] not in non_fatal_error_codes
  2045 
  1432 
  2046 
  1433 
  2047 def PrettyKey(key):
       
  2048   """Returns a nice string representation of the given key."""
       
  2049   if key is None:
       
  2050     return None
       
  2051   elif isinstance(key, db.Key):
       
  2052     return repr(key.id_or_name())
       
  2053   return str(key)
       
  2054 
       
  2055 
       
  2056 class _BulkWorkerThread(_ThreadBase):
       
  2057   """A base class for worker threads.
       
  2058 
       
  2059   This thread will read WorkItem instances from the work_queue and upload
       
  2060   the entities to the server application. Progress information will be
       
  2061   pushed into the progress_queue as the work is being performed.
       
  2062 
       
  2063   If a _BulkWorkerThread encounters a transient error, the entities will be
       
  2064   resent; if a fatal error is encountered, the _BulkWorkerThread exits.
       
  2065 
       
  2066   Subclasses must provide implementations for PreProcessItem, TransferItem,
       
  2067   and ProcessResponse.
       
  2068   """
       
  2069 
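  # Editor's note -- an illustrative reduction of the retry contract in the
  # docstring above, not part of the changeset. Transient failures are
  # logged and swallowed, which leaves `transferred` False, so the finally
  # clause re-queues the item; anything else propagates and ends the thread.
  # TRANSIENT_ERRORS stands in for the datastore, HTTP 403/5xx, and
  # non-fatal URL errors handled in PerformWork below:
  #
  #   transferred = False
  #   try:
  #     response = self.TransferItem(item)
  #     transferred = True
  #   except TRANSIENT_ERRORS:
  #     logger.exception(...)             # swallowed; the item is retried
  #   finally:
  #     if transferred:
  #       item.MarkAsTransferred()
  #     else:
  #       item.MarkAsError()
  #       self.work_queue.reput(item)     # resend later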
       
  2070   def __init__(self,
       
  2071                work_queue,
       
  2072                throttle,
       
  2073                thread_gate,
       
  2074                request_manager,
       
  2075                num_threads,
       
  2076                batch_size,
       
  2077                state_message,
       
  2078                get_time):
       
  2079     """Initialize the BulkLoaderThread instance.
       
  2080 
       
  2081     Args:
       
  2082       work_queue: A queue containing WorkItems for processing.
       
  2083       throttle: A Throttle to control upload bandwidth.
       
  2084       thread_gate: A ThreadGate to control number of simultaneous uploads.
       
  2085       request_manager: A RequestManager instance.
       
  2086       num_threads: The number of threads for parallel transfers.
       
  2087       batch_size: The number of entities to transfer per request.
       
  2088       state_message: Used for dependency injection.
       
  2089       get_time: Used for dependency injection.
       
  2090     """
       
  2091     _ThreadBase.__init__(self)
       
  2092 
       
  2093     self.work_queue = work_queue
       
  2094     self.throttle = throttle
       
  2095     self.thread_gate = thread_gate
       
  2096     self.request_manager = request_manager
       
  2097     self.num_threads = num_threads
       
  2098     self.batch_size = batch_size
       
  2099     self.state_message = state_message
       
  2100     self.get_time = get_time
       
  2101 
       
  2102   def PreProcessItem(self, item):
       
  2103     """Performs pre transfer processing on a work item."""
       
  2104     raise NotImplementedError()
       
  2105 
       
  2106   def TransferItem(self, item):
       
  2107     """Transfers the entities associated with an item.
       
  2108 
       
  2109     Args:
       
  2110       item: An item of upload (WorkItem) or download (KeyRange) work.
       
  2111 
       
  2112     Returns:
       
  2113       A tuple of (estimated transfer size, response)
       
  2114     """
       
  2115     raise NotImplementedError()
       
  2116 
       
  2117   def ProcessResponse(self, item, result):
       
  2118     """Processes the response from the server application."""
       
  2119     raise NotImplementedError()
       
  2120 
       
  2121   def PerformWork(self):
       
  2122     """Perform the work of a _BulkWorkerThread."""
       
  2123     while not self.exit_flag:
       
  2124       transferred = False
       
  2125       self.thread_gate.StartWork()
       
  2126       try:
       
  2127         try:
       
  2128           item = self.work_queue.get(block=True, timeout=1.0)
       
  2129         except Queue.Empty:
       
  2130           continue
       
  2131         if item == _THREAD_SHOULD_EXIT:
       
  2132           break
       
  2133 
       
  2134         logger.debug('[%s] Got work item %s', self.getName(), item)
       
  2135 
       
  2136         try:
       
  2137 
       
  2138           item.MarkAsTransferring()
       
  2139           self.PreProcessItem(item)
       
  2140           response = None
       
  2141           try:
       
  2142             try:
       
  2143               t = self.get_time()
       
  2144               response = self.TransferItem(item)
       
  2145               status = 200
       
  2146               transferred = True
       
  2147               transfer_time = self.get_time() - t
       
  2148               logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
       
  2149                            self.getName(), item, item.count, transfer_time)
       
  2150               self.throttle.AddTransfer(RECORDS, item.count)
       
  2151             except (db.InternalError, db.NotSavedError, db.Timeout,
       
  2152                     apiproxy_errors.OverQuotaError,
       
  2153                     apiproxy_errors.DeadlineExceededError), e:
       
  2154               logger.exception('Caught non-fatal datastore error: %s', e)
       
  2155             except urllib2.HTTPError, e:
       
  2156               status = e.code
       
  2157               if status == 403 or (status >= 500 and status < 600):
       
  2158                 logger.exception('Caught non-fatal HTTP error: %d %s',
       
  2159                                  status, e.msg)
       
  2160               else:
       
  2161                 raise e
       
  2162             except urllib2.URLError, e:
       
  2163               if IsURLErrorFatal(e):
       
  2164                 raise e
       
  2165               else:
       
  2166                 logger.exception('Caught non-fatal URL error: %s', e.reason)
       
  2167 
       
  2168             self.ProcessResponse(item, response)
       
  2169 
       
  2170           except:
       
  2171             self.error = sys.exc_info()[1]
       
  2172             logger.exception('[%s] %s: caught exception %s', self.getName(),
       
  2173                              self.__class__.__name__, str(sys.exc_info()))
       
  2174             raise
       
  2175 
       
  2176         finally:
       
  2177           if transferred:
       
  2178             item.MarkAsTransferred()
       
  2179             self.work_queue.task_done()
       
  2180             self.thread_gate.TransferSuccess(transfer_time)
       
  2181           else:
       
  2182             item.MarkAsError()
       
  2183             try:
       
  2184               self.work_queue.reput(item, block=False)
       
  2185             except Queue.Full:
       
  2186               logger.error('[%s] Failed to reput work item.', self.getName())
       
  2187               raise Error('Failed to reput work item')
       
  2188             self.thread_gate.DecreaseWorkers()
       
  2189           logger.info('%s %s',
       
  2190                       item,
       
  2191                       self.state_message(item.state))
       
  2192 
       
  2193       finally:
       
  2194         self.thread_gate.FinishWork()
       
  2195 
       
  2196 
       
  2197   def GetFriendlyName(self):
       
  2198     """Returns a human-friendly name for this thread."""
       
  2199     return 'worker [%s]' % self.getName()
       
  2200 
       
  2201 
       
  2202 class BulkLoaderThread(_BulkWorkerThread):
       
  2203   """A thread which transmits entities to the server application.
       
  2204 
       
  2205   This thread will read WorkItem instances from the work_queue and upload
       
  2206   the entities to the server application. Progress information will be
       
  2207   pushed into the progress_queue as the work is being performed.
       
  2208 
       
  2209   If a BulkLoaderThread encounters a transient error, the entities will be
       
  2210   resent; if a fatal error is encountered, the BulkLoaderThread exits.
       
  2211   """
       
  2212 
       
  2213   def __init__(self,
       
  2214                work_queue,
       
  2215                throttle,
       
  2216                thread_gate,
       
  2217                request_manager,
       
  2218                num_threads,
       
  2219                batch_size,
       
  2220                get_time=time.time):
       
  2221     """Initialize the BulkLoaderThread instance.
       
  2222 
       
  2223     Args:
       
  2224       work_queue: A queue containing WorkItems for processing.
       
  2225       throttle: A Throttle to control upload bandwidth.
       
  2226       thread_gate: A ThreadGate to control number of simultaneous uploads.
       
  2227       request_manager: A RequestManager instance.
       
  2228       num_threads: The number of threads for parallel transfers.
       
  2229       batch_size: The number of entities to transfer per request.
       
  2230       get_time: Used for dependency injection.
       
  2231     """
       
  2232     _BulkWorkerThread.__init__(self,
       
  2233                                work_queue,
       
  2234                                throttle,
       
  2235                                thread_gate,
       
  2236                                request_manager,
       
  2237                                num_threads,
       
  2238                                batch_size,
       
  2239                                ImportStateMessage,
       
  2240                                get_time)
       
  2241 
       
  2242   def PreProcessItem(self, item):
       
  2243     """Performs pre transfer processing on a work item."""
       
  2244     if item and not item.content:
       
  2245       item.content = self.request_manager.EncodeContent(item.rows)
       
  2246 
       
  2247   def TransferItem(self, item):
       
  2248     """Transfers the entities associated with an item.
       
  2249 
       
  2250     Args:
       
  2251       item: An item of upload (WorkItem) work.
       
  2252 
       
  2253     Returns:
       
  2254       A tuple of (estimated transfer size, response)
       
  2255     """
       
  2256     return self.request_manager.PostEntities(item)
       
  2257 
       
  2258   def ProcessResponse(self, item, response):
       
  2259     """Processes the response from the server application."""
       
  2260     pass
       
  2261 
       
  2262 
       
  2263 class BulkExporterThread(_BulkWorkerThread):
       
  2264   """A thread which receives entities from the server application.
       
  2265 
       
  2266   This thread will read KeyRange instances from the work_queue and export
       
  2267   the entities from the server application. Progress information will be
       
  2268   pushed into the progress_queue as the work is being performed.
       
  2269 
       
  2270   If a BulkExporterThread encounters an error when trying to post data,
       
  2271   the thread will exit and cause the application to terminate.
       
  2272   """
       
  2273 
       
  2274   def __init__(self,
       
  2275                work_queue,
       
  2276                throttle,
       
  2277                thread_gate,
       
  2278                request_manager,
       
  2279                num_threads,
       
  2280                batch_size,
       
  2281                get_time=time.time):
       
  2282 
       
  2283     """Initialize the BulkExporterThread instance.
       
  2284 
       
  2285     Args:
       
  2286       work_queue: A queue containing KeyRanges for processing.
       
  2287       throttle: A Throttle to control download bandwidth.
       
  2288       thread_gate: A ThreadGate to control number of simultaneous uploads.
       
  2289       request_manager: A RequestManager instance.
       
  2290       num_threads: The number of threads for parallel transfers.
       
  2291       batch_size: The number of entities to transfer per request.
       
  2292       get_time: Used for dependency injection.
       
  2293     """
       
  2294     _BulkWorkerThread.__init__(self,
       
  2295                                work_queue,
       
  2296                                throttle,
       
  2297                                thread_gate,
       
  2298                                request_manager,
       
  2299                                num_threads,
       
  2300                                batch_size,
       
  2301                                ExportStateMessage,
       
  2302                                get_time)
       
  2303 
       
  2304   def PreProcessItem(self, unused_item):
       
  2305     """Performs pre transfer processing on a work item."""
       
  2306     pass
       
  2307 
       
  2308   def TransferItem(self, item):
       
  2309     """Transfers the entities associated with an item.
       
  2310 
       
  2311     Args:
       
  2312       item: An item of download (KeyRange) work.
       
  2313 
       
  2314     Returns:
       
  2315       A tuple of (estimated transfer size, response)
       
  2316     """
       
  2317     return self.request_manager.GetEntities(item)
       
  2318 
       
  2319   def ProcessResponse(self, item, export_result):
       
  2320     """Processes the response from the server application."""
       
  2321     if export_result:
       
  2322       item.Process(export_result, self.num_threads, self.batch_size,
       
  2323                    self.work_queue)
       
  2324     item.state = STATE_GOT
       
  2325 
       
  2326 
       
  2327 class DataSourceThread(_ThreadBase):
  1434 class DataSourceThread(_ThreadBase):
  2328   """A thread which reads WorkItems and pushes them into a queue.
  1435   """A thread which reads WorkItems and pushes them into a queue.
  2329 
  1436 
  2330   This thread will read/consume WorkItems from a generator (produced by
  1437   This thread will read/consume WorkItems from a generator (produced by
  2331   the generator factory). These WorkItems will then be pushed into the
  1438   the generator factory). These WorkItems will then be pushed into the
  2332   work_queue. Note that reading will block if/when the work_queue becomes
  1439   thread_pool. Note that reading will block if/when the thread_pool becomes
  2333   full. Information on content consumed from the generator will be pushed
  1440   full. Information on content consumed from the generator will be pushed
  2334   into the progress_queue.
  1441   into the progress_queue.
  2335   """
  1442   """
  2336 
  1443 
  2337   NAME = 'data source thread'
  1444   NAME = 'data source thread'
  2338 
  1445 
  2339   def __init__(self,
  1446   def __init__(self,
  2340                work_queue,
  1447                request_manager,
       
  1448                thread_pool,
  2341                progress_queue,
  1449                progress_queue,
  2342                workitem_generator_factory,
  1450                workitem_generator_factory,
  2343                progress_generator_factory):
  1451                progress_generator_factory):
  2344     """Initialize the DataSourceThread instance.
  1452     """Initialize the DataSourceThread instance.
  2345 
  1453 
  2346     Args:
  1454     Args:
  2347       work_queue: A queue containing WorkItems for processing.
  1455       request_manager: A RequestManager instance.
       
  1456       thread_pool: An AdaptiveThreadPool instance.
  2348       progress_queue: A queue used for tracking progress information.
  1457       progress_queue: A queue used for tracking progress information.
  2349       workitem_generator_factory: A factory that creates a WorkItem generator
  1458       workitem_generator_factory: A factory that creates a WorkItem generator
  2350       progress_generator_factory: A factory that creates a generator which
  1459       progress_generator_factory: A factory that creates a generator which
  2351         produces prior progress status, or None if there is no prior status
  1460         produces prior progress status, or None if there is no prior status
  2352         to use.
  1461         to use.
  2353     """
  1462     """
  2354     _ThreadBase.__init__(self)
  1463     _ThreadBase.__init__(self)
  2355 
  1464 
  2356     self.work_queue = work_queue
  1465     self.request_manager = request_manager
       
  1466     self.thread_pool = thread_pool
  2357     self.progress_queue = progress_queue
  1467     self.progress_queue = progress_queue
  2358     self.workitem_generator_factory = workitem_generator_factory
  1468     self.workitem_generator_factory = workitem_generator_factory
  2359     self.progress_generator_factory = progress_generator_factory
  1469     self.progress_generator_factory = progress_generator_factory
  2360     self.entity_count = 0
  1470     self.entity_count = 0
  2361 
  1471 
  2364     if self.progress_generator_factory:
  1474     if self.progress_generator_factory:
  2365       progress_gen = self.progress_generator_factory()
  1475       progress_gen = self.progress_generator_factory()
  2366     else:
  1476     else:
  2367       progress_gen = None
  1477       progress_gen = None
  2368 
  1478 
  2369     content_gen = self.workitem_generator_factory(self.progress_queue,
  1479     content_gen = self.workitem_generator_factory(self.request_manager,
       
  1480                                                   self.progress_queue,
  2370                                                   progress_gen)
  1481                                                   progress_gen)
  2371 
  1482 
  2372     self.xfer_count = 0
  1483     self.xfer_count = 0
  2373     self.read_count = 0
  1484     self.read_count = 0
  2374     self.read_all = False
  1485     self.read_all = False
  2376     for item in content_gen.Batches():
  1487     for item in content_gen.Batches():
  2377       item.MarkAsRead()
  1488       item.MarkAsRead()
  2378 
  1489 
  2379       while not self.exit_flag:
  1490       while not self.exit_flag:
  2380         try:
  1491         try:
  2381           self.work_queue.put(item, block=True, timeout=1.0)
  1492           self.thread_pool.SubmitItem(item, block=True, timeout=1.0)
  2382           self.entity_count += item.count
  1493           self.entity_count += item.count
  2383           break
  1494           break
  2384         except Queue.Full:
  1495         except Queue.Full:
  2385           pass
  1496           pass
  2386 
  1497 
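      # Editor's note -- illustrative, not part of the changeset: the loop
      # above is a blocking put with a shutdown check. Blocking for at most a
      # second per attempt lets exit_flag be re-examined between attempts
      # instead of hanging forever on a full pool:
      #
      #   while not self.exit_flag:
      #     try:
      #       self.thread_pool.SubmitItem(item, block=True, timeout=1.0)
      #       break                # submitted; move on to the next item
      #     except Queue.Full:
      #       pass                 # still full; loop and re-check the flag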
  2524 
  1635 
  2525     self.insert_cursor = self.secondary_conn.cursor()
  1636     self.insert_cursor = self.secondary_conn.cursor()
  2526     self.update_cursor = self.secondary_conn.cursor()
  1637     self.update_cursor = self.secondary_conn.cursor()
  2527 
  1638 
  2528 
  1639 
       
  1640 zero_matcher = re.compile(r'\x00')
       
  1641 
       
  1642 zero_one_matcher = re.compile(r'\x00\x01')
       
  1643 
       
  1644 
       
  1645 def KeyStr(key):
       
  1646   """Returns a string to represent a key, preserving ordering.
       
  1647 
       
  1648   Unlike datastore.Key.__str__(), we have the property:
       
  1649 
       
  1650     key1 < key2 ==> KeyStr(key1) < KeyStr(key2)
       
  1651 
       
  1652   The key string is constructed from the key path as follows:
       
  1653     (1) Strings are prepended with ':' and numeric ids are padded to
       
  1654         20 digits.
       
  1655     (2) Any null characters (u'\0') present are replaced with u'\0\1'.
       
  1656     (3) The sequence u'\0\0' is used to separate each component of the path.
       
  1657 
       
  1658   (1) ensures that names and ids compare properly, while (2) and (3) enforce
       
  1659   the part-by-part comparison of pieces of the path.
       
  1660 
       
  1661   Args:
       
  1662     key: A datastore.Key instance.
       
  1663 
       
  1664   Returns:
       
  1665     A string representation of the key, which preserves ordering.
       
  1666   """
       
  1667   assert isinstance(key, datastore.Key)
       
  1668   path = key.to_path()
       
  1669 
       
  1670   out_path = []
       
  1671   for part in path:
       
  1672     if isinstance(part, (int, long)):
       
  1673       part = '%020d' % part
       
  1674     else:
       
  1675       part = ':%s' % part
       
  1676 
       
  1677     out_path.append(zero_matcher.sub(u'\0\1', part))
       
  1678 
       
  1679   out_str = u'\0\0'.join(out_path)
       
  1680 
       
  1681   return out_str
       
  1682 
       
  1683 
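# Editor's illustrative check, not part of the changeset: the padding and
# ':' prefix rules are what make KeyStr order-preserving -- without padding,
# u'2' > u'10' as strings. The helper below restates rule (1) in isolation
# (StrKey below is the exact inverse, so StrKey(KeyStr(k)) == k):

def _pad_part(part):
  # Same rule as KeyStr step (1): pad numeric ids, prefix names with ':'.
  if isinstance(part, (int, long)):
    return '%020d' % part
  return ':%s' % part

assert _pad_part(2) < _pad_part(10)        # numeric order preserved
assert _pad_part(99999) < _pad_part('a')   # all ids sort before all names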
       
  1684 def StrKey(key_str):
       
  1685   """The inverse of the KeyStr function.
       
  1686 
       
  1687   Args:
       
  1688     key_str: A string in the range of KeyStr.
       
  1689 
       
  1690   Returns:
       
  1691     A datastore.Key instance k, such that KeyStr(k) == key_str.
       
  1692   """
       
  1693   parts = key_str.split(u'\0\0')
       
  1694   for i in xrange(len(parts)):
       
  1695     if parts[i][0] == ':':
       
  1696       part = parts[i][1:]
       
  1697       part = zero_one_matcher.sub(u'\0', part)
       
  1698       parts[i] = part
       
  1699     else:
       
  1700       parts[i] = int(parts[i])
       
  1701   return datastore.Key.from_path(*parts)
       
  1702 
       
  1703 
  2529 class ResultDatabase(_Database):
  1704 class ResultDatabase(_Database):
  2530   """Persistently record all the entities downloaded during an export.
  1705   """Persistently record all the entities downloaded during an export.
  2531 
  1706 
  2532   The entities are held in the database by their unique datastore key
  1707   The entities are held in the database by their unique datastore key
  2533   in order to avoid duplication if an export is restarted.
  1708   in order to avoid duplication if an export is restarted.
  2542         used to make sure we are not using an old database.
  1717         used to make sure we are not using an old database.
  2543       commit_periodicity: How many operations to perform between commits.
  1718       commit_periodicity: How many operations to perform between commits.
  2544     """
  1719     """
  2545     self.complete = False
  1720     self.complete = False
  2546     create_table = ('create table result (\n'
  1721     create_table = ('create table result (\n'
  2547                     'id TEXT primary key,\n'
  1722                     'id BLOB primary key,\n'
  2548                     'value BLOB not null)')
  1723                     'value BLOB not null)')
  2549 
  1724 
  2550     _Database.__init__(self,
  1725     _Database.__init__(self,
  2551                        db_filename,
  1726                        db_filename,
  2552                        create_table,
  1727                        create_table,
  2558       self.existing_count = int(cursor.fetchone()[0])
  1733       self.existing_count = int(cursor.fetchone()[0])
  2559     else:
  1734     else:
  2560       self.existing_count = 0
  1735       self.existing_count = 0
  2561     self.count = self.existing_count
  1736     self.count = self.existing_count
  2562 
  1737 
  2563   def _StoreEntity(self, entity_id, value):
  1738   def _StoreEntity(self, entity_id, entity):
  2564     """Store an entity in the result database.
  1739     """Store an entity in the result database.
  2565 
  1740 
  2566     Args:
  1741     Args:
  2567       entity_id: A db.Key for the entity.
  1742       entity_id: A datastore.Key for the entity.
  2568       value: A string of the contents of the entity.
  1743       entity: The entity to store.
  2569 
  1744 
  2570     Returns:
  1745     Returns:
  2571       True if this entity was not already present in the result database.
  1746       True if this entity was not already present in the result database.
  2572     """
  1747     """
  2573 
  1748 
  2574     assert _RunningInThread(self.secondary_thread)
  1749     assert _RunningInThread(self.secondary_thread)
  2575     assert isinstance(entity_id, db.Key)
  1750     assert isinstance(entity_id, datastore.Key), (
  2576 
  1751         'expected a datastore.Key, got a %s' % entity_id.__class__.__name__)
  2577     entity_id = entity_id.id_or_name()
  1752 
       
  1753     key_str = buffer(KeyStr(entity_id).encode('utf-8'))
  2578     self.insert_cursor.execute(
  1754     self.insert_cursor.execute(
  2579         'select count(*) from result where id = ?', (unicode(entity_id),))
  1755         'select count(*) from result where id = ?', (key_str,))
       
  1756 
  2580     already_present = self.insert_cursor.fetchone()[0]
  1757     already_present = self.insert_cursor.fetchone()[0]
  2581     result = True
  1758     result = True
  2582     if already_present:
  1759     if already_present:
  2583       result = False
  1760       result = False
  2584       self.insert_cursor.execute('delete from result where id = ?',
  1761       self.insert_cursor.execute('delete from result where id = ?',
  2585                                  (unicode(entity_id),))
  1762                                  (key_str,))
  2586     else:
  1763     else:
  2587       self.count += 1
  1764       self.count += 1
       
  1765     value = entity.Encode()
  2588     self.insert_cursor.execute(
  1766     self.insert_cursor.execute(
  2589         'insert into result (id, value) values (?, ?)',
  1767         'insert into result (id, value) values (?, ?)',
  2590         (unicode(entity_id), buffer(value)))
  1768         (key_str, buffer(value)))
  2591     return result
  1769     return result
  2592 
  1770 
  2593   def StoreEntities(self, keys, entities):
  1771   def StoreEntities(self, keys, entities):
  2594     """Store a group of entities in the result database.
  1772     """Store a group of entities in the result database.
  2595 
  1773 
  2601       The number of new entities stored in the result database.
  1779       The number of new entities stored in the result database.
  2602     """
  1780     """
  2603     self._OpenSecondaryConnection()
  1781     self._OpenSecondaryConnection()
  2604     t = time.time()
  1782     t = time.time()
  2605     count = 0
  1783     count = 0
  2606     for entity_id, value in zip(keys,
  1784     for entity_id, entity in zip(keys,
  2607                                 entities):
  1785                                  entities):
  2608       if self._StoreEntity(entity_id, value):
  1786       if self._StoreEntity(entity_id, entity):
  2609         count += 1
  1787         count += 1
  2610     logger.debug('%s insert: delta=%.3f',
  1788     logger.debug('%s insert: delta=%.3f',
  2611                  self.db_filename,
  1789                  self.db_filename,
  2612                  time.time() - t)
  1790                  time.time() - t)
  2613     logger.debug('Entities transferred total: %s', self.count)
  1791     logger.debug('Entities transferred total: %s', self.count)
  2625 
  1803 
  2626     cursor.execute(
  1804     cursor.execute(
  2627         'select id, value from result order by id')
  1805         'select id, value from result order by id')
  2628 
  1806 
  2629     for unused_entity_id, entity in cursor:
  1807     for unused_entity_id, entity in cursor:
  2630       yield cPickle.loads(str(entity))
  1808       entity_proto = entity_pb.EntityProto(contents=entity)
       
  1809       yield datastore.Entity._FromPb(entity_proto)
  2631 
  1810 
  2632 
  1811 
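# Editor's illustrative sketch, not part of the changeset: _StoreEntity
# above is a delete-then-insert "upsert" keyed on the order-preserving key
# string, which is what makes a restarted export idempotent. The same shape
# against a bare sqlite3 connection, outside the _Database machinery:

import sqlite3

_conn = sqlite3.connect(':memory:')
_conn.execute('create table result (id BLOB primary key, value BLOB not null)')

def _store(key_str, value):
  """Returns True if key_str was not already present."""
  cursor = _conn.cursor()
  cursor.execute('select count(*) from result where id = ?', (key_str,))
  fresh = cursor.fetchone()[0] == 0
  if not fresh:
    cursor.execute('delete from result where id = ?', (key_str,))
  cursor.execute('insert into result (id, value) values (?, ?)',
                 (key_str, buffer(value)))
  return fresh

assert _store(u'k1', 'v1')       # new entity: counted
assert not _store(u'k1', 'v2')   # re-downloaded entity: replaced, not counted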
  2633 class _ProgressDatabase(_Database):
  1812 class _ProgressDatabase(_Database):
  2634   """Persistently record all progress information during an upload.
  1813   """Persistently record all progress information during an upload.
  2635 
  1814 
  2721       A string to later be used as a unique key to update this state.
  1900       A string to later be used as a unique key to update this state.
  2722     """
  1901     """
  2723     self._OpenSecondaryConnection()
  1902     self._OpenSecondaryConnection()
  2724 
  1903 
  2725     assert _RunningInThread(self.secondary_thread)
  1904     assert _RunningInThread(self.secondary_thread)
  2726     assert not key_start or isinstance(key_start, self.py_type)
  1905     assert (not key_start) or isinstance(key_start, self.py_type), (
  2727     assert not key_end or isinstance(key_end, self.py_type), '%s is a %s' % (
  1906         '%s is a %s, %s expected %s' % (key_start,
  2728         key_end, key_end.__class__)
  1907                                         key_start.__class__,
       
  1908                                         self.__class__.__name__,
       
  1909                                         self.py_type))
       
  1910     assert (not key_end) or isinstance(key_end, self.py_type), (
       
  1911         '%s is a %s, %s expected %s' % (key_end,
       
  1912                                         key_end.__class__,
       
  1913                                         self.__class__.__name__,
       
  1914                                         self.py_type))
  2729     assert KeyLEQ(key_start, key_end), '%s not less than %s' % (
  1915     assert KeyLEQ(key_start, key_end), '%s not less than %s' % (
  2730         repr(key_start), repr(key_end))
  1916         repr(key_start), repr(key_end))
  2731 
  1917 
  2732     self.insert_cursor.execute(
  1918     self.insert_cursor.execute(
  2733         'insert into progress (state, key_start, key_end) values (?, ?, ?)',
  1919         'insert into progress (state, key_start, key_end) values (?, ?, ?)',
  2841   def __init__(self, db_filename, signature):
  2027   def __init__(self, db_filename, signature):
  2842     """Initialize an ExportProgressDatabase."""
  2028     """Initialize an ExportProgressDatabase."""
  2843     _ProgressDatabase.__init__(self,
  2029     _ProgressDatabase.__init__(self,
  2844                                db_filename,
  2030                                db_filename,
  2845                                'TEXT',
  2031                                'TEXT',
  2846                                db.Key,
  2032                                datastore.Key,
  2847                                signature,
  2033                                signature,
  2848                                commit_periodicity=1)
  2034                                commit_periodicity=1)
  2849 
  2035 
  2850   def UseProgressData(self):
  2036   def UseProgressData(self):
  2851     """Check if the progress database contains progress data.
  2037     """Check if the progress database contains progress data.
  3009     """Write the contents of the result database."""
  2195     """Write the contents of the result database."""
  3010     exporter = Exporter.RegisteredExporter(self.kind)
  2196     exporter = Exporter.RegisteredExporter(self.kind)
  3011     exporter.output_entities(self.result_db.AllEntities())
  2197     exporter.output_entities(self.result_db.AllEntities())
  3012 
  2198 
  3013   def UpdateProgress(self, item):
  2199   def UpdateProgress(self, item):
  3014     """Update the state of the given KeyRange.
  2200     """Update the state of the given KeyRangeItem.
  3015 
  2201 
  3016     Args:
  2202     Args:
  3017       item: A KeyRange instance.
  2203       item: A KeyRangeItem instance.
  3018     """
  2204     """
  3019     if item.state == STATE_GOT:
  2205     if item.state == STATE_GOT:
  3020       count = self.result_db.StoreEntities(item.export_result.keys,
  2206       count = self.result_db.StoreEntities(item.download_result.keys,
  3021                                            item.export_result.entities)
  2207                                            item.download_result.entities)
  3022       self.db.DeleteKey(item.progress_key)
  2208       self.db.DeleteKey(item.progress_key)
  3023       self.entities_transferred += count
  2209       self.entities_transferred += count
  3024     else:
  2210     else:
  3025       self.db.UpdateState(item.progress_key, item.state)
  2211       self.db.UpdateState(item.progress_key, item.state)
  3026 
  2212 
  3027 
  2213 
       
  2214 class MapperProgressThread(_ProgressThreadBase):
       
  2215   """A thread to record progress information for maps over the datastore."""
       
  2216 
       
  2217   def __init__(self, kind, progress_queue, progress_db):
       
  2218     """Initialize the MapperProgressThread instance.
       
  2219 
       
  2220     Args:
       
  2221       kind: The kind of entities being stored in the database.
       
  2222       progress_queue: A Queue used for tracking progress information.
       
  2223       progress_db: The database for tracking progress information; should
       
  2224         be an instance of ProgressDatabase.
       
  2225     """
       
  2226     _ProgressThreadBase.__init__(self, progress_queue, progress_db)
       
  2227 
       
  2228     self.kind = kind
       
  2229     self.mapper = Mapper.RegisteredMapper(self.kind)
       
  2230 
       
  2231   def EntitiesTransferred(self):
       
  2232     """Return the total number of unique entities transferred."""
       
  2233     return self.entities_transferred
       
  2234 
       
  2235   def WorkFinished(self):
       
  2236     """Perform actions after map is complete."""
       
  2237     pass
       
  2238 
       
  2239   def UpdateProgress(self, item):
       
  2240     """Update the state of the given KeyRangeItem.
       
  2241 
       
  2242     Args:
       
  2243       item: A KeyRangeItem instance.
       
  2244     """
       
  2245     if item.state == STATE_GOT:
       
  2246       self.entities_transferred += item.count
       
  2247       self.db.DeleteKey(item.progress_key)
       
  2248     else:
       
  2249       self.db.UpdateState(item.progress_key, item.state)
       
  2250 
       
  2251 
  3028 def ParseKey(key_string):
  2252 def ParseKey(key_string):
  3029   """Turn a key stored in the database into a db.Key or None.
  2253   """Turn a key stored in the database into a Key or None.
  3030 
  2254 
  3031   Args:
  2255   Args:
  3032     key_string: The string representation of a db.Key.
  2256     key_string: The string representation of a Key.
  3033 
  2257 
  3034   Returns:
  2258   Returns:
  3035     A db.Key instance or None
  2259     A datastore.Key instance or None
  3036   """
  2260   """
  3037   if not key_string:
  2261   if not key_string:
  3038     return None
  2262     return None
  3039   if key_string == 'None':
  2263   if key_string == 'None':
  3040     return None
  2264     return None
  3041   return db.Key(encoded=key_string)
  2265   return datastore.Key(encoded=key_string)
  3042 
  2266 
  3043 
  2267 
  3044 def Validate(value, typ):
  2268 def Validate(value, typ):
  3045   """Checks that value is non-empty and of the right type.
  2269   """Checks that value is non-empty and of the right type.
  3046 
  2270 
  3095   __properties = None
  2319   __properties = None
  3096 
  2320 
  3097   def __init__(self, kind, properties):
  2321   def __init__(self, kind, properties):
  3098     """Constructor.
  2322     """Constructor.
  3099 
  2323 
  3100     Populates this Loader's kind and properties map. Also registers it with
  2324     Populates this Loader's kind and properties map.
  3101     the bulk loader, so that all you need to do is instantiate your Loader,
       
  3102     and the bulkload handler will automatically use it.
       
  3103 
  2325 
  3104     Args:
  2326     Args:
  3105       kind: a string containing the entity kind that this loader handles
  2327       kind: a string containing the entity kind that this loader handles
  3106 
  2328 
  3107       properties: list of (name, converter) tuples.
  2329       properties: list of (name, converter) tuples.
  3137 
  2359 
  3138     self.__properties = properties
  2360     self.__properties = properties
  3139 
  2361 
  3140   @staticmethod
  2362   @staticmethod
  3141   def RegisterLoader(loader):
  2363   def RegisterLoader(loader):
  3142 
  2364     """Register 'loader' as the Loader instance for its kind.
       
  2365 
       
  2366     Args:
       
  2367       loader: A Loader instance.
       
  2368     """
  3143     Loader.__loaders[loader.kind] = loader
  2369     Loader.__loaders[loader.kind] = loader
  3144 
  2370 
  3145   def alias_old_names(self):
  2371   def alias_old_names(self):
  3146     """Aliases method names so that Loaders defined with old names work."""
  2372     """Aliases method names so that Loaders defined with old names work."""
  3147     aliases = (
  2373     aliases = (
  3164     """Creates an entity from a list of property values.
  2390     """Creates an entity from a list of property values.
  3165 
  2391 
  3166     Args:
  2392     Args:
  3167       values: list/tuple of str
  2393       values: list/tuple of str
  3168       key_name: if provided, the name for the (single) resulting entity
  2394       key_name: if provided, the name for the (single) resulting entity
  3169       parent: A db.Key instance for the parent, or None
  2395       parent: A datastore.Key instance for the parent, or None
  3170 
  2396 
  3171     Returns:
  2397     Returns:
  3172       list of db.Model
  2398       list of db.Model
  3173 
  2399 
  3174       The returned entities are populated with the property values from the
  2400       The returned entities are populated with the property values from the
  3220     This method can be overridden to control the key generation for
  2446     This method can be overridden to control the key generation for
  3221     uploaded entities. The value returned should be None (to use a
  2447     uploaded entities. The value returned should be None (to use a
  3222     server generated numeric key), or a string which neither starts
  2448     server generated numeric key), or a string which neither starts
  3223     with a digit nor has the form __*__ (see
  2449     with a digit nor has the form __*__ (see
  3224     http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html),
  2450     http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html),
  3225     or a db.Key instance.
  2451     or a datastore.Key instance.
  3226 
  2452 
  3227     If you generate your own string keys, keep in mind:
  2453     If you generate your own string keys, keep in mind:
  3228 
  2454 
  3229     1. The key name for each entity must be unique.
  2455     1. The key name for each entity must be unique.
  3230     2. If an entity of the same kind and key already exists in the
  2456     2. If an entity of the same kind and key already exists in the
  3303   def RegisteredLoader(kind):
  2529   def RegisteredLoader(kind):
  3304     """Returns the loader instance for the given kind if it exists."""
  2530     """Returns the loader instance for the given kind if it exists."""
  3305     return Loader.__loaders[kind]
  2531     return Loader.__loaders[kind]
  3306 
  2532 
  3307 
  2533 
       
  2534 class RestoreThread(_ThreadBase):
       
  2535   """A thread to read saved entity_pbs from sqlite3."""
       
  2536   NAME = 'RestoreThread'
       
  2537   _ENTITIES_DONE = 'Entities Done'
       
  2538 
       
  2539   def __init__(self, queue, filename):
       
  2540     _ThreadBase.__init__(self)
       
  2541     self.queue = queue
       
  2542     self.filename = filename
       
  2543 
       
  2544   def PerformWork(self):
       
  2545     db_conn = sqlite3.connect(self.filename)
       
  2546     cursor = db_conn.cursor()
       
  2547     cursor.execute('select id, value from result')
       
  2548     for entity_id, value in cursor:
       
  2549       self.queue.put([entity_id, value], block=True)
       
  2550     self.queue.put(RestoreThread._ENTITIES_DONE, block=True)
       
  2551 
       
  2552 
       
  2553 class RestoreLoader(Loader):
       
  2554   """A Loader which imports protobuffers from a file."""
       
  2555 
       
  2556   def __init__(self, kind):
       
  2557     self.kind = kind
       
  2558 
       
  2559   def initialize(self, filename, loader_opts):
       
  2560     CheckFile(filename)
       
  2561     self.queue = Queue.Queue(1000)
       
  2562     restore_thread = RestoreThread(self.queue, filename)
       
  2563     restore_thread.start()
       
  2564 
       
  2565   def generate_records(self, filename):
       
  2566     while True:
       
  2567       record = self.queue.get(block=True)
       
  2568       if id(record) == id(RestoreThread._ENTITIES_DONE):
       
  2569         break
       
  2570       yield record
       
  2571 
       
  2572   def create_entity(self, values, key_name=None, parent=None):
       
  2573     key = StrKey(unicode(values[0], 'utf-8'))
       
  2574     entity_proto = entity_pb.EntityProto(contents=str(values[1]))
       
  2575     entity_proto.mutable_key().CopyFrom(key._Key__reference)
       
  2576     return datastore.Entity._FromPb(entity_proto)
       
  2577 
       
  2578 
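# Editor's illustrative sketch, not part of the changeset: RestoreThread and
# RestoreLoader.generate_records above hand records across a bounded queue
# and end the stream with a sentinel. Comparing id(record) to the id of the
# sentinel (identity, not equality) means a row that merely *equals* the
# sentinel string can never terminate the restore early. The same pattern in
# isolation, using `is` for the identity test:

import Queue

_DONE = object()   # unique sentinel

def _drain(queue):
  while True:
    record = queue.get(block=True)
    if record is _DONE:
      break
    yield record

_q = Queue.Queue()
_q.put(['id1', 'value1'])
_q.put(_DONE)
assert list(_drain(_q)) == [['id1', 'value1']]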
  3308 class Exporter(object):
  2579 class Exporter(object):
  3309   """A base class for serializing datastore entities.
  2580   """A base class for serializing datastore entities.
  3310 
  2581 
  3311   To add a handler for exporting an entity kind from your datastore,
  2582   To add a handler for exporting an entity kind from your datastore,
  3312   write a subclass of this class that calls Exporter.__init__ from your
  2583   write a subclass of this class that calls Exporter.__init__ from your
  3324   __properties = None
  2595   __properties = None
  3325 
  2596 
  3326   def __init__(self, kind, properties):
  2597   def __init__(self, kind, properties):
  3327     """Constructor.
  2598     """Constructor.
  3328 
  2599 
  3329     Populates this Exporter's kind and properties map. Also registers
  2600     Populates this Exporter's kind and properties map.
  3330     it so that all you need to do is instantiate your Exporter, and
       
  3331     the bulkload handler will automatically use it.
       
  3332 
  2601 
  3333     Args:
  2602     Args:
  3334       kind: a string containing the entity kind that this exporter handles
  2603       kind: a string containing the entity kind that this exporter handles
  3335 
  2604 
  3336       properties: list of (name, converter, default) tuples.
  2605       properties: list of (name, converter, default) tuples.
  3368 
  2637 
  3369     self.__properties = properties
  2638     self.__properties = properties
  3370 
  2639 
  3371   @staticmethod
  2640   @staticmethod
  3372   def RegisterExporter(exporter):
  2641   def RegisterExporter(exporter):
  3373 
  2642     """Register 'exporter' as the Exporter instance for its kind.
       
  2643 
       
  2644     Args:
       
  2645       exporter: An Exporter instance.
       
  2646     """
  3374     Exporter.__exporters[exporter.kind] = exporter
  2647     Exporter.__exporters[exporter.kind] = exporter
  3375 
  2648 
  3376   def __ExtractProperties(self, entity):
  2649   def __ExtractProperties(self, entity):
  3377     """Converts an entity into a list of string values.
  2650     """Converts an entity into a list of string values.
  3378 
  2651 
  3386       MissingPropertyError: if an expected field on the entity is missing.
  2659       MissingPropertyError: if an expected field on the entity is missing.
  3387     """
  2660     """
  3388     encoding = []
  2661     encoding = []
  3389     for name, fn, default in self.__properties:
  2662     for name, fn, default in self.__properties:
  3390       try:
  2663       try:
  3391         encoding.append(fn(getattr(entity, name)))
  2664         encoding.append(fn(entity[name]))
  3392       except AttributeError:
  2665       except (KeyError, AttributeError):
  3393         if default is None:
  2666         if default is None:
  3394           raise MissingPropertyError(name)
  2667           raise MissingPropertyError(name)
  3395         else:
  2668         else:
  3396           encoding.append(default)
  2669           encoding.append(default)
  3466   def RegisteredExporter(kind):
  2739   def RegisteredExporter(kind):
  3467     """Returns an exporter instance for the given kind if it exists."""
  2740     """Returns an exporter instance for the given kind if it exists."""
  3468     return Exporter.__exporters[kind]
  2741     return Exporter.__exporters[kind]
  3469 
  2742 
  3470 
  2743 
       
  2744 class DumpExporter(Exporter):
       
  2745   """An exporter which dumps protobuffers to a file."""
       
  2746 
       
  2747   def __init__(self, kind, result_db_filename):
       
  2748     self.kind = kind
       
  2749     self.result_db_filename = result_db_filename
       
  2750 
       
  2751   def output_entities(self, entity_generator):
       
  2752     shutil.copyfile(self.result_db_filename, self.output_filename)
       
  2753 
       
  2754 
       
  2755 class MapperRetry(Error):
       
  2756   """An exception that indicates a non-fatal error during mapping."""
       
  2757 
       
  2758 
       
  2759 class Mapper(object):
       
  2760   """A base class for mapping over datastore entities.

  2761 

  2762   To add a handler for mapping over an entity kind in your datastore,
       
  2763   write a subclass of this class that calls Mapper.__init__ from your
       
  2764   class's __init__.
       
  2765 
       
  2766   You need to implement the batch_apply or apply method on your subclass
       
  2767   for the map to do anything.
       
  2768   """
       
  2769 
       
  2770   __mappers = {}
       
  2771   kind = None
       
  2772 
       
  2773   def __init__(self, kind):
       
  2774     """Constructor.
       
  2775 
       
  2776     Populates this Mapper's kind.
       
  2777 
       
  2778     Args:
       
  2779       kind: a string containing the entity kind that this mapper handles
       
  2780     """
       
  2781     Validate(kind, basestring)
       
  2782     self.kind = kind
       
  2783 
       
  2784     GetImplementationClass(kind)
       
  2785 
       
  2786   @staticmethod
       
  2787   def RegisterMapper(mapper):
       
  2788     """Register 'mapper' as the Mapper instance for its kind.
       
  2789 
       
  2790     Args:
       
  2791       mapper: A Mapper instance.
       
  2792     """
       
  2793     Mapper.__mappers[mapper.kind] = mapper
       
  2794 
       
  2795   def initialize(self, mapper_opts):
       
  2796     """Performs initialization.
       
  2797 
       
  2798     Args:
       
  2799       mapper_opts: The string given as the --mapper_opts flag argument.
       
  2800     """
       
  2801     pass
       
  2802 
       
  2803   def finalize(self):
       
  2804     """Performs finalization actions after the download completes."""
       
  2805     pass
       
  2806 
       
  2807   def apply(self, entity):
       
  2808     print 'Default map function doing nothing to %s' % entity
       
  2809 
       
  2810   def batch_apply(self, entities):
       
  2811     for entity in entities:
       
  2812       self.apply(entity)
       
  2813 
       
  2814   @staticmethod
       
  2815   def RegisteredMappers():
       
  2816     """Returns a dictionary of the mapper instances that have been created."""
       
  2817     return dict(Mapper.__mappers)
       
  2818 
       
  2819   @staticmethod
       
  2820   def RegisteredMapper(kind):
       
  2821     """Returns a mapper instance for the given kind if it exists."""
       
  2822     return Mapper.__mappers[kind]
       
  2823 
       
  2824 
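# Editor's illustrative example, not part of the changeset: a minimal Mapper
# subclass. Only apply() is overridden; the inherited batch_apply() then
# feeds it one entity at a time. 'Greeting' is a hypothetical kind, and
# Mapper.__init__ requires that an implementation class for the kind is
# resolvable (see the GetImplementationClass call above):
#
#   class GreetingMapper(Mapper):
#     def __init__(self):
#       Mapper.__init__(self, 'Greeting')
#
#     def apply(self, entity):
#       print 'mapping over %s' % entity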
  3471 class QueueJoinThread(threading.Thread):
  2825 class QueueJoinThread(threading.Thread):
  3472   """A thread that joins a queue and exits.
  2826   """A thread that joins a queue and exits.
  3473 
  2827 
  3474   Queue joins do not have a timeout.  To simulate a queue join with
  2828   Queue joins do not have a timeout.  To simulate a queue join with
  3475   timeout, run this thread and join it with a timeout.
  2829   timeout, run this thread and join it with a timeout.
  3490     self.queue.join()
  2844     self.queue.join()
  3491 
  2845 
  3492 
  2846 
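# Editor's sketch, not part of the SDK source: the timed-join pattern the
# QueueJoinThread docstring describes. Assumes the class's elided constructor
# takes the queue to join, as the visible self.queue.join() suggests.
def _ExampleTimedQueueJoin(queue, timeout=1.0):
  """Returns True if queue.join() completes within timeout seconds."""
  joiner = QueueJoinThread(queue)
  joiner.start()
  joiner.join(timeout=timeout)
  return not joiner.isAlive()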
  3493 def InterruptibleQueueJoin(queue,
  2847 def InterruptibleQueueJoin(queue,
  3494                            thread_local,
  2848                            thread_local,
  3495                            thread_gate,
  2849                            thread_pool,
  3496                            queue_join_thread_factory=QueueJoinThread,
  2850                            queue_join_thread_factory=QueueJoinThread,
  3497                            check_workers=True):
  2851                            check_workers=True):
  3498   """Repeatedly joins the given ReQueue or Queue.Queue with short timeout.
  2852   """Repeatedly joins the given ReQueue or Queue.Queue with short timeout.
  3499 
  2853 
  3500   Between each timeout on the join, worker threads are checked.
  2854   Between each timeout on the join, worker threads are checked.
  3501 
  2855 
  3502   Args:
  2856   Args:
  3503     queue: A Queue.Queue or ReQueue instance.
  2857     queue: A Queue.Queue or ReQueue instance.
  3504     thread_local: A threading.local instance which indicates interrupts.
  2858     thread_local: A threading.local instance which indicates interrupts.
  3505     thread_gate: A ThreadGate instance.
  2859     thread_pool: An AdaptiveThreadPool instance.
  3506     queue_join_thread_factory: Used for dependency injection.
  2860     queue_join_thread_factory: Used for dependency injection.
  3507     check_workers: Whether to interrupt the join on worker death.
  2861     check_workers: Whether to interrupt the join on worker death.
  3508 
  2862 
  3509   Returns:
  2863   Returns:
  3510     True unless the queue join is interrupted by SIGINT or worker death.
  2864     True unless the queue join is interrupted by SIGINT or worker death.
  3517       return True
  2871       return True
  3518     if thread_local.shut_down:
  2872     if thread_local.shut_down:
  3519       logger.debug('Queue join interrupted')
  2873       logger.debug('Queue join interrupted')
  3520       return False
  2874       return False
  3521     if check_workers:
  2875     if check_workers:
  3522       for worker_thread in thread_gate.Threads():
  2876       for worker_thread in thread_pool.Threads():
  3523         if not worker_thread.isAlive():
  2877         if not worker_thread.isAlive():
  3524           return False
  2878           return False
  3525 
  2879 
  3526 
  2880 
  3527 def ShutdownThreads(data_source_thread, work_queue, thread_gate):
  2881 def ShutdownThreads(data_source_thread, thread_pool):
  3528   """Shuts down the worker and data source threads.
  2882   """Shuts down the worker and data source threads.
  3529 
  2883 
  3530   Args:
  2884   Args:
  3531     data_source_thread: A running DataSourceThread instance.
  2885     data_source_thread: A running DataSourceThread instance.
  3532     work_queue: The work queue.
  2886     thread_pool: An AdaptiveThreadPool instance with workers registered.
  3533     thread_gate: A ThreadGate instance with workers registered.
       
  3534   """
  2887   """
  3535   logger.info('An error occurred. Shutting down...')
  2888   logger.info('An error occurred. Shutting down...')
  3536 
  2889 
  3537   data_source_thread.exit_flag = True
  2890   data_source_thread.exit_flag = True
  3538 
  2891 
  3539   for thread in thread_gate.Threads():
  2892   thread_pool.Shutdown()
  3540     thread.exit_flag = True
       
  3541 
       
  3542   for unused_thread in thread_gate.Threads():
       
  3543     thread_gate.EnableThread()
       
  3544 
  2893 
  3545   data_source_thread.join(timeout=3.0)
  2894   data_source_thread.join(timeout=3.0)
  3546   if data_source_thread.isAlive():
  2895   if data_source_thread.isAlive():
  3547     logger.warn('%s hung while trying to exit',
  2896     logger.warn('%s hung while trying to exit',
  3548                 data_source_thread.GetFriendlyName())
  2897                 data_source_thread.GetFriendlyName())
  3549 
       
  3550   while not work_queue.empty():
       
  3551     try:
       
  3552       unused_item = work_queue.get_nowait()
       
  3553       work_queue.task_done()
       
  3554     except Queue.Empty:
       
  3555       pass
       
  3556 
  2898 
  3557 
  2899 
  3558 class BulkTransporterApp(object):
  2900 class BulkTransporterApp(object):
  3559   """Class to wrap bulk transport application functionality."""
  2901   """Class to wrap bulk transport application functionality."""
  3560 
  2902 
  3561   def __init__(self,
  2903   def __init__(self,
  3562                arg_dict,
  2904                arg_dict,
  3563                input_generator_factory,
  2905                input_generator_factory,
  3564                throttle,
  2906                throttle,
  3565                progress_db,
  2907                progress_db,
  3566                workerthread_factory,
       
  3567                progresstrackerthread_factory,
  2908                progresstrackerthread_factory,
  3568                max_queue_size=DEFAULT_QUEUE_SIZE,
  2909                max_queue_size=DEFAULT_QUEUE_SIZE,
  3569                request_manager_factory=RequestManager,
  2910                request_manager_factory=RequestManager,
  3570                datasourcethread_factory=DataSourceThread,
  2911                datasourcethread_factory=DataSourceThread,
  3571                work_queue_factory=ReQueue,
  2912                progress_queue_factory=Queue.Queue,
  3572                progress_queue_factory=Queue.Queue):
  2913                thread_pool_factory=adaptive_thread_pool.AdaptiveThreadPool):
  3573     """Instantiate a BulkTransporterApp.
  2914     """Instantiate a BulkTransporterApp.
  3574 
  2915 
  3575     Uploads or downloads data to or from application using HTTP requests.
  2916     Uploads or downloads data to or from application using HTTP requests.
  3576     When run, the class will spin up a number of threads to read entities
  2917     When run, the class will spin up a number of threads to read entities
  3577     from the data source, pass those to a number of worker threads
  2918     from the data source, pass those to a number of worker threads
  3582     Args:
  2923     Args:
  3583       arg_dict: Dictionary of command line options.
  2924       arg_dict: Dictionary of command line options.
  3584       input_generator_factory: A factory that creates a WorkItem generator.
  2925       input_generator_factory: A factory that creates a WorkItem generator.
  3585       throttle: A Throttle instance.
  2926       throttle: A Throttle instance.
  3586       progress_db: The database to use for replaying/recording progress.
  2927       progress_db: The database to use for replaying/recording progress.
  3587       workerthread_factory: A factory for worker threads.
       
  3588       progresstrackerthread_factory: Used for dependency injection.
  2928       progresstrackerthread_factory: Used for dependency injection.
  3589       max_queue_size: Maximum size of the queues before they should block.
  2929       max_queue_size: Maximum size of the queues before they should block.
  3590       request_manager_factory: Used for dependency injection.
  2930       request_manager_factory: Used for dependency injection.
  3591       datasourcethread_factory: Used for dependency injection.
  2931       datasourcethread_factory: Used for dependency injection.
  3592       work_queue_factory: Used for dependency injection.
       
  3593       progress_queue_factory: Used for dependency injection.
  2932       progress_queue_factory: Used for dependency injection.
       
  2933       thread_pool_factory: Used for dependency injection.
  3594     """
  2934     """
  3595     self.app_id = arg_dict['app_id']
  2935     self.app_id = arg_dict['app_id']
  3596     self.post_url = arg_dict['url']
  2936     self.post_url = arg_dict['url']
  3597     self.kind = arg_dict['kind']
  2937     self.kind = arg_dict['kind']
  3598     self.batch_size = arg_dict['batch_size']
  2938     self.batch_size = arg_dict['batch_size']
  3599     self.input_generator_factory = input_generator_factory
  2939     self.input_generator_factory = input_generator_factory
  3600     self.num_threads = arg_dict['num_threads']
  2940     self.num_threads = arg_dict['num_threads']
  3601     self.email = arg_dict['email']
  2941     self.email = arg_dict['email']
  3602     self.passin = arg_dict['passin']
  2942     self.passin = arg_dict['passin']
       
  2943     self.dry_run = arg_dict['dry_run']
  3603     self.throttle = throttle
  2944     self.throttle = throttle
  3604     self.progress_db = progress_db
  2945     self.progress_db = progress_db
  3605     self.workerthread_factory = workerthread_factory
       
  3606     self.progresstrackerthread_factory = progresstrackerthread_factory
  2946     self.progresstrackerthread_factory = progresstrackerthread_factory
  3607     self.max_queue_size = max_queue_size
  2947     self.max_queue_size = max_queue_size
  3608     self.request_manager_factory = request_manager_factory
  2948     self.request_manager_factory = request_manager_factory
  3609     self.datasourcethread_factory = datasourcethread_factory
  2949     self.datasourcethread_factory = datasourcethread_factory
  3610     self.work_queue_factory = work_queue_factory
       
  3611     self.progress_queue_factory = progress_queue_factory
  2950     self.progress_queue_factory = progress_queue_factory
       
  2951     self.thread_pool_factory = thread_pool_factory
  3612     (scheme,
  2952     (scheme,
  3613      self.host_port, self.url_path,
  2953      self.host_port, self.url_path,
  3614      unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
  2954      unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
  3615     self.secure = (scheme == 'https')
  2955     self.secure = (scheme == 'https')
  3616 
  2956 
  3621       AuthenticationError: If authentication is required and fails.
  2961       AuthenticationError: If authentication is required and fails.
  3622 
  2962 
  3623     Returns:
  2963     Returns:
  3624       Error code suitable for sys.exit, e.g. 0 on success, 1 on failure.
  2964       Error code suitable for sys.exit, e.g. 0 on success, 1 on failure.
  3625     """
  2965     """
  3626     thread_gate = ThreadGate(True)
  2966     self.error = False
       
  2967     thread_pool = self.thread_pool_factory(
       
  2968         self.num_threads, queue_size=self.max_queue_size)
  3627 
  2969 
  3628     self.throttle.Register(threading.currentThread())
  2970     self.throttle.Register(threading.currentThread())
  3629     threading.currentThread().exit_flag = False
  2971     threading.currentThread().exit_flag = False
  3630 
       
  3631     work_queue = self.work_queue_factory(self.max_queue_size)
       
  3632 
  2972 
  3633     progress_queue = self.progress_queue_factory(self.max_queue_size)
  2973     progress_queue = self.progress_queue_factory(self.max_queue_size)
  3634     request_manager = self.request_manager_factory(self.app_id,
  2974     request_manager = self.request_manager_factory(self.app_id,
  3635                                                    self.host_port,
  2975                                                    self.host_port,
  3636                                                    self.url_path,
  2976                                                    self.url_path,
  3637                                                    self.kind,
  2977                                                    self.kind,
  3638                                                    self.throttle,
  2978                                                    self.throttle,
  3639                                                    self.batch_size,
  2979                                                    self.batch_size,
  3640                                                    self.secure,
  2980                                                    self.secure,
  3641                                                    self.email,
  2981                                                    self.email,
  3642                                                    self.passin)
  2982                                                    self.passin,
       
  2983                                                    self.dry_run)
  3643     try:
  2984     try:
  3644       request_manager.Authenticate()
  2985       request_manager.Authenticate()
  3645     except Exception, e:
  2986     except Exception, e:
       
  2987       self.error = True
  3646       if not isinstance(e, urllib2.HTTPError) or (
  2988       if not isinstance(e, urllib2.HTTPError) or (
  3647           e.code != 302 and e.code != 401):
  2989           e.code != 302 and e.code != 401):
  3648         logger.exception('Exception during authentication')
  2990         logger.exception('Exception during authentication')
  3649       raise AuthenticationError()
  2991       raise AuthenticationError()
  3650     if (request_manager.auth_called and
  2992     if (request_manager.auth_called and
  3651         not request_manager.authenticated):
  2993         not request_manager.authenticated):
       
  2994       self.error = True
  3652       raise AuthenticationError('Authentication failed')
  2995       raise AuthenticationError('Authentication failed')
  3653 
  2996 
  3654     for unused_idx in xrange(self.num_threads):
  2997     for thread in thread_pool.Threads():
  3655       thread = self.workerthread_factory(work_queue,
       
  3656                                          self.throttle,
       
  3657                                          thread_gate,
       
  3658                                          request_manager,
       
  3659                                          self.num_threads,
       
  3660                                          self.batch_size)
       
  3661       self.throttle.Register(thread)
  2998       self.throttle.Register(thread)
  3662       thread_gate.Register(thread)
       
  3663 
  2999 
  3664     self.progress_thread = self.progresstrackerthread_factory(
  3000     self.progress_thread = self.progresstrackerthread_factory(
  3665         progress_queue, self.progress_db)
  3001         progress_queue, self.progress_db)
  3666 
  3002 
  3667     if self.progress_db.UseProgressData():
  3003     if self.progress_db.UseProgressData():
  3669       progress_generator_factory = self.progress_db.GetProgressStatusGenerator
  3005       progress_generator_factory = self.progress_db.GetProgressStatusGenerator
  3670     else:
  3006     else:
  3671       progress_generator_factory = None
  3007       progress_generator_factory = None
  3672 
  3008 
  3673     self.data_source_thread = (
  3009     self.data_source_thread = (
  3674         self.datasourcethread_factory(work_queue,
  3010         self.datasourcethread_factory(request_manager,
       
  3011                                       thread_pool,
  3675                                       progress_queue,
  3012                                       progress_queue,
  3676                                       self.input_generator_factory,
  3013                                       self.input_generator_factory,
  3677                                       progress_generator_factory))
  3014                                       progress_generator_factory))
  3678 
  3015 
  3679     thread_local = threading.local()
  3016     thread_local = threading.local()
  3680     thread_local.shut_down = False
  3017     thread_local.shut_down = False
  3681 
  3018 
  3682     def Interrupt(unused_signum, unused_frame):
  3019     def Interrupt(unused_signum, unused_frame):
  3683       """Shutdown gracefully in response to a signal."""
  3020       """Shutdown gracefully in response to a signal."""
  3684       thread_local.shut_down = True
  3021       thread_local.shut_down = True
       
  3022       self.error = True
  3685 
  3023 
  3686     signal.signal(signal.SIGINT, Interrupt)
  3024     signal.signal(signal.SIGINT, Interrupt)
  3687 
  3025 
  3688     self.progress_thread.start()
  3026     self.progress_thread.start()
  3689     self.data_source_thread.start()
  3027     self.data_source_thread.start()
  3690     for thread in thread_gate.Threads():
       
  3691       thread.start()
       
  3692 
  3028 
  3693 
  3029 
  3694     while not thread_local.shut_down:
  3030     while not thread_local.shut_down:
  3695       self.data_source_thread.join(timeout=0.25)
  3031       self.data_source_thread.join(timeout=0.25)
  3696 
  3032 
  3697       if self.data_source_thread.isAlive():
  3033       if self.data_source_thread.isAlive():
  3698         for thread in list(thread_gate.Threads()) + [self.progress_thread]:
  3034         for thread in list(thread_pool.Threads()) + [self.progress_thread]:
  3699           if not thread.isAlive():
  3035           if not thread.isAlive():
  3700             logger.info('Unexpected thread death: %s', thread.getName())
  3036             logger.info('Unexpected thread death: %s', thread.getName())
  3701             thread_local.shut_down = True
  3037             thread_local.shut_down = True
       
  3038             self.error = True
  3702             break
  3039             break
  3703       else:
  3040       else:
  3704         break
  3041         break
  3705 
       
  3706     if thread_local.shut_down:
       
  3707       ShutdownThreads(self.data_source_thread, work_queue, thread_gate)
       
  3708 
  3042 
  3709     def _Join(ob, msg):
  3043     def _Join(ob, msg):
  3710       logger.debug('Waiting for %s...', msg)
  3044       logger.debug('Waiting for %s...', msg)
  3711       if isinstance(ob, threading.Thread):
  3045       if isinstance(ob, threading.Thread):
  3712         ob.join(timeout=3.0)
  3046         ob.join(timeout=3.0)
  3713         if ob.isAlive():
  3047         if ob.isAlive():
  3714           logger.debug('Joining %s failed', ob.GetFriendlyName())
  3048           logger.debug('Joining %s failed', ob)
  3715         else:
  3049         else:
  3716           logger.debug('... done.')
  3050           logger.debug('... done.')
  3717       elif isinstance(ob, (Queue.Queue, ReQueue)):
  3051       elif isinstance(ob, (Queue.Queue, ReQueue)):
  3718         if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
  3052         if not InterruptibleQueueJoin(ob, thread_local, thread_pool):
  3719           ShutdownThreads(self.data_source_thread, work_queue, thread_gate)
  3053           ShutdownThreads(self.data_source_thread, thread_pool)
  3720       else:
  3054       else:
  3721         ob.join()
  3055         ob.join()
  3722         logger.debug('... done.')
  3056         logger.debug('... done.')
  3723 
  3057 
  3724     _Join(work_queue, 'work_queue to flush')
  3058     if self.data_source_thread.error or thread_local.shut_down:
  3725 
  3059       ShutdownThreads(self.data_source_thread, thread_pool)
  3726     for unused_thread in thread_gate.Threads():
  3060     else:
  3727       work_queue.put(_THREAD_SHOULD_EXIT)
  3061       _Join(thread_pool.requeue, 'worker threads to finish')
  3728 
  3062 
  3729     for unused_thread in thread_gate.Threads():
  3063     thread_pool.Shutdown()
  3730       thread_gate.EnableThread()
  3064     thread_pool.JoinThreads()
  3731 
  3065     thread_pool.CheckErrors()
  3732     for thread in thread_gate.Threads():
  3066     print ''
  3733       _Join(thread, 'thread [%s] to terminate' % thread.getName())
       
  3734 
       
  3735       thread.CheckError()
       
  3736 
  3067 
  3737     if self.progress_thread.isAlive():
  3068     if self.progress_thread.isAlive():
  3738       InterruptibleQueueJoin(progress_queue, thread_local, thread_gate,
  3069       InterruptibleQueueJoin(progress_queue, thread_local, thread_pool,
  3739                              check_workers=False)
  3070                              check_workers=False)
  3740     else:
  3071     else:
  3741       logger.warn('Progress thread exited prematurely')
  3072       logger.warn('Progress thread exited prematurely')
  3742 
  3073 
  3743     progress_queue.put(_THREAD_SHOULD_EXIT)
  3074     progress_queue.put(_THREAD_SHOULD_EXIT)
  3761   def __init__(self, *args, **kwargs):
  3092   def __init__(self, *args, **kwargs):
  3762     BulkTransporterApp.__init__(self, *args, **kwargs)
  3093     BulkTransporterApp.__init__(self, *args, **kwargs)
  3763 
  3094 
  3764   def ReportStatus(self):
  3095   def ReportStatus(self):
  3765     """Display a message reporting the final status of the transfer."""
  3096     """Display a message reporting the final status of the transfer."""
  3766     total_up, duration = self.throttle.TotalTransferred(BANDWIDTH_UP)
  3097     total_up, duration = self.throttle.TotalTransferred(
       
  3098         remote_api_throttle.BANDWIDTH_UP)
  3767     s_total_up, unused_duration = self.throttle.TotalTransferred(
  3099     s_total_up, unused_duration = self.throttle.TotalTransferred(
  3768         HTTPS_BANDWIDTH_UP)
  3100         remote_api_throttle.HTTPS_BANDWIDTH_UP)
  3769     total_up += s_total_up
  3101     total_up += s_total_up
  3770     total = total_up
  3102     total = total_up
  3771     logger.info('%d entities total, %d previously transferred',
  3103     logger.info('%d entities total, %d previously transferred',
  3772                 self.data_source_thread.read_count,
  3104                 self.data_source_thread.read_count,
  3773                 self.data_source_thread.xfer_count)
  3105                 self.data_source_thread.xfer_count)
  3791   def __init__(self, *args, **kwargs):
  3123   def __init__(self, *args, **kwargs):
  3792     BulkTransporterApp.__init__(self, *args, **kwargs)
  3124     BulkTransporterApp.__init__(self, *args, **kwargs)
  3793 
  3125 
  3794   def ReportStatus(self):
  3126   def ReportStatus(self):
  3795     """Display a message reporting the final status of the transfer."""
  3127     """Display a message reporting the final status of the transfer."""
  3796     total_down, duration = self.throttle.TotalTransferred(BANDWIDTH_DOWN)
  3128     total_down, duration = self.throttle.TotalTransferred(
       
  3129         remote_api_throttle.BANDWIDTH_DOWN)
  3797     s_total_down, unused_duration = self.throttle.TotalTransferred(
  3130     s_total_down, unused_duration = self.throttle.TotalTransferred(
  3798         HTTPS_BANDWIDTH_DOWN)
  3131         remote_api_throttle.HTTPS_BANDWIDTH_DOWN)
  3799     total_down += s_total_down
  3132     total_down += s_total_down
  3800     total = total_down
  3133     total = total_down
  3801     existing_count = self.progress_thread.existing_count
  3134     existing_count = self.progress_thread.existing_count
  3802     xfer_count = self.progress_thread.EntitiesTransferred()
  3135     xfer_count = self.progress_thread.EntitiesTransferred()
  3803     logger.info('Have %d entities, %d previously transferred',
  3136     logger.info('Have %d entities, %d previously transferred',
  3804                 xfer_count + existing_count, existing_count)
  3137                 xfer_count, existing_count)
  3805     logger.info('%d entities (%d bytes) transferred in %.1f seconds',
  3138     logger.info('%d entities (%d bytes) transferred in %.1f seconds',
  3806                 xfer_count, total, duration)
  3139                 xfer_count, total, duration)
  3807     return 0
  3140     if self.error:
       
  3141       return 1
       
  3142     else:
       
  3143       return 0
       
  3144 
       
  3145 
       
  3146 class BulkMapperApp(BulkTransporterApp):
       
  3147   """Class to encapsulate bulk map functionality."""
       
  3148 
       
  3149   def __init__(self, *args, **kwargs):
       
  3150     BulkTransporterApp.__init__(self, *args, **kwargs)
       
  3151 
       
  3152   def ReportStatus(self):
       
  3153     """Display a message reporting the final status of the transfer."""
       
  3154     total_down, duration = self.throttle.TotalTransferred(
       
  3155         remote_api_throttle.BANDWIDTH_DOWN)
       
  3156     s_total_down, unused_duration = self.throttle.TotalTransferred(
       
  3157         remote_api_throttle.HTTPS_BANDWIDTH_DOWN)
       
  3158     total_down += s_total_down
       
  3159     total = total_down
       
  3160     xfer_count = self.progress_thread.EntitiesTransferred()
       
  3161     logger.info('The following may be inaccurate if any mapper tasks '
       
  3162                 'encountered errors and had to be retried.')
       
  3163     logger.info('Applied mapper to %s entities.',
       
  3164                 xfer_count)
       
  3165     logger.info('%s entities (%s bytes) transferred in %.1f seconds',
       
  3166                 xfer_count, total, duration)
       
  3167     if self.error:
       
  3168       return 1
       
  3169     else:
       
  3170       return 0
  3808 
  3171 
  3809 
  3172 
  3810 def PrintUsageExit(code):
  3173 def PrintUsageExit(code):
  3811   """Prints usage information and exits with a status code.
  3174   """Prints usage information and exits with a status code.
  3812 
  3175 
  3841              'result_db_filename=',
  3204              'result_db_filename=',
  3842              'download',
  3205              'download',
  3843              'loader_opts=',
  3206              'loader_opts=',
  3844              'exporter_opts=',
  3207              'exporter_opts=',
  3845              'log_file=',
  3208              'log_file=',
       
  3209              'mapper_opts=',
  3846              'email=',
  3210              'email=',
  3847              'passin',
  3211              'passin',
       
  3212              'map',
       
  3213              'dry_run',
       
  3214              'dump',
       
  3215              'restore',
  3848              ]
  3216              ]
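# Editor's note: hedged example invocations covering the modes wired up by
# the flag list above (application name, kind, and filenames hypothetical):
#
#   bulkloader.py --dump --url=http://myapp.appspot.com/remote_api \
#       --filename=dump.sql3 --kind=Greeting
#   bulkloader.py --restore --url=http://myapp.appspot.com/remote_api \
#       --filename=dump.sql3 --kind=Greeting
#   bulkloader.py --map --url=http://myapp.appspot.com/remote_api \
#       --kind=Greeting --config_file=config.py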
  3849 
  3217 
  3850 
  3218 
  3851 def ParseArguments(argv):
  3219 def ParseArguments(argv, die_fn=lambda: PrintUsageExit(1)):
  3852   """Parses command-line arguments.
  3220   """Parses command-line arguments.
  3853 
  3221 
  3854   Prints out a help message if -h or --help is supplied.
  3222   Prints out a help message if -h or --help is supplied.
  3855 
  3223 
  3856   Args:
  3224   Args:
  3857     argv: List of command-line arguments.
  3225     argv: List of command-line arguments.
       
  3226     die_fn: Function to invoke to end the program.
  3858 
  3227 
  3859   Returns:
  3228   Returns:
  3860     A dictionary containing the value of command-line options.
  3229     A dictionary containing the value of command-line options.
  3861   """
  3230   """
  3862   opts, unused_args = getopt.getopt(
  3231   opts, unused_args = getopt.getopt(
  3865       FLAG_SPEC)
  3234       FLAG_SPEC)
  3866 
  3235 
  3867   arg_dict = {}
  3236   arg_dict = {}
  3868 
  3237 
  3869   arg_dict['url'] = REQUIRED_OPTION
  3238   arg_dict['url'] = REQUIRED_OPTION
  3870   arg_dict['filename'] = REQUIRED_OPTION
  3239   arg_dict['filename'] = None
  3871   arg_dict['config_file'] = REQUIRED_OPTION
  3240   arg_dict['config_file'] = None
  3872   arg_dict['kind'] = REQUIRED_OPTION
  3241   arg_dict['kind'] = None
  3873 
  3242 
  3874   arg_dict['batch_size'] = DEFAULT_BATCH_SIZE
  3243   arg_dict['batch_size'] = None
  3875   arg_dict['num_threads'] = DEFAULT_THREAD_COUNT
  3244   arg_dict['num_threads'] = DEFAULT_THREAD_COUNT
  3876   arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT
  3245   arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT
  3877   arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT
  3246   arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT
  3878   arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT
  3247   arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT
  3879 
  3248 
  3887   arg_dict['exporter_opts'] = None
  3256   arg_dict['exporter_opts'] = None
  3888   arg_dict['debug'] = False
  3257   arg_dict['debug'] = False
  3889   arg_dict['log_file'] = None
  3258   arg_dict['log_file'] = None
  3890   arg_dict['email'] = None
  3259   arg_dict['email'] = None
  3891   arg_dict['passin'] = False
  3260   arg_dict['passin'] = False
       
  3261   arg_dict['mapper_opts'] = None
       
  3262   arg_dict['map'] = False
       
  3263   arg_dict['dry_run'] = False
       
  3264   arg_dict['dump'] = False
       
  3265   arg_dict['restore'] = False
  3892 
  3266 
  3893   def ExpandFilename(filename):
  3267   def ExpandFilename(filename):
  3894     """Expand shell variables and ~usernames in filename."""
  3268     """Expand shell variables and ~usernames in filename."""
  3895     return os.path.expandvars(os.path.expanduser(filename))
  3269     return os.path.expandvars(os.path.expanduser(filename))
  3896 
  3270 
  3936     elif option == '--loader_opts':
  3310     elif option == '--loader_opts':
  3937       arg_dict['loader_opts'] = value
  3311       arg_dict['loader_opts'] = value
  3938     elif option == '--exporter_opts':
  3312     elif option == '--exporter_opts':
  3939       arg_dict['exporter_opts'] = value
  3313       arg_dict['exporter_opts'] = value
  3940     elif option == '--log_file':
  3314     elif option == '--log_file':
  3941       arg_dict['log_file'] = value
  3315       arg_dict['log_file'] = ExpandFilename(value)
  3942     elif option == '--email':
  3316     elif option == '--email':
  3943       arg_dict['email'] = value
  3317       arg_dict['email'] = value
  3944     elif option == '--passin':
  3318     elif option == '--passin':
  3945       arg_dict['passin'] = True
  3319       arg_dict['passin'] = True
  3946 
  3320     elif option == '--map':
  3947   return ProcessArguments(arg_dict, die_fn=lambda: PrintUsageExit(1))
  3321       arg_dict['map'] = True
       
  3322     elif option == '--mapper_opts':
       
  3323       arg_dict['mapper_opts'] = value
       
  3324     elif option == '--dry_run':
       
  3325       arg_dict['dry_run'] = True
       
  3326     elif option == '--dump':
       
  3327       arg_dict['dump'] = True
       
  3328     elif option == '--restore':
       
  3329       arg_dict['restore'] = True
       
  3330 
       
  3331   return ProcessArguments(arg_dict, die_fn=die_fn)
  3948 
  3332 
  3949 
  3333 
  3950 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
  3334 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
  3951   """Return a dictionary indicating the throttle options."""
  3335   """Return a dictionary indicating the throttle options."""
  3952   return {
  3336   bulkloader_limits = dict(remote_api_throttle.NO_LIMITS)
  3953       BANDWIDTH_UP: bandwidth_limit,
  3337   bulkloader_limits.update({
  3954       BANDWIDTH_DOWN: bandwidth_limit,
  3338       remote_api_throttle.BANDWIDTH_UP: bandwidth_limit,
  3955       REQUESTS: http_limit,
  3339       remote_api_throttle.BANDWIDTH_DOWN: bandwidth_limit,
  3956       HTTPS_BANDWIDTH_UP: bandwidth_limit / 5,
  3340       remote_api_throttle.REQUESTS: http_limit,
  3957       HTTPS_BANDWIDTH_DOWN: bandwidth_limit / 5,
  3341       remote_api_throttle.HTTPS_BANDWIDTH_UP: bandwidth_limit,
  3958       HTTPS_REQUESTS: http_limit / 5,
  3342       remote_api_throttle.HTTPS_BANDWIDTH_DOWN: bandwidth_limit,
  3959       RECORDS: rps_limit,
  3343       remote_api_throttle.HTTPS_REQUESTS: http_limit,
  3960   }
  3344       remote_api_throttle.ENTITIES_FETCHED: rps_limit,
       
  3345       remote_api_throttle.ENTITIES_MODIFIED: rps_limit,
       
  3346   })
       
  3347   return bulkloader_limits
  3961 
  3348 
  3962 
  3349 
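# Editor's note: a sketch of how the layout above is consumed; this mirrors
# the real call further below in this file (the limits are hypothetical):
#
#   throttle = remote_api_throttle.Throttle(
#       layout=ThrottleLayout(250000, 8, 20))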
  3963 def CheckOutputFile(filename):
  3350 def CheckOutputFile(filename):
  3964   """Check that the given file does not exist and can be opened for writing.
  3351   """Check that the given file does not exist and can be opened for writing.
  3965 
  3352 
  3967     filename: The name of the file.
  3354     filename: The name of the file.
  3968 
  3355 
  3969   Raises:
  3356   Raises:
  3970     FileExistsError: if the given filename already exists.
  3357     FileExistsError: if the given filename already exists.
  3971     FileNotWritableError: if the given filename's directory is not writable.
  3358     FileNotWritableError: if the given filename's directory is not writable.
  3972   """
  3359   """
  3973   if os.path.exists(filename):
  3360   full_path = os.path.abspath(filename)
       
  3361   if os.path.exists(full_path):
  3974     raise FileExistsError('%s: output file exists' % filename)
  3362     raise FileExistsError('%s: output file exists' % filename)
  3975   elif not os.access(os.path.dirname(filename), os.W_OK):
  3363   elif not os.access(os.path.dirname(full_path), os.W_OK):
  3976     raise FileNotWritableError(
  3364     raise FileNotWritableError(
  3977         '%s: not writable' % os.path.dirname(filename))
  3365         '%s: not writable' % os.path.dirname(full_path))
  3978 
  3366 
  3979 
  3367 
  3980 def LoadConfig(config_file_name, exit_fn=sys.exit):
  3368 def LoadConfig(config_file_name, exit_fn=sys.exit):
  3981   """Loads a config file and registers any Loader classes present.
  3369   """Loads a config file and registers any Loader classes present.
  3982 
  3370 
  3997           Loader.RegisterLoader(cls())
  3385           Loader.RegisterLoader(cls())
  3998 
  3386 
  3999       if hasattr(bulkloader_config, 'exporters'):
  3387       if hasattr(bulkloader_config, 'exporters'):
  4000         for cls in bulkloader_config.exporters:
  3388         for cls in bulkloader_config.exporters:
  4001           Exporter.RegisterExporter(cls())
  3389           Exporter.RegisterExporter(cls())
       
  3390 
       
  3391       if hasattr(bulkloader_config, 'mappers'):
       
  3392         for cls in bulkloader_config.mappers:
       
  3393           Mapper.RegisterMapper(cls())
       
  3394 
  4002     except NameError, e:
  3395     except NameError, e:
  4003       m = re.search(r"[^']*'([^']*)'.*", str(e))
  3396       m = re.search(r"[^']*'([^']*)'.*", str(e))
  4004       if m.groups() and m.group(1) == 'Loader':
  3397       if m.groups() and m.group(1) == 'Loader':
  4005         print >>sys.stderr, """
  3398         print >>sys.stderr, """
  4006 The config file format has changed and you appear to be using an old-style
  3399 The config file format has changed and you appear to be using an old-style
  4056 
  3449 
  4057 def _MakeSignature(app_id=None,
  3450 def _MakeSignature(app_id=None,
  4058                    url=None,
  3451                    url=None,
  4059                    kind=None,
  3452                    kind=None,
  4060                    db_filename=None,
  3453                    db_filename=None,
       
  3454                    perform_map=None,
  4061                    download=None,
  3455                    download=None,
  4062                    has_header=None,
  3456                    has_header=None,
  4063                    result_db_filename=None):
  3457                    result_db_filename=None,
       
  3458                    dump=None,
       
  3459                    restore=None):
  4064   """Returns a string that identifies the important options for the database."""
  3460   """Returns a string that identifies the important options for the database."""
  4065   if download:
  3461   if download:
  4066     result_db_line = 'result_db: %s' % result_db_filename
  3462     result_db_line = 'result_db: %s' % result_db_filename
  4067   else:
  3463   else:
  4068     result_db_line = ''
  3464     result_db_line = ''
  4069   return u"""
  3465   return u"""
  4070   app_id: %s
  3466   app_id: %s
  4071   url: %s
  3467   url: %s
  4072   kind: %s
  3468   kind: %s
  4073   download: %s
  3469   download: %s
       
  3470   map: %s
       
  3471   dump: %s
       
  3472   restore: %s
  4074   progress_db: %s
  3473   progress_db: %s
  4075   has_header: %s
  3474   has_header: %s
  4076   %s
  3475   %s
  4077   """ % (app_id, url, kind, download, db_filename, has_header, result_db_line)
  3476   """ % (app_id, url, kind, download, perform_map, dump, restore, db_filename,
       
  3477          has_header, result_db_line)
  4078 
  3478 
  4079 
  3479 
  4080 def ProcessArguments(arg_dict,
  3480 def ProcessArguments(arg_dict,
  4081                      die_fn=lambda: sys.exit(1)):
  3481                      die_fn=lambda: sys.exit(1)):
  4082   """Processes non-command-line input arguments.
  3482   """Processes non-command-line input arguments.
  4088   Returns:
  3488   Returns:
  4089     A dictionary of bulkloader options.
  3489     A dictionary of bulkloader options.
  4090   """
  3490   """
  4091   app_id = GetArgument(arg_dict, 'app_id', die_fn)
  3491   app_id = GetArgument(arg_dict, 'app_id', die_fn)
  4092   url = GetArgument(arg_dict, 'url', die_fn)
  3492   url = GetArgument(arg_dict, 'url', die_fn)
       
  3493   dump = GetArgument(arg_dict, 'dump', die_fn)
       
  3494   restore = GetArgument(arg_dict, 'restore', die_fn)
  4093   filename = GetArgument(arg_dict, 'filename', die_fn)
  3495   filename = GetArgument(arg_dict, 'filename', die_fn)
  4094   batch_size = GetArgument(arg_dict, 'batch_size', die_fn)
  3496   batch_size = GetArgument(arg_dict, 'batch_size', die_fn)
  4095   kind = GetArgument(arg_dict, 'kind', die_fn)
  3497   kind = GetArgument(arg_dict, 'kind', die_fn)
  4096   db_filename = GetArgument(arg_dict, 'db_filename', die_fn)
  3498   db_filename = GetArgument(arg_dict, 'db_filename', die_fn)
  4097   config_file = GetArgument(arg_dict, 'config_file', die_fn)
  3499   config_file = GetArgument(arg_dict, 'config_file', die_fn)
  4098   result_db_filename = GetArgument(arg_dict, 'result_db_filename', die_fn)
  3500   result_db_filename = GetArgument(arg_dict, 'result_db_filename', die_fn)
  4099   download = GetArgument(arg_dict, 'download', die_fn)
  3501   download = GetArgument(arg_dict, 'download', die_fn)
  4100   log_file = GetArgument(arg_dict, 'log_file', die_fn)
  3502   log_file = GetArgument(arg_dict, 'log_file', die_fn)
  4101 
  3503   perform_map = GetArgument(arg_dict, 'map', die_fn)
  4102   unused_passin = GetArgument(arg_dict, 'passin', die_fn)
       
  4103   unused_email = GetArgument(arg_dict, 'email', die_fn)
       
  4104   unused_debug = GetArgument(arg_dict, 'debug', die_fn)
       
  4105   unused_num_threads = GetArgument(arg_dict, 'num_threads', die_fn)
       
  4106   unused_bandwidth_limit = GetArgument(arg_dict, 'bandwidth_limit', die_fn)
       
  4107   unused_rps_limit = GetArgument(arg_dict, 'rps_limit', die_fn)
       
  4108   unused_http_limit = GetArgument(arg_dict, 'http_limit', die_fn)
       
  4109   unused_auth_domain = GetArgument(arg_dict, 'auth_domain', die_fn)
       
  4110   unused_has_headers = GetArgument(arg_dict, 'has_header', die_fn)
       
  4111   unused_loader_opts = GetArgument(arg_dict, 'loader_opts', die_fn)
       
  4112   unused_exporter_opts = GetArgument(arg_dict, 'exporter_opts', die_fn)
       
  4113 
  3504 
  4114   errors = []
  3505   errors = []
       
  3506 
       
  3507   if batch_size is None:
       
  3508     if download or perform_map:
       
  3509       arg_dict['batch_size'] = DEFAULT_DOWNLOAD_BATCH_SIZE
       
  3510     else:
       
  3511       arg_dict['batch_size'] = DEFAULT_BATCH_SIZE
       
  3512   elif batch_size <= 0:
       
  3513     errors.append('batch_size must be at least 1')
  4115 
  3514 
  4116   if db_filename is None:
  3515   if db_filename is None:
  4117     arg_dict['db_filename'] = time.strftime(
  3516     arg_dict['db_filename'] = time.strftime(
  4118         'bulkloader-progress-%Y%m%d.%H%M%S.sql3')
  3517         'bulkloader-progress-%Y%m%d.%H%M%S.sql3')
  4119 
  3518 
  4122         'bulkloader-results-%Y%m%d.%H%M%S.sql3')
  3521         'bulkloader-results-%Y%m%d.%H%M%S.sql3')
  4123 
  3522 
  4124   if log_file is None:
  3523   if log_file is None:
  4125     arg_dict['log_file'] = time.strftime('bulkloader-log-%Y%m%d.%H%M%S')
  3524     arg_dict['log_file'] = time.strftime('bulkloader-log-%Y%m%d.%H%M%S')
  4126 
  3525 
  4127   if batch_size <= 0:
       
  4128     errors.append('batch_size must be at least 1')
       
  4129 
       
  4130   required = '%s argument required'
  3526   required = '%s argument required'
       
  3527 
       
  3528   if config_file is None and not dump and not restore:
       
  3529     errors.append('One of --config_file, --dump, or --restore is required')
  4131 
  3530 
  4132   if url is REQUIRED_OPTION:
  3531   if url is REQUIRED_OPTION:
  4133     errors.append(required % 'url')
  3532     errors.append(required % 'url')
  4134 
  3533 
  4135   if filename is REQUIRED_OPTION:
  3534   if not filename and not perform_map:
  4136     errors.append(required % 'filename')
  3535     errors.append(required % 'filename')
  4137 
  3536 
  4138   if kind is REQUIRED_OPTION:
  3537   if kind is None:
  4139     errors.append(required % 'kind')
  3538     if download or perform_map:
  4140 
  3539       errors.append('kind argument required for this operation')
  4141   if config_file is REQUIRED_OPTION:
  3540     elif not dump and not restore:
  4142     errors.append(required % 'config_file')
  3541       errors.append(
  4143 
  3542           'kind argument required unless --dump or --restore is specified')
  4144   if download:
       
  4145     if result_db_filename is REQUIRED_OPTION:
       
  4146       errors.append(required % 'result_db_filename')
       
  4147 
  3543 
  4148   if not app_id:
  3544   if not app_id:
  4149     (unused_scheme, host_port, unused_url_path,
  3545     if url and url is not REQUIRED_OPTION:
  4150      unused_query, unused_fragment) = urlparse.urlsplit(url)
  3546       (unused_scheme, host_port, unused_url_path,
  4151     suffix_idx = host_port.find('.appspot.com')
  3547        unused_query, unused_fragment) = urlparse.urlsplit(url)
  4152     if suffix_idx > -1:
  3548       suffix_idx = host_port.find('.appspot.com')
  4153       arg_dict['app_id'] = host_port[:suffix_idx]
  3549       if suffix_idx > -1:
  4154     elif host_port.split(':')[0].endswith('google.com'):
  3550         arg_dict['app_id'] = host_port[:suffix_idx]
  4155       arg_dict['app_id'] = host_port.split('.')[0]
  3551       elif host_port.split(':')[0].endswith('google.com'):
  4156     else:
  3552         arg_dict['app_id'] = host_port.split('.')[0]
  4157       errors.append('app_id argument required for non appspot.com domains')
  3553       else:
       
  3554         errors.append('app_id argument required for non appspot.com domains')
  4158 
  3555 
  4159   if errors:
  3556   if errors:
  4160     print >>sys.stderr, '\n'.join(errors)
  3557     print >>sys.stderr, '\n'.join(errors)
  4161     die_fn()
  3558     die_fn()
  4162 
  3559 
  4201   has_header = arg_dict['has_header']
  3598   has_header = arg_dict['has_header']
  4202   download = arg_dict['download']
  3599   download = arg_dict['download']
  4203   result_db_filename = arg_dict['result_db_filename']
  3600   result_db_filename = arg_dict['result_db_filename']
  4204   loader_opts = arg_dict['loader_opts']
  3601   loader_opts = arg_dict['loader_opts']
  4205   exporter_opts = arg_dict['exporter_opts']
  3602   exporter_opts = arg_dict['exporter_opts']
       
  3603   mapper_opts = arg_dict['mapper_opts']
  4206   email = arg_dict['email']
  3604   email = arg_dict['email']
  4207   passin = arg_dict['passin']
  3605   passin = arg_dict['passin']
       
  3606   perform_map = arg_dict['map']
       
  3607   dump = arg_dict['dump']
       
  3608   restore = arg_dict['restore']
  4208 
  3609 
  4209   os.environ['AUTH_DOMAIN'] = auth_domain
  3610   os.environ['AUTH_DOMAIN'] = auth_domain
  4210 
  3611 
  4211   kind = ParseKind(kind)
  3612   kind = ParseKind(kind)
  4212 
  3613 
  4213   check_file(config_file)
  3614   if not dump and not restore:
  4214   if not download:
  3615     check_file(config_file)
       
  3616 
       
  3617   if download and perform_map:
       
  3618     logger.error('--download and --map are mutually exclusive.')
       
  3619 
       
  3620   if download or dump:
       
  3621     check_output_file(filename)
       
  3622   elif not perform_map:
  4215     check_file(filename)
  3623     check_file(filename)
       
  3624 
       
  3625   if dump:
       
  3626     Exporter.RegisterExporter(DumpExporter(kind, result_db_filename))
       
  3627   elif restore:
       
  3628     Loader.RegisterLoader(RestoreLoader(kind))
  4216   else:
  3629   else:
  4217     check_output_file(filename)
  3630     LoadConfig(config_file)
  4218 
       
  4219   LoadConfig(config_file)
       
  4220 
  3631 
  4221   os.environ['APPLICATION_ID'] = app_id
  3632   os.environ['APPLICATION_ID'] = app_id
  4222 
  3633 
  4223   throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
  3634   throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
  4224 
  3635   logger.info('Throttling transfers:')
  4225   throttle = Throttle(layout=throttle_layout)
  3636   logger.info('Bandwidth: %s bytes/second', bandwidth_limit)
       
  3637   logger.info('HTTP connections: %s/second', http_limit)
       
  3638   logger.info('Entities inserted/fetched/modified: %s/second', rps_limit)
       
  3639 
       
  3640   throttle = remote_api_throttle.Throttle(layout=throttle_layout)
  4226   signature = _MakeSignature(app_id=app_id,
  3641   signature = _MakeSignature(app_id=app_id,
  4227                              url=url,
  3642                              url=url,
  4228                              kind=kind,
  3643                              kind=kind,
  4229                              db_filename=db_filename,
  3644                              db_filename=db_filename,
  4230                              download=download,
  3645                              download=download,
       
  3646                              perform_map=perform_map,
  4231                              has_header=has_header,
  3647                              has_header=has_header,
  4232                              result_db_filename=result_db_filename)
  3648                              result_db_filename=result_db_filename,
       
  3649                              dump=dump,
       
  3650                              restore=restore)
  4233 
  3651 
  4234 
  3652 
  4235   max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5)
  3653   max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5)
  4236 
  3654 
  4237   if db_filename == 'skip':
  3655   if db_filename == 'skip':
  4238     progress_db = StubProgressDatabase()
  3656     progress_db = StubProgressDatabase()
  4239   elif not download:
  3657   elif not download and not perform_map and not dump:
  4240     progress_db = ProgressDatabase(db_filename, signature)
  3658     progress_db = ProgressDatabase(db_filename, signature)
  4241   else:
  3659   else:
  4242     progress_db = ExportProgressDatabase(db_filename, signature)
  3660     progress_db = ExportProgressDatabase(db_filename, signature)
  4243 
  3661 
  4244   if download:
       
  4245     result_db = ResultDatabase(result_db_filename, signature)
       
  4246 
       
  4247   return_code = 1
  3662   return_code = 1
  4248 
  3663 
  4249   if not download:
  3664   if not download and not perform_map and not dump:
  4250     loader = Loader.RegisteredLoader(kind)
  3665     loader = Loader.RegisteredLoader(kind)
  4251     try:
  3666     try:
  4252       loader.initialize(filename, loader_opts)
  3667       loader.initialize(filename, loader_opts)
  4253       workitem_generator_factory = GetCSVGeneratorFactory(
  3668       workitem_generator_factory = GetCSVGeneratorFactory(
  4254           kind, filename, batch_size, has_header)
  3669           kind, filename, batch_size, has_header)
  4255 
  3670 
  4256       app = BulkUploaderApp(arg_dict,
  3671       app = BulkUploaderApp(arg_dict,
  4257                             workitem_generator_factory,
  3672                             workitem_generator_factory,
  4258                             throttle,
  3673                             throttle,
  4259                             progress_db,
  3674                             progress_db,
  4260                             BulkLoaderThread,
       
  4261                             ProgressTrackerThread,
  3675                             ProgressTrackerThread,
  4262                             max_queue_size,
  3676                             max_queue_size,
  4263                             RequestManager,
  3677                             RequestManager,
  4264                             DataSourceThread,
  3678                             DataSourceThread,
  4265                             ReQueue,
       
  4266                             Queue.Queue)
  3679                             Queue.Queue)
  4267       try:
  3680       try:
  4268         return_code = app.Run()
  3681         return_code = app.Run()
  4269       except AuthenticationError:
  3682       except AuthenticationError:
  4270         logger.info('Authentication Failed')
  3683         logger.info('Authentication Failed')
  4271     finally:
  3684     finally:
  4272       loader.finalize()
  3685       loader.finalize()
  4273   else:
  3686   elif not perform_map:
       
  3687     result_db = ResultDatabase(result_db_filename, signature)
  4274     exporter = Exporter.RegisteredExporter(kind)
  3688     exporter = Exporter.RegisteredExporter(kind)
  4275     try:
  3689     try:
  4276       exporter.initialize(filename, exporter_opts)
  3690       exporter.initialize(filename, exporter_opts)
  4277 
  3691 
  4278       def KeyRangeGeneratorFactory(progress_queue, progress_gen):
  3692       def KeyRangeGeneratorFactory(request_manager, progress_queue,
  4279         return KeyRangeGenerator(kind, progress_queue, progress_gen)
  3693                                    progress_gen):
       
  3694         return KeyRangeItemGenerator(request_manager, kind, progress_queue,
       
  3695                                      progress_gen, DownloadItem)
  4280 
  3696 
  4281       def ExportProgressThreadFactory(progress_queue, progress_db):
  3697       def ExportProgressThreadFactory(progress_queue, progress_db):
  4282         return ExportProgressThread(kind,
  3698         return ExportProgressThread(kind,
  4283                                     progress_queue,
  3699                                     progress_queue,
  4284                                     progress_db,
  3700                                     progress_db,
  4285                                     result_db)
  3701                                     result_db)
       
  3702 
  4286       app = BulkDownloaderApp(arg_dict,
  3703       app = BulkDownloaderApp(arg_dict,
  4287                               KeyRangeGeneratorFactory,
  3704                               KeyRangeGeneratorFactory,
  4288                               throttle,
  3705                               throttle,
  4289                               progress_db,
  3706                               progress_db,
  4290                               BulkExporterThread,
       
  4291                               ExportProgressThreadFactory,
  3707                               ExportProgressThreadFactory,
  4292                               0,
  3708                               0,
  4293                               RequestManager,
  3709                               RequestManager,
  4294                               DataSourceThread,
  3710                               DataSourceThread,
  4295                               ReQueue,
       
  4296                               Queue.Queue)
  3711                               Queue.Queue)
  4297       try:
  3712       try:
  4298         return_code = app.Run()
  3713         return_code = app.Run()
  4299       except AuthenticationError:
  3714       except AuthenticationError:
  4300         logger.info('Authentication Failed')
  3715         logger.info('Authentication Failed')
  4301     finally:
  3716     finally:
  4302       exporter.finalize()
  3717       exporter.finalize()
       
  3718   elif not download:
       
  3719     mapper = Mapper.RegisteredMapper(kind)
       
  3720     try:
       
  3721       mapper.initialize(mapper_opts)
       
  3722       def KeyRangeGeneratorFactory(request_manager, progress_queue,
       
  3723                                    progress_gen):
       
  3724         return KeyRangeItemGenerator(request_manager, kind, progress_queue,
       
  3725                                      progress_gen, MapperItem)
       
  3726 
       
  3727       def MapperProgressThreadFactory(progress_queue, progress_db):
       
  3728         return MapperProgressThread(kind,
       
  3729                                     progress_queue,
       
  3730                                     progress_db)
       
  3731 
       
  3732       app = BulkMapperApp(arg_dict,
       
  3733                           KeyRangeGeneratorFactory,
       
  3734                           throttle,
       
  3735                           progress_db,
       
  3736                           MapperProgressThreadFactory,
       
  3737                           0,
       
  3738                           RequestManager,
       
  3739                           DataSourceThread,
       
  3740                           Queue.Queue)
       
  3741       try:
       
  3742         return_code = app.Run()
       
  3743       except AuthenticationError:
       
  3744         logger.info('Authentication Failed')
       
  3745     finally:
       
  3746       mapper.finalize()
  4303   return return_code
  3747   return return_code
  4304 
  3748 
  4305 
  3749 
  4306 def SetupLogging(arg_dict):
  3750 def SetupLogging(arg_dict):
  4307   """Sets up logging for the bulkloader.
  3751   """Sets up logging for the bulkloader.
  4333   console.setFormatter(formatter)
  3777   console.setFormatter(formatter)
  4334   logger.addHandler(console)
  3778   logger.addHandler(console)
  4335 
  3779 
  4336   logger.info('Logging to %s', log_file)
  3780   logger.info('Logging to %s', log_file)
  4337 
  3781 
       
  3782   remote_api_throttle.logger.setLevel(level)
       
  3783   remote_api_throttle.logger.addHandler(file_handler)
       
  3784   remote_api_throttle.logger.addHandler(console)
       
  3785 
  4338   appengine_rpc.logger.setLevel(logging.WARN)
  3786   appengine_rpc.logger.setLevel(logging.WARN)
       
  3787 
       
  3788   adaptive_thread_pool.logger.setLevel(logging.DEBUG)
       
  3789   adaptive_thread_pool.logger.addHandler(console)
       
  3790   adaptive_thread_pool.logger.addHandler(file_handler)
       
  3791   adaptive_thread_pool.logger.propagate = False
  4339 
  3792 
  4340 
  3793 
  4341 def Run(arg_dict):
  3794 def Run(arg_dict):
  4342   """Sets up and runs the bulkloader, given the options as keyword arguments.
  3795   """Sets up and runs the bulkloader, given the options as keyword arguments.
  4343 
  3796