thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 2273 e4cb9c53db3e
parent 1278 a7766286a7be
child 2309 be1b94099f2d
--- a/thirdparty/google_appengine/google/appengine/tools/bulkloader.py	Tue Apr 21 16:28:13 2009 +0000
+++ b/thirdparty/google_appengine/google/appengine/tools/bulkloader.py	Fri Apr 24 14:16:00 2009 +0000
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-"""Imports CSV data over HTTP.
+"""Imports data over HTTP.
 
 Usage:
   %(arg0)s [flags]
@@ -27,6 +27,8 @@
                             UserProperties. (Default: gmail.com)
     --bandwidth_limit=<int> The maximum number of bytes per second for the
                             aggregate transfer of data to the server. Bursts
+                            may exceed this, but overall transfer rate is
+                            restricted to this rate. (Default 250000)
     --batch_size=<int>      Number of Entity objects to include in each post to
                             the URL endpoint. The more data per row/Entity, the
                             smaller the batch size should be. (Default 10)
@@ -38,15 +40,25 @@
                             bulkloader-progress-TIMESTAMP.
                             The special filename "skip" may be used to simply
                             skip reading/writing any progress information.
-    --filename=<path>       Path to the CSV file to import. (Required)
+    --download              Export entities to a file.
+    --email=<string>        The username to use. Will prompt if omitted.
+    --exporter_opts=<string>
+                            A string to pass to the Exporter.initialize method.
+    --filename=<path>       Path to the file to import. (Required)
+    --has_header            Skip the first row of the input.
     --http_limit=<int>      The maximum numer of HTTP requests per second to
                             send to the server. (Default: 8)
     --kind=<string>         Name of the Entity object kind to put in the
                             datastore. (Required)
+    --loader_opts=<string>  A string to pass to the Loader.initialize method.
+    --log_file=<path>       File to write bulkloader logs.  If not supplied
+                            then a new log file will be created, named:
+                            bulkloader-log-TIMESTAMP.
     --num_threads=<int>     Number of threads to use for uploading entities
                             (Default 10)
-                            may exceed this, but overall transfer rate is
-                            restricted to this rate. (Default 250000)
+    --passin                Read the login password from stdin.
+    --result_db_filename=<path>
+                            Result database to write to for downloads.
     --rps_limit=<int>       The maximum number of records per second to
                             transfer to the server. (Default: 20)
     --url=<string>          URL endpoint to post to for importing data.
@@ -66,23 +78,29 @@
 
 
 
+import cPickle
 import csv
+import errno
 import getopt
 import getpass
+import imp
 import logging
-import new
 import os
 import Queue
+import re
 import signal
+import StringIO
 import sys
 import threading
 import time
-import traceback
 import urllib2
 import urlparse
 
+from google.appengine.api import datastore_errors
 from google.appengine.ext import db
+from google.appengine.ext.db import polymodel
 from google.appengine.ext.remote_api import remote_api_stub
+from google.appengine.runtime import apiproxy_errors
 from google.appengine.tools import appengine_rpc
 
 try:
@@ -90,7 +108,7 @@
 except ImportError:
   pass
 
-UPLOADER_VERSION = '1'
+logger = logging.getLogger('google.appengine.tools.bulkloader')
 
 DEFAULT_THREAD_COUNT = 10
 
@@ -105,6 +123,10 @@
 STATE_SENT = 2
 STATE_NOT_SENT = 3
 
+STATE_GETTING = 1
+STATE_GOT = 2
+STATE_NOT_GOT = 3
+
 MINIMUM_THROTTLE_SLEEP_DURATION = 0.001
 
 DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'
@@ -128,8 +150,11 @@
 HTTPS_REQUESTS = 'https-requests'
 RECORDS = 'records'
 
-
-def StateMessage(state):
+MAXIMUM_INCREASE_DURATION = 8.0
+MAXIMUM_HOLD_DURATION = 10.0
+
+
+def ImportStateMessage(state):
   """Converts a numeric state identifier to a status message."""
   return ({
       STATE_READ: 'Batch read from file.',
@@ -139,12 +164,46 @@
   }[state])
 
 
+def ExportStateMessage(state):
+  """Converts a numeric state identifier to a status message."""
+  return ({
+      STATE_READ: 'Batch read from file.',
+      STATE_GETTING: 'Fetching batch from server',
+      STATE_GOT: 'Batch successfully fetched.',
+      STATE_NOT_GOT: 'Error while fetching batch'
+  }[state])
+
+
+def ExportStateName(state):
+  """Converts a numeric state identifier to a string."""
+  return ({
+      STATE_READ: 'READ',
+      STATE_GETTING: 'GETTING',
+      STATE_GOT: 'GOT',
+      STATE_NOT_GOT: 'NOT_GOT'
+  }[state])
+
+
+def ImportStateName(state):
+  """Converts a numeric state identifier to a string."""
+  return ({
+      STATE_READ: 'READ',
+      STATE_GETTING: 'SENDING',
+      STATE_GOT: 'SENT',
+      STATE_NOT_GOT: 'NOT_SENT'
+  }[state])
+
+
 class Error(Exception):
   """Base-class for exceptions in this module."""
 
 
+class MissingPropertyError(Error):
+  """An expected field is missing from an entity, and no default was given."""
+
+
 class FatalServerError(Error):
-  """An unrecoverable error occurred while trying to post data to the server."""
+  """An unrecoverable error occurred while posting data to the server."""
 
 
 class ResumeError(Error):
@@ -159,72 +218,107 @@
   """Error while trying to authenticate with the server."""
 
 
-def GetCSVGeneratorFactory(csv_filename, batch_size,
+class FileNotFoundError(Error):
+  """A filename passed in by the user refers to a non-existent input file."""
+
+
+class FileNotReadableError(Error):
+  """A filename passed in by the user refers to a non-readable input file."""
+
+
+class FileExistsError(Error):
+  """A filename passed in by the user refers to an existing output file."""
+
+
+class FileNotWritableError(Error):
+  """A filename passed in by the user refers to a non-writable output file."""
+
+
+class KeyRangeError(Error):
+  """Error while trying to generate a KeyRange."""
+
+
+class BadStateError(Error):
+  """A work item in an unexpected state was encountered."""
+
+
+class NameClashError(Error):
+  """A name clash occurred while trying to alias old method names."""
+  def __init__(self, old_name, new_name, klass):
+    Error.__init__(self, old_name, new_name, klass)
+    self.old_name = old_name
+    self.new_name = new_name
+    self.klass = klass
+
+
+def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
                            openfile=open, create_csv_reader=csv.reader):
   """Return a factory that creates a CSV-based WorkItem generator.
 
   Args:
+    kind: The kind of the entities being uploaded.
     csv_filename: File on disk containing CSV data.
     batch_size: Maximum number of CSV rows to stash into a WorkItem.
+    csv_has_header: Whether to skip the first row of the CSV.
     openfile: Used for dependency injection.
     create_csv_reader: Used for dependency injection.
 
-  Returns: A callable (accepting the Progress Queue and Progress
-    Generators as input) which creates the WorkItem generator.
+  Returns:
+    A callable (accepting the Progress Queue and Progress Generators
+    as input) which creates the WorkItem generator.
   """
+  loader = Loader.RegisteredLoader(kind)
+  loader._Loader__openfile = openfile
+  loader._Loader__create_csv_reader = create_csv_reader
+  record_generator = loader.generate_records(csv_filename)
 
   def CreateGenerator(progress_queue, progress_generator):
-    """Initialize a CSV generator linked to a progress generator and queue.
+    """Initialize a WorkItem generator linked to a progress generator and queue.
 
     Args:
       progress_queue: A ProgressQueue instance to send progress information.
       progress_generator: A generator of progress information or None.
 
     Returns:
-      A CSVGenerator instance.
+      A WorkItemGenerator instance.
     """
-    return CSVGenerator(progress_queue,
-                        progress_generator,
-                        csv_filename,
-                        batch_size,
-                        openfile,
-                        create_csv_reader)
+    return WorkItemGenerator(progress_queue,
+                             progress_generator,
+                             record_generator,
+                             csv_has_header,
+                             batch_size)
+
   return CreateGenerator
 
 
-class CSVGenerator(object):
-  """Reads a CSV file and generates WorkItems containing batches of records."""
+class WorkItemGenerator(object):
+  """Reads rows from a row generator and generates WorkItems of batches."""
 
   def __init__(self,
                progress_queue,
                progress_generator,
-               csv_filename,
-               batch_size,
-               openfile,
-               create_csv_reader):
-    """Initializes a CSV generator.
+               record_generator,
+               skip_first,
+               batch_size):
+    """Initialize a WorkItemGenerator.
 
     Args:
-      progress_queue: A queue used for tracking progress information.
-      progress_generator: A generator of prior progress information, or None
-        if there is no prior status.
-      csv_filename: File on disk containing CSV data.
-      batch_size: Maximum number of CSV rows to stash into a WorkItem.
-      openfile: Used for dependency injection of 'open'.
-      create_csv_reader: Used for dependency injection of 'csv.reader'.
+      progress_queue: A progress queue with which to associate WorkItems.
+      progress_generator: A generator of progress information.
+      record_generator: A generator of data records.
+      skip_first: Whether to skip the first data record.
+      batch_size: The number of data records per WorkItem.
     """
     self.progress_queue = progress_queue
     self.progress_generator = progress_generator
-    self.csv_filename = csv_filename
+    self.reader = record_generator
+    self.skip_first = skip_first
     self.batch_size = batch_size
-    self.openfile = openfile
-    self.create_csv_reader = create_csv_reader
     self.line_number = 1
     self.column_count = None
     self.read_rows = []
-    self.reader = None
     self.row_count = 0
-    self.sent_count = 0
+    self.xfer_count = 0
 
   def _AdvanceTo(self, line):
     """Advance the reader to the given line.
@@ -236,7 +330,7 @@
       self.reader.next()
       self.line_number += 1
       self.row_count += 1
-      self.sent_count += 1
+      self.xfer_count += 1
 
   def _ReadRows(self, key_start, key_end):
     """Attempts to read and encode rows [key_start, key_end].
@@ -286,7 +380,7 @@
     return item
 
   def Batches(self):
-    """Reads the CSV data file and generates WorkItems.
+    """Reads from the record_generator and generates WorkItems.
 
     Yields:
       Instances of class WorkItem
@@ -295,28 +389,23 @@
       ResumeError: If the progress database and data file indicate a different
         number of rows.
     """
-    csv_file = self.openfile(self.csv_filename, 'r')
-    csv_content = csv_file.read()
-    if csv_content:
-      has_headers = csv.Sniffer().has_header(csv_content)
-    else:
-      has_headers = False
-    csv_file.seek(0)
-    self.reader = self.create_csv_reader(csv_file, skipinitialspace=True)
-    if has_headers:
-      logging.info('The CSV file appears to have a header line, skipping.')
-      self.reader.next()
+    if self.skip_first:
+      logger.info('Skipping header line.')
+      try:
+        self.reader.next()
+      except StopIteration:
+        return
 
     exhausted = False
 
     self.line_number = 1
     self.column_count = None
 
-    logging.info('Starting import; maximum %d entities per post',
-                 self.batch_size)
+    logger.info('Starting import; maximum %d entities per post',
+                self.batch_size)
 
     state = None
-    if self.progress_generator is not None:
+    if self.progress_generator:
       for progress_key, state, key_start, key_end in self.progress_generator:
         if key_start:
           try:
@@ -327,7 +416,7 @@
                                  self.read_rows,
                                  progress_key=progress_key)
           except StopIteration:
-            logging.error('Mismatch between data file and progress database')
+            logger.error('Mismatch between data file and progress database')
             raise ResumeError(
                 'Mismatch between data file and progress database')
         elif state == DATA_CONSUMED_TO_HERE:
@@ -349,6 +438,91 @@
           yield self._MakeItem(key_start, key_end, self.read_rows)
 
 
+class CSVGenerator(object):
+  """Reads a CSV file and generates data records."""
+
+  def __init__(self,
+               csv_filename,
+               openfile=open,
+               create_csv_reader=csv.reader):
+    """Initializes a CSV generator.
+
+    Args:
+      csv_filename: File on disk containing CSV data.
+      openfile: Used for dependency injection of 'open'.
+      create_csv_reader: Used for dependency injection of 'csv.reader'.
+    """
+    self.csv_filename = csv_filename
+    self.openfile = openfile
+    self.create_csv_reader = create_csv_reader
+
+  def Records(self):
+    """Reads the CSV data file and generates row records.
+
+    Yields:
+      Lists of strings
+
+    Raises:
+      ResumeError: If the progress database and data file indicate a different
+        number of rows.
+    """
+    csv_file = self.openfile(self.csv_filename, 'rb')
+    reader = self.create_csv_reader(csv_file, skipinitialspace=True)
+    return reader
+
+
+class KeyRangeGenerator(object):
+  """Generates ranges of keys to download.
+
+  Reads progress information from the progress database and creates
+  KeyRange objects corresponding to incompletely downloaded parts of an
+  export.
+  """
+
+  def __init__(self, kind, progress_queue, progress_generator):
+    """Initialize the KeyRangeGenerator.
+
+    Args:
+      kind: The kind of entities being transferred.
+      progress_queue: A queue used for tracking progress information.
+      progress_generator: A generator of prior progress information, or None
+        if there is no prior status.
+    """
+    self.kind = kind
+    self.row_count = 0
+    self.xfer_count = 0
+    self.progress_queue = progress_queue
+    self.progress_generator = progress_generator
+
+  def Batches(self):
+    """Iterate through saved progress information.
+
+    Yields:
+      KeyRange instances corresponding to undownloaded key ranges.
+    """
+    if self.progress_generator is not None:
+      for progress_key, state, key_start, key_end in self.progress_generator:
+        if state is not None and state != STATE_GOT and key_start is not None:
+          key_start = ParseKey(key_start)
+          key_end = ParseKey(key_end)
+
+          result = KeyRange(self.progress_queue,
+                            self.kind,
+                            key_start=key_start,
+                            key_end=key_end,
+                            progress_key=progress_key,
+                            direction=KeyRange.ASC,
+                            state=STATE_READ)
+          yield result
+    else:
+
+      yield KeyRange(
+          self.progress_queue, self.kind,
+          key_start=None,
+          key_end=None,
+          direction=KeyRange.DESC)
+
+
 class ReQueue(object):
   """A special thread-safe queue.
 
@@ -358,7 +532,7 @@
   the number of outstanding tasks.
 
   This class shares an interface with Queue.Queue and provides the
-  additional Reput method.
+  additional reput method.
   """
 
   def __init__(self,
@@ -474,7 +648,7 @@
     Re-putting an item does not increase the number of outstanding
     tasks, so the reput item should be uniquely associated with an
     item that was previously removed from the requeue and for which
-    task_done has not been called.
+    TaskDone has not been called.
 
     Args:
       item: An item to add to the requeue.
@@ -538,6 +712,9 @@
     """Try to get an item from the queue without blocking."""
     return self.get(block=False)
 
+  def qsize(self):
+    return self.queue.qsize() + self.requeue.qsize()
+
 
 class ThrottleHandler(urllib2.BaseHandler):
   """A urllib2 handler for http and https requests that adds to a throttle."""
@@ -701,133 +878,127 @@
   Returns:
     A factory to produce a ThrottledHttpRpcServer.
   """
+
   def MakeRpcServer(*args, **kwargs):
+    """Factory to produce a ThrottledHttpRpcServer.
+
+    Args:
+      args: Positional args to pass to ThrottledHttpRpcServer.
+      kwargs: Keyword args to pass to ThrottledHttpRpcServer.
+
+    Returns:
+      A ThrottledHttpRpcServer instance.
+    """
     kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
     kwargs['save_cookies'] = True
     return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)
   return MakeRpcServer
 
 
-class RequestManager(object):
-  """A class which wraps a connection to the server."""
-
-  source = 'google-bulkloader-%s' % UPLOADER_VERSION
-  user_agent = source
-
-  def __init__(self,
-               app_id,
-               host_port,
-               url_path,
-               kind,
-               throttle):
-    """Initialize a RequestManager object.
-
-    Args:
-      app_id: String containing the application id for requests.
-      host_port: String containing the "host:port" pair; the port is optional.
-      url_path: partial URL (path) to post entity data to.
-      kind: Kind of the Entity records being posted.
-      throttle: A Throttle instance.
-    """
-    self.app_id = app_id
-    self.host_port = host_port
-    self.host = host_port.split(':')[0]
-    if url_path and url_path[0] != '/':
-      url_path = '/' + url_path
-    self.url_path = url_path
-    self.kind = kind
-    self.throttle = throttle
-    self.credentials = None
-    throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
-        self.throttle, self)
-    logging.debug('Configuring remote_api. app_id = %s, url_path = %s, '
-                  'servername = %s' % (app_id, url_path, host_port))
-    remote_api_stub.ConfigureRemoteDatastore(
-        app_id,
-        url_path,
-        self.AuthFunction,
-        servername=host_port,
-        rpc_server_factory=throttled_rpc_server_factory)
-    self.authenticated = False
-
-  def Authenticate(self):
-    """Invoke authentication if necessary."""
-    self.rpc_server.Send(self.url_path, payload=None)
-    self.authenticated = True
-
-  def AuthFunction(self,
-                   raw_input_fn=raw_input,
-                   password_input_fn=getpass.getpass):
-    """Prompts the user for a username and password.
-
-    Caches the results the first time it is called and returns the
-    same result every subsequent time.
+class ExportResult(object):
+  """Holds the decoded content for the result of an export requests."""
+
+  def __init__(self, continued, direction, keys, entities):
+    self.continued = continued
+    self.direction = direction
+    self.keys = keys
+    self.entities = entities
+    self.count = len(keys)
+    assert self.count == len(entities)
+    assert direction in (KeyRange.ASC, KeyRange.DESC)
+    if self.count > 0:
+      if direction == KeyRange.ASC:
+        self.key_start = keys[0]
+        self.key_end = keys[-1]
+      else:
+        self.key_start = keys[-1]
+        self.key_end = keys[0]
+
+  def __str__(self):
+    return 'continued = %s\n%s' % (
+        str(self.continued), '\n'.join(self.entities))
+
+
+class _WorkItem(object):
+  """Holds a description of a unit of upload or download work."""
+
+  def __init__(self, progress_queue, key_start, key_end, state_namer,
+               state=STATE_READ, progress_key=None):
+    """Initialize the _WorkItem instance.
 
     Args:
-      raw_input_fn: Used for dependency injection.
-      password_input_fn: Used for dependency injection.
-
-    Returns:
-      A pair of the username and password.
+      progress_queue: A queue used for tracking progress information.
+      key_start: The starting key, inclusive.
+      key_end: The ending key, inclusive.
+      state_namer: Function to describe work item states.
+      state: The initial state of the work item.
+      progress_key: If this WorkItem represents state from a prior run,
+        then this will be the key within the progress database.
     """
-    if self.credentials is not None:
-      return self.credentials
-    print 'Please enter login credentials for %s (%s)' % (
-        self.host, self.app_id)
-    email = raw_input_fn('Email: ')
-    if email:
-      password_prompt = 'Password for %s: ' % email
-      password = password_input_fn(password_prompt)
-    else:
-      password = None
-    self.credentials = (email, password)
-    return self.credentials
-
-  def _GetHeaders(self):
-    """Constructs a dictionary of extra headers to send with a request."""
-    headers = {
-        'GAE-Uploader-Version': UPLOADER_VERSION,
-        'GAE-Uploader-Kind': self.kind
-        }
-    return headers
-
-  def EncodeContent(self, rows):
-    """Encodes row data to the wire format.
+    self.progress_queue = progress_queue
+    self.key_start = key_start
+    self.key_end = key_end
+    self.state_namer = state_namer
+    self.state = state
+    self.progress_key = progress_key
+    self.progress_event = threading.Event()
+
+  def _AssertInState(self, *states):
+    """Raises an Error if the state of this range is not in states."""
+    if not self.state in states:
+      raise BadStateError('%s:%s not in %s' %
+                          (str(self),
+                           self.state_namer(self.state),
+                           map(self.state_namer, states)))
+
+  def _AssertProgressKey(self):
+    """Raises an Error if the progress key is None."""
+    if self.progress_key is None:
+      raise BadStateError('%s: Progress key is missing' % str(self))
+
+  def MarkAsRead(self):
+    """Mark this _WorkItem as read, updating the progress database."""
+    self._AssertInState(STATE_READ)
+    self._StateTransition(STATE_READ, blocking=True)
+
+  def MarkAsTransferring(self):
+    """Mark this _WorkItem as transferring, updating the progress database."""
+    self._AssertInState(STATE_READ, STATE_NOT_GOT)
+    self._AssertProgressKey()
+    self._StateTransition(STATE_GETTING, blocking=True)
+
+  def MarkAsTransferred(self):
+    """Mark this _WorkItem as transferred, updating the progress database."""
+    raise NotImplementedError()
+
+  def MarkAsError(self):
+    """Mark this _WorkItem as failed, updating the progress database."""
+    self._AssertInState(STATE_GETTING)
+    self._AssertProgressKey()
+    self._StateTransition(STATE_NOT_GOT, blocking=True)
+
+  def _StateTransition(self, new_state, blocking=False):
+    """Transition the work item to a new state, storing progress information.
 
     Args:
-      rows: A list of pairs of a line number and a list of column values.
-
-    Returns:
-      A list of db.Model instances.
+      new_state: The state to transition to.
+      blocking: Whether to block for the progress thread to acknowledge the
+        transition.
     """
-    try:
-      loader = Loader.RegisteredLoaders()[self.kind]
-    except KeyError:
-      logging.error('No Loader defined for kind %s.' % self.kind)
-      raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
-    entities = []
-    for line_number, values in rows:
-      key = loader.GenerateKey(line_number, values)
-      entity = loader.CreateEntity(values, key_name=key)
-      entities.extend(entity)
-
-    return entities
-
-  def PostEntities(self, item):
-    """Posts Entity records to a remote endpoint over HTTP.
-
-    Args:
-      item: A workitem containing the entities to post.
-
-    Returns:
-      A pair of the estimated size of the request in bytes and the response
-        from the server as a str.
-    """
-    entities = item.content
-    db.put(entities)
-
-
-class WorkItem(object):
+    assert not self.progress_event.isSet()
+
+    self.state = new_state
+
+    self.progress_queue.put(self)
+
+    if blocking:
+      self.progress_event.wait()
+
+      self.progress_event.clear()
+
+
+
+class WorkItem(_WorkItem):
   """Holds a unit of uploading work.
 
   A WorkItem represents a number of entities that need to be uploaded to
@@ -854,78 +1025,572 @@
       progress_key: If this WorkItem represents state from a prior run,
         then this will be the key within the progress database.
     """
-    self.state = STATE_READ
-
-    self.progress_queue = progress_queue
+    _WorkItem.__init__(self, progress_queue, key_start, key_end,
+                       ImportStateName, state=STATE_READ,
+                       progress_key=progress_key)
 
     assert isinstance(key_start, (int, long))
     assert isinstance(key_end, (int, long))
     assert key_start <= key_end
 
-    self.key_start = key_start
-    self.key_end = key_end
-    self.progress_key = progress_key
-
-    self.progress_event = threading.Event()
-
     self.rows = rows
     self.content = None
     self.count = len(rows)
 
-  def MarkAsRead(self):
-    """Mark this WorkItem as read/consumed from the data source."""
-
-    assert self.state == STATE_READ
-
-    self._StateTransition(STATE_READ, blocking=True)
-
-    assert self.progress_key is not None
-
-  def MarkAsSending(self):
-    """Mark this WorkItem as in-process on being uploaded to the server."""
-
-    assert self.state == STATE_READ or self.state == STATE_NOT_SENT
-    assert self.progress_key is not None
-
-    self._StateTransition(STATE_SENDING, blocking=True)
-
-  def MarkAsSent(self):
+  def __str__(self):
+    return '[%s-%s]' % (self.key_start, self.key_end)
+
+  def MarkAsTransferred(self):
     """Mark this WorkItem as sucessfully-sent to the server."""
 
-    assert self.state == STATE_SENDING
-    assert self.progress_key is not None
+    self._AssertInState(STATE_SENDING)
+    self._AssertProgressKey()
 
     self._StateTransition(STATE_SENT, blocking=False)
 
-  def MarkAsError(self):
-    """Mark this WorkItem as required manual error recovery."""
-
-    assert self.state == STATE_SENDING
-    assert self.progress_key is not None
-
-    self._StateTransition(STATE_NOT_SENT, blocking=True)
-
-  def _StateTransition(self, new_state, blocking=False):
-    """Transition the work item to a new state, storing progress information.
+
+def GetImplementationClass(kind_or_class_key):
+  """Returns the implementation class for a given kind or class key.
+
+  Args:
+    kind_or_class_key: A kind string or a tuple of kind strings.
+
+  Return:
+    A db.Model subclass for the given kind or class key.
+  """
+  if isinstance(kind_or_class_key, tuple):
+    try:
+      implementation_class = polymodel._class_map[kind_or_class_key]
+    except KeyError:
+      raise db.KindError('No implementation for class \'%s\'' %
+                         kind_or_class_key)
+  else:
+    implementation_class = db.class_for_kind(kind_or_class_key)
+  return implementation_class
+
+class EmptyQuery(db.Query):
+  def get(self):
+    return None
+
+  def fetch(self, limit=1000, offset=0):
+    return []
+
+  def count(self, limit=1000):
+    return 0
+
+
+def KeyLEQ(key1, key2):
+  """Compare two keys for less-than-or-equal-to.
+
+  All keys with numeric ids come before all keys with names.
+
+  Args:
+    key1: An int or db.Key instance.
+    key2: An int or db.Key instance.
+
+  Returns:
+    True if key1 <= key2
+  """
+  if isinstance(key1, int) and isinstance(key2, int):
+    return key1 <= key2
+  if key1 is None or key2 is None:
+    return True
+  if key1.id() and not key2.id():
+    return True
+  return key1.id_or_name() <= key2.id_or_name()
+
+
+class KeyRange(_WorkItem):
+  """Represents an item of download work.
+
+  A KeyRange object represents a key range (key_start, key_end) and a
+  scan direction (KeyRange.DESC or KeyRange.ASC).  The KeyRange object
+  has an associated state: STATE_READ, STATE_GETTING, STATE_GOT, and
+  STATE_ERROR.
+
+  - STATE_READ indicates the range ready to be downloaded by a worker thread.
+  - STATE_GETTING indicates the range is currently being downloaded.
+  - STATE_GOT indicates that the range was successfully downloaded
+  - STATE_ERROR indicates that an error occurred during the last download
+    attempt
+
+  KeyRanges not in the STATE_GOT state are stored in the progress database.
+  When a piece of KeyRange work is downloaded, the download may cover only
+  a portion of the range.  In this case, the old KeyRange is removed from
+  the progress database and ranges covering the undownloaded range are
+  generated and stored as STATE_READ in the export progress database.
+  """
+
+  DESC = 0
+  ASC = 1
+
+  MAX_KEY_LEN = 500
+
+  def __init__(self,
+               progress_queue,
+               kind,
+               direction,
+               key_start=None,
+               key_end=None,
+               include_start=True,
+               include_end=True,
+               progress_key=None,
+               state=STATE_READ):
+    """Initialize a KeyRange object.
+
+    Args:
+      progress_queue: A queue used for tracking progress information.
+      kind: The kind of entities for this range.
+      direction: The direction of the query for this range.
+      key_start: The starting key for this range.
+      key_end: The ending key for this range.
+      include_start: Whether the start key should be included in the range.
+      include_end: Whether the end key should be included in the range.
+      progress_key: The key for this range within the progress database.
+      state: The initial state of this range.
+
+    Raises:
+      KeyRangeError: if key_start is None.
+    """
+    assert direction in (KeyRange.ASC, KeyRange.DESC)
+    _WorkItem.__init__(self, progress_queue, key_start, key_end,
+                       ExportStateName, state=state, progress_key=progress_key)
+    self.kind = kind
+    self.direction = direction
+    self.export_result = None
+    self.count = 0
+    self.include_start = include_start
+    self.include_end = include_end
+    self.SPLIT_KEY = db.Key.from_path(self.kind, unichr(0))
+
+  def __str__(self):
+    return '[%s-%s]' % (PrettyKey(self.key_start), PrettyKey(self.key_end))
+
+  def __repr__(self):
+    return self.__str__()
+
+  def MarkAsTransferred(self):
+    """Mark this KeyRange as transferred, updating the progress database."""
+    pass
+
+  def Process(self, export_result, num_threads, batch_size, work_queue):
+    """Mark this KeyRange as success, updating the progress database.
+
+    Process will split this KeyRange based on the content of export_result and
+    adds the unfinished ranges to the work queue.
+
+    Args:
+      export_result: An ExportResult instance.
+      num_threads: The number of threads for parallel transfers.
+      batch_size: The number of entities to transfer per request.
+      work_queue: The work queue to add unfinished ranges to.
+
+    Returns:
+      A list of KeyRanges representing undownloaded datastore key ranges.
+    """
+    self._AssertInState(STATE_GETTING)
+    self._AssertProgressKey()
+
+    self.export_result = export_result
+    self.count = len(export_result.keys)
+    if export_result.continued:
+      self._FinishedRange()._StateTransition(STATE_GOT, blocking=True)
+      self._AddUnfinishedRanges(num_threads, batch_size, work_queue)
+    else:
+      self._StateTransition(STATE_GOT, blocking=True)
+
+  def _FinishedRange(self):
+    """Returns the range completed by the export_result.
+
+    Returns:
+      A KeyRange representing a completed range.
+    """
+    assert self.export_result is not None
+
+    if self.direction == KeyRange.ASC:
+      key_start = self.key_start
+      if self.export_result.continued:
+        key_end = self.export_result.key_end
+      else:
+        key_end = self.key_end
+    else:
+      key_end = self.key_end
+      if self.export_result.continued:
+        key_start = self.export_result.key_start
+      else:
+        key_start = self.key_start
+
+    result = KeyRange(self.progress_queue,
+                      self.kind,
+                      key_start=key_start,
+                      key_end=key_end,
+                      direction=self.direction)
+
+    result.progress_key = self.progress_key
+    result.export_result = self.export_result
+    result.state = self.state
+    result.count = self.count
+    return result
+
+  def FilterQuery(self, query):
+    """Add query filter to restrict to this key range.
 
     Args:
-      new_state: The state to transition to.
-      blocking: Whether to block for the progress thread to acknowledge the
-        transition.
+      query: A db.Query instance.
+    """
+    if self.key_start == self.key_end and not (
+        self.include_start or self.include_end):
+      return EmptyQuery()
+    if self.include_start:
+      start_comparator = '>='
+    else:
+      start_comparator = '>'
+    if self.include_end:
+      end_comparator = '<='
+    else:
+      end_comparator = '<'
+    if self.key_start and self.key_end:
+      query.filter('__key__ %s' % start_comparator, self.key_start)
+      query.filter('__key__ %s' % end_comparator, self.key_end)
+    elif self.key_start:
+      query.filter('__key__ %s' % start_comparator, self.key_start)
+    elif self.key_end:
+      query.filter('__key__ %s' % end_comparator, self.key_end)
+
+    return query
+
+  def MakeParallelQuery(self):
+    """Construct a query for this key range, for parallel downloading.
+
+    Returns:
+      A db.Query instance.
+
+    Raises:
+      KeyRangeError: if self.direction is not one of
+        KeyRange.ASC, KeyRange.DESC
+    """
+    if self.direction == KeyRange.ASC:
+      direction = ''
+    elif self.direction == KeyRange.DESC:
+      direction = '-'
+    else:
+      raise KeyRangeError('KeyRange direction unexpected: %s', self.direction)
+    query = db.Query(GetImplementationClass(self.kind))
+    query.order('%s__key__' % direction)
+
+    return self.FilterQuery(query)
+
+  def MakeSerialQuery(self):
+    """Construct a query for this key range without descending __key__ scan.
+
+    Returns:
+      A db.Query instance.
+    """
+    query = db.Query(GetImplementationClass(self.kind))
+    query.order('__key__')
+
+    return self.FilterQuery(query)
+
+  def _BisectStringRange(self, start, end):
+    if start == end:
+      return (start, start, end)
+    start += '\0'
+    end += '\0'
+    midpoint = []
+    expected_max = 127
+    for i in xrange(min(len(start), len(end))):
+      if start[i] == end[i]:
+        midpoint.append(start[i])
+      else:
+        ord_sum = ord(start[i]) + ord(end[i])
+        midpoint.append(unichr(ord_sum / 2))
+        if ord_sum % 2:
+          if len(start) > i + 1:
+            ord_start = ord(start[i+1])
+          else:
+            ord_start = 0
+          if ord_start < expected_max:
+            ord_split = (expected_max + ord_start) / 2
+          else:
+            ord_split = (0xFFFF + ord_start) / 2
+          midpoint.append(unichr(ord_split))
+        break
+    return (start[:-1], ''.join(midpoint), end[:-1])
+
+  def SplitRange(self, key_start, include_start, key_end, include_end,
+                 export_result, num_threads, batch_size, work_queue):
+    """Split the key range [key_start, key_end] into a list of ranges."""
+    if export_result.direction == KeyRange.ASC:
+      key_start = export_result.key_end
+      include_start = False
+    else:
+      key_end = export_result.key_start
+      include_end = False
+    key_pairs = []
+    if not key_start:
+      key_pairs.append((key_start, include_start, key_end, include_end,
+                        KeyRange.ASC))
+    elif not key_end:
+      key_pairs.append((key_start, include_start, key_end, include_end,
+                        KeyRange.DESC))
+    elif work_queue.qsize() > 2 * num_threads:
+      key_pairs.append((key_start, include_start, key_end, include_end,
+                        KeyRange.ASC))
+    elif key_start.id() and key_end.id():
+      if key_end.id() - key_start.id() > batch_size:
+        key_half = db.Key.from_path(self.kind,
+                                    (key_start.id() + key_end.id()) / 2)
+        key_pairs.append((key_start, include_start,
+                          key_half, True,
+                          KeyRange.DESC))
+        key_pairs.append((key_half, False,
+                          key_end, include_end,
+                          KeyRange.ASC))
+      else:
+        key_pairs.append((key_start, include_start, key_end, include_end,
+                          KeyRange.ASC))
+    elif key_start.name() and key_end.name():
+      (start, middle, end) = self._BisectStringRange(key_start.name(),
+                                                     key_end.name())
+      key_pairs.append((key_start, include_start,
+                        db.Key.from_path(self.kind, middle), True,
+                        KeyRange.DESC))
+      key_pairs.append((db.Key.from_path(self.kind, middle), False,
+                        key_end, include_end,
+                        KeyRange.ASC))
+    else:
+      assert key_start.id() and key_end.name()
+      key_pairs.append((key_start, include_start,
+                        self.SPLIT_KEY, False,
+                        KeyRange.DESC))
+      key_pairs.append((self.SPLIT_KEY, True,
+                        key_end, include_end,
+                        KeyRange.ASC))
+
+    ranges = [KeyRange(self.progress_queue,
+                       self.kind,
+                       key_start=start,
+                       include_start=include_start,
+                       key_end=end,
+                       include_end=include_end,
+                       direction=direction)
+              for (start, include_start, end, include_end, direction)
+              in key_pairs]
+
+    for key_range in ranges:
+      key_range.MarkAsRead()
+      work_queue.put(key_range, block=True)
+
+  def _AddUnfinishedRanges(self, num_threads, batch_size, work_queue):
+    """Adds incomplete KeyRanges to the work_queue.
+
+    Args:
+      num_threads: The number of threads for parallel transfers.
+      batch_size: The number of entities to transfer per request.
+      work_queue: The work queue to add unfinished ranges to.
+
+    Returns:
+      A list of KeyRanges representing incomplete datastore key ranges.
+
+    Raises:
+      KeyRangeError: if this key range has already been completely transferred.
     """
-    logging.debug('[%s-%s] %s' %
-                  (self.key_start, self.key_end, StateMessage(self.state)))
-    assert not self.progress_event.isSet()
-
-    self.state = new_state
-
-    self.progress_queue.put(self)
-
-    if blocking:
-      self.progress_event.wait()
-
-      self.progress_event.clear()
-
+    assert self.export_result is not None
+    if self.export_result.continued:
+      self.SplitRange(self.key_start, self.include_start, self.key_end,
+                      self.include_end, self.export_result,
+                      num_threads, batch_size, work_queue)
+    else:
+      raise KeyRangeError('No unfinished part of key range.')
+
+
+class RequestManager(object):
+  """A class which wraps a connection to the server."""
+
+  def __init__(self,
+               app_id,
+               host_port,
+               url_path,
+               kind,
+               throttle,
+               batch_size,
+               secure,
+               email,
+               passin):
+    """Initialize a RequestManager object.
+
+    Args:
+      app_id: String containing the application id for requests.
+      host_port: String containing the "host:port" pair; the port is optional.
+      url_path: partial URL (path) to post entity data to.
+      kind: Kind of the Entity records being posted.
+      throttle: A Throttle instance.
+      batch_size: The number of entities to transfer per request.
+      secure: Use SSL when communicating with server.
+      email: If not none, the username to log in with.
+      passin: If True, the password will be read from standard in.
+    """
+    self.app_id = app_id
+    self.host_port = host_port
+    self.host = host_port.split(':')[0]
+    if url_path and url_path[0] != '/':
+      url_path = '/' + url_path
+    self.url_path = url_path
+    self.kind = kind
+    self.throttle = throttle
+    self.batch_size = batch_size
+    self.secure = secure
+    self.authenticated = False
+    self.auth_called = False
+    self.parallel_download = True
+    self.email = email
+    self.passin = passin
+    throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
+        self.throttle, self)
+    logger.debug('Configuring remote_api. url_path = %s, '
+                 'servername = %s' % (url_path, host_port))
+    remote_api_stub.ConfigureRemoteDatastore(
+        app_id,
+        url_path,
+        self.AuthFunction,
+        servername=host_port,
+        rpc_server_factory=throttled_rpc_server_factory,
+        secure=self.secure)
+    logger.debug('Bulkloader using app_id: %s', os.environ['APPLICATION_ID'])
+
+  def Authenticate(self):
+    """Invoke authentication if necessary."""
+    logger.info('Connecting to %s', self.url_path)
+    self.rpc_server.Send(self.url_path, payload=None)
+    self.authenticated = True
+
+  def AuthFunction(self,
+                   raw_input_fn=raw_input,
+                   password_input_fn=getpass.getpass):
+    """Prompts the user for a username and password.
+
+    Caches the results the first time it is called and returns the
+    same result every subsequent time.
+
+    Args:
+      raw_input_fn: Used for dependency injection.
+      password_input_fn: Used for dependency injection.
+
+    Returns:
+      A pair of the username and password.
+    """
+    if self.email:
+      email = self.email
+    else:
+      print 'Please enter login credentials for %s' % (
+          self.host)
+      email = raw_input_fn('Email: ')
+
+    if email:
+      password_prompt = 'Password for %s: ' % email
+      if self.passin:
+        password = raw_input_fn(password_prompt)
+      else:
+        password = password_input_fn(password_prompt)
+    else:
+      password = None
+
+    self.auth_called = True
+    return (email, password)
+
+  def EncodeContent(self, rows, loader=None):
+    """Encodes row data to the wire format.
+
+    Args:
+      rows: A list of pairs of a line number and a list of column values.
+      loader: Used for dependency injection.
+
+    Returns:
+      A list of db.Model instances.
+
+    Raises:
+      ConfigurationError: if no loader is defined for self.kind
+    """
+    if not loader:
+      try:
+        loader = Loader.RegisteredLoader(self.kind)
+      except KeyError:
+        logger.error('No Loader defined for kind %s.' % self.kind)
+        raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
+    entities = []
+    for line_number, values in rows:
+      key = loader.generate_key(line_number, values)
+      if isinstance(key, db.Key):
+        parent = key.parent()
+        key = key.name()
+      else:
+        parent = None
+      entity = loader.create_entity(values, key_name=key, parent=parent)
+      if isinstance(entity, list):
+        entities.extend(entity)
+      elif entity:
+        entities.append(entity)
+
+    return entities
+
+  def PostEntities(self, item):
+    """Posts Entity records to a remote endpoint over HTTP.
+
+    Args:
+      item: A workitem containing the entities to post.
+
+    Returns:
+      A pair of the estimated size of the request in bytes and the response
+        from the server as a str.
+    """
+    entities = item.content
+    db.put(entities)
+
+  def GetEntities(self, key_range):
+    """Gets Entity records from a remote endpoint over HTTP.
+
+    Args:
+     key_range: Range of keys to get.
+
+    Returns:
+      An ExportResult instance.
+
+    Raises:
+      ConfigurationError: if no Exporter is defined for self.kind
+    """
+    try:
+      Exporter.RegisteredExporter(self.kind)
+    except KeyError:
+      raise ConfigurationError('No Exporter defined for kind %s.' % self.kind)
+
+    keys = []
+    entities = []
+
+    if self.parallel_download:
+      query = key_range.MakeParallelQuery()
+      try:
+        results = query.fetch(self.batch_size)
+      except datastore_errors.NeedIndexError:
+        logger.info('%s: No descending index on __key__, '
+                    'performing serial download', self.kind)
+        self.parallel_download = False
+
+    if not self.parallel_download:
+      key_range.direction = KeyRange.ASC
+      query = key_range.MakeSerialQuery()
+      results = query.fetch(self.batch_size)
+
+    size = len(results)
+
+    for model in results:
+      key = model.key()
+      entities.append(cPickle.dumps(model))
+      keys.append(key)
+
+    continued = (size == self.batch_size)
+    key_range.count = size
+
+    return ExportResult(continued, key_range.direction, keys, entities)
 
 
 def InterruptibleSleep(sleep_time):
@@ -961,7 +1626,19 @@
   failed upload, the number of active threads is reduced by one.
   """
 
-  def __init__(self, enabled, sleep=InterruptibleSleep):
+  def __init__(self, enabled,
+               threshhold1=MAXIMUM_INCREASE_DURATION,
+               threshhold2=MAXIMUM_HOLD_DURATION,
+               sleep=InterruptibleSleep):
+    """Constructor for ThreadGate instances.
+
+    Args:
+      enabled: Whether the thread gate is enabled
+      threshhold1: Maximum duration (in seconds) for a transfer to increase
+        the number of active threads.
+      threshhold2: Maximum duration (in seconds) for a transfer to not decrease
+        the number of active threads.
+    """
     self.enabled = enabled
     self.enabled_count = 1
     self.lock = threading.Lock()
@@ -969,6 +1646,8 @@
     self._threads = []
     self.backoff_time = 0
     self.sleep = sleep
+    self.threshhold1 = threshhold1
+    self.threshhold2 = threshhold2
 
   def Register(self, thread):
     """Register a thread with the thread gate."""
@@ -990,7 +1669,7 @@
 
   def EnableAllThreads(self):
     """Enable all worker threads."""
-    for unused_idx in range(len(self._threads) - self.enabled_count):
+    for unused_idx in xrange(len(self._threads) - self.enabled_count):
       self.EnableThread()
 
   def StartWork(self):
@@ -1004,8 +1683,8 @@
       self.thread_semaphore.acquire()
       if self.backoff_time > 0.0:
         if not threading.currentThread().exit_flag:
-          logging.info('Backing off: %.1f seconds',
-                       self.backoff_time)
+          logger.info('Backing off: %.1f seconds',
+                      self.backoff_time)
         self.sleep(self.backoff_time)
 
   def FinishWork(self):
@@ -1013,15 +1692,22 @@
     if self.enabled:
       self.thread_semaphore.release()
 
-  def IncreaseWorkers(self):
+  def TransferSuccess(self, duration):
     """Informs the throttler that an item was successfully sent.
 
-    If thread throttling is enabled, this method will cause an
-    additional thread to run in the critical section.
+    If thread throttling is enabled and the duration is low enough, this
+    method will cause an additional thread to run in the critical section.
+
+    Args:
+      duration: The duration of the transfer in seconds.
     """
-    if self.enabled:
+    if duration > self.threshhold2:
+      self.DecreaseWorkers()
+    elif duration > self.threshhold1:
+      return
+    elif self.enabled:
       if self.backoff_time > 0.0:
-        logging.info('Resetting backoff to 0.0')
+        logger.info('Resetting backoff to 0.0')
         self.backoff_time = 0.0
       do_enable = False
       self.lock.acquire()
@@ -1032,6 +1718,8 @@
       finally:
         self.lock.release()
       if do_enable:
+        logger.debug('Increasing active thread count to %d',
+                     self.enabled_count)
         self.thread_semaphore.release()
 
   def DecreaseWorkers(self):
@@ -1058,6 +1746,8 @@
       finally:
         self.lock.release()
       if do_disable:
+        logger.debug('Decreasing the number of active threads to %d',
+                     self.enabled_count)
         self.thread_semaphore.acquire()
 
 
@@ -1207,10 +1897,10 @@
       if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
         break
 
-      logging.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
-                    '(duration=%.1f ms, total=%d)',
-                    thread.getName(), throttle_name,
-                    sleep_time * 1000, duration * 1000, total)
+      logger.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
+                   '(duration=%.1f ms, total=%d)',
+                   thread.getName(), throttle_name,
+                   sleep_time * 1000, duration * 1000, total)
       self.thread_sleep(sleep_time)
       if thread.exit_flag:
         break
@@ -1299,15 +1989,15 @@
 
   def run(self):
     """Perform the work of the thread."""
-    logging.info('[%s] %s: started', self.getName(), self.__class__.__name__)
+    logger.info('[%s] %s: started', self.getName(), self.__class__.__name__)
 
     try:
       self.PerformWork()
     except:
       self.error = sys.exc_info()[1]
-      logging.exception('[%s] %s:', self.getName(), self.__class__.__name__)
-
-    logging.info('[%s] %s: exiting', self.getName(), self.__class__.__name__)
+      logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
+
+    logger.info('[%s] %s: exiting', self.getName(), self.__class__.__name__)
 
   def PerformWork(self):
     """Perform the thread-specific work."""
@@ -1316,7 +2006,7 @@
   def CheckError(self):
     """If an error is present, then log it."""
     if self.error:
-      logging.error('Error in %s: %s', self.GetFriendlyName(), self.error)
+      logger.error('Error in %s: %s', self.GetFriendlyName(), self.error)
 
   def GetFriendlyName(self):
     """Returns a human-friendly description of the thread."""
@@ -1325,7 +2015,184 @@
     return 'unknown thread'
 
 
-class BulkLoaderThread(_ThreadBase):
+non_fatal_error_codes = set([errno.EAGAIN,
+                             errno.ENETUNREACH,
+                             errno.ENETRESET,
+                             errno.ECONNRESET,
+                             errno.ETIMEDOUT,
+                             errno.EHOSTUNREACH])
+
+
+def IsURLErrorFatal(error):
+  """Returns False if the given URLError may be from a transient failure.
+
+  Args:
+    error: A urllib2.URLError instance.
+  """
+  assert isinstance(error, urllib2.URLError)
+  if not hasattr(error, 'reason'):
+    return True
+  if not isinstance(error.reason[0], int):
+    return True
+  return error.reason[0] not in non_fatal_error_codes
+
+
+def PrettyKey(key):
+  """Returns a nice string representation of the given key."""
+  if key is None:
+    return None
+  elif isinstance(key, db.Key):
+    return repr(key.id_or_name())
+  return str(key)
+
+
+class _BulkWorkerThread(_ThreadBase):
+  """A base class for worker threads.
+
+  This thread will read WorkItem instances from the work_queue and upload
+  the entities to the server application. Progress information will be
+  pushed into the progress_queue as the work is being performed.
+
+  If a _BulkWorkerThread encounters a transient error, the entities will be
+  resent, if a fatal error is encoutered the BulkWorkerThread exits.
+
+  Subclasses must provide implementations for PreProcessItem, TransferItem,
+  and ProcessResponse.
+  """
+
+  def __init__(self,
+               work_queue,
+               throttle,
+               thread_gate,
+               request_manager,
+               num_threads,
+               batch_size,
+               state_message,
+               get_time):
+    """Initialize the BulkLoaderThread instance.
+
+    Args:
+      work_queue: A queue containing WorkItems for processing.
+      throttle: A Throttles to control upload bandwidth.
+      thread_gate: A ThreadGate to control number of simultaneous uploads.
+      request_manager: A RequestManager instance.
+      num_threads: The number of threads for parallel transfers.
+      batch_size: The number of entities to transfer per request.
+      state_message: Used for dependency injection.
+      get_time: Used for dependency injection.
+    """
+    _ThreadBase.__init__(self)
+
+    self.work_queue = work_queue
+    self.throttle = throttle
+    self.thread_gate = thread_gate
+    self.request_manager = request_manager
+    self.num_threads = num_threads
+    self.batch_size = batch_size
+    self.state_message = state_message
+    self.get_time = get_time
+
+  def PreProcessItem(self, item):
+    """Performs pre transfer processing on a work item."""
+    raise NotImplementedError()
+
+  def TransferItem(self, item):
+    """Transfers the entities associated with an item.
+
+    Args:
+      item: An item of upload (WorkItem) or download (KeyRange) work.
+
+    Returns:
+      A tuple of (estimated transfer size, response)
+    """
+    raise NotImplementedError()
+
+  def ProcessResponse(self, item, result):
+    """Processes the response from the server application."""
+    raise NotImplementedError()
+
+  def PerformWork(self):
+    """Perform the work of a _BulkWorkerThread."""
+    while not self.exit_flag:
+      transferred = False
+      self.thread_gate.StartWork()
+      try:
+        try:
+          item = self.work_queue.get(block=True, timeout=1.0)
+        except Queue.Empty:
+          continue
+        if item == _THREAD_SHOULD_EXIT:
+          break
+
+        logger.debug('[%s] Got work item %s', self.getName(), item)
+
+        try:
+
+          item.MarkAsTransferring()
+          self.PreProcessItem(item)
+          response = None
+          try:
+            try:
+              t = self.get_time()
+              response = self.TransferItem(item)
+              status = 200
+              transferred = True
+              transfer_time = self.get_time() - t
+              logger.debug('[%s] %s Transferred %d entities', self.getName(),
+                           item, item.count)
+              self.throttle.AddTransfer(RECORDS, item.count)
+            except (db.InternalError, db.NotSavedError, db.Timeout,
+                    apiproxy_errors.OverQuotaError,
+                    apiproxy_errors.DeadlineExceededError), e:
+              logger.exception('Caught non-fatal datastore error: %s', e)
+            except urllib2.HTTPError, e:
+              status = e.code
+              if status == 403 or (status >= 500 and status < 600):
+                logger.exception('Caught non-fatal HTTP error: %d %s',
+                                 status, e.msg)
+              else:
+                raise e
+            except urllib2.URLError, e:
+              if IsURLErrorFatal(e):
+                raise e
+              else:
+                logger.exception('Caught non-fatal URL error: %s', e.reason)
+
+            self.ProcessResponse(item, response)
+
+          except:
+            self.error = sys.exc_info()[1]
+            logger.exception('[%s] %s: caught exception %s', self.getName(),
+                             self.__class__.__name__, str(sys.exc_info()))
+            raise
+
+        finally:
+          if transferred:
+            item.MarkAsTransferred()
+            self.thread_gate.TransferSuccess(transfer_time)
+            self.work_queue.task_done()
+          else:
+            item.MarkAsError()
+            try:
+              self.work_queue.reput(item, block=False)
+            except Queue.Full:
+              logger.error('[%s] Failed to reput work item.', self.getName())
+              raise Error('Failed to reput work item')
+            self.thread_gate.DecreaseWorkers()
+          logger.info('%s %s',
+                      item,
+                      self.state_message(item.state))
+
+      finally:
+        self.thread_gate.FinishWork()
+
+
+  def GetFriendlyName(self):
+    """Returns a human-friendly name for this thread."""
+    return 'worker [%s]' % self.getName()
+
+
+class BulkLoaderThread(_BulkWorkerThread):
   """A thread which transmits entities to the server application.
 
   This thread will read WorkItem instances from the work_queue and upload
@@ -1340,7 +2207,10 @@
                work_queue,
                throttle,
                thread_gate,
-               request_manager):
+               request_manager,
+               num_threads,
+               batch_size,
+               get_time=time.time):
     """Initialize the BulkLoaderThread instance.
 
     Args:
@@ -1348,82 +2218,102 @@
       throttle: A Throttles to control upload bandwidth.
       thread_gate: A ThreadGate to control number of simultaneous uploads.
       request_manager: A RequestManager instance.
+      num_threads: The number of threads for parallel transfers.
+      batch_size: The number of entities to transfer per request.
+      get_time: Used for dependency injection.
     """
-    _ThreadBase.__init__(self)
-
-    self.work_queue = work_queue
-    self.throttle = throttle
-    self.thread_gate = thread_gate
-
-    self.request_manager = request_manager
-
-  def PerformWork(self):
-    """Perform the work of a BulkLoaderThread."""
-    while not self.exit_flag:
-      success = False
-      self.thread_gate.StartWork()
-      try:
-        try:
-          item = self.work_queue.get(block=True, timeout=1.0)
-        except Queue.Empty:
-          continue
-        if item == _THREAD_SHOULD_EXIT:
-          break
-
-        logging.debug('[%s] Got work item [%d-%d]',
-                      self.getName(), item.key_start, item.key_end)
-
-        try:
-
-          item.MarkAsSending()
-          try:
-            if item.content is None:
-              item.content = self.request_manager.EncodeContent(item.rows)
-            try:
-              self.request_manager.PostEntities(item)
-              success = True
-              logging.debug(
-                  '[%d-%d] Sent %d entities',
-                  item.key_start, item.key_end, item.count)
-              self.throttle.AddTransfer(RECORDS, item.count)
-            except (db.InternalError, db.NotSavedError, db.Timeout), e:
-              logging.debug('Caught non-fatal error: %s', e)
-            except urllib2.HTTPError, e:
-              if e.code == 403 or (e.code >= 500 and e.code < 600):
-                logging.debug('Caught HTTP error %d', e.code)
-                logging.debug('%s', e.read())
-              else:
-                raise e
-
-          except:
-            self.error = sys.exc_info()[1]
-            logging.exception('[%s] %s: caught exception %s', self.getName(),
-                              self.__class__.__name__, str(sys.exc_info()))
-            raise
-
-        finally:
-          if success:
-            item.MarkAsSent()
-            self.thread_gate.IncreaseWorkers()
-            self.work_queue.task_done()
-          else:
-            item.MarkAsError()
-            self.thread_gate.DecreaseWorkers()
-            try:
-              self.work_queue.reput(item, block=False)
-            except Queue.Full:
-              logging.error('[%s] Failed to reput work item.', self.getName())
-              raise Error('Failed to reput work item')
-          logging.info('[%d-%d] %s',
-                       item.key_start, item.key_end, StateMessage(item.state))
-
-      finally:
-        self.thread_gate.FinishWork()
-
-
-  def GetFriendlyName(self):
-    """Returns a human-friendly name for this thread."""
-    return 'worker [%s]' % self.getName()
+    _BulkWorkerThread.__init__(self,
+                               work_queue,
+                               throttle,
+                               thread_gate,
+                               request_manager,
+                               num_threads,
+                               batch_size,
+                               ImportStateMessage,
+                               get_time)
+
+  def PreProcessItem(self, item):
+    """Performs pre transfer processing on a work item."""
+    if item and not item.content:
+      item.content = self.request_manager.EncodeContent(item.rows)
+
+  def TransferItem(self, item):
+    """Transfers the entities associated with an item.
+
+    Args:
+      item: An item of upload (WorkItem) work.
+
+    Returns:
+      A tuple of (estimated transfer size, response)
+    """
+    return self.request_manager.PostEntities(item)
+
+  def ProcessResponse(self, item, response):
+    """Processes the response from the server application."""
+    pass
+
+
+class BulkExporterThread(_BulkWorkerThread):
+  """A thread which recieved entities to the server application.
+
+  This thread will read KeyRange instances from the work_queue and export
+  the entities from the server application. Progress information will be
+  pushed into the progress_queue as the work is being performed.
+
+  If a BulkExporterThread encounters an error when trying to post data,
+  the thread will exit and cause the application to terminate.
+  """
+
+  def __init__(self,
+               work_queue,
+               throttle,
+               thread_gate,
+               request_manager,
+               num_threads,
+               batch_size,
+               get_time=time.time):
+
+    """Initialize the BulkExporterThread instance.
+
+    Args:
+      work_queue: A queue containing KeyRanges for processing.
+      throttle: A Throttles to control upload bandwidth.
+      thread_gate: A ThreadGate to control number of simultaneous uploads.
+      request_manager: A RequestManager instance.
+      num_threads: The number of threads for parallel transfers.
+      batch_size: The number of entities to transfer per request.
+      get_time: Used for dependency injection.
+    """
+    _BulkWorkerThread.__init__(self,
+                               work_queue,
+                               throttle,
+                               thread_gate,
+                               request_manager,
+                               num_threads,
+                               batch_size,
+                               ExportStateMessage,
+                               get_time)
+
+  def PreProcessItem(self, unused_item):
+    """Performs pre transfer processing on a work item."""
+    pass
+
+  def TransferItem(self, item):
+    """Transfers the entities associated with an item.
+
+    Args:
+      item: An item of download (KeyRange) work.
+
+    Returns:
+      A tuple of (estimated transfer size, response)
+    """
+    return self.request_manager.GetEntities(item)
+
+  def ProcessResponse(self, item, export_result):
+    """Processes the response from the server application."""
+    if export_result:
+      item.Process(export_result, self.num_threads, self.batch_size,
+                   self.work_queue)
 
 
 class DataSourceThread(_ThreadBase):
@@ -1471,7 +2361,7 @@
     content_gen = self.workitem_generator_factory(self.progress_queue,
                                                   progress_gen)
 
-    self.sent_count = 0
+    self.xfer_count = 0
     self.read_count = 0
     self.read_all = False
 
@@ -1492,7 +2382,7 @@
     if not self.exit_flag:
       self.read_all = True
     self.read_count = content_gen.row_count
-    self.sent_count = content_gen.sent_count
+    self.xfer_count = content_gen.xfer_count
 
 
 
@@ -1501,62 +2391,95 @@
   return threading.currentThread().getName() == thread.getName()
 
 
-class ProgressDatabase(object):
-  """Persistently record all progress information during an upload.
-
-  This class wraps a very simple SQLite database which records each of
-  the relevant details from the WorkItem instances. If the uploader is
-  resumed, then data is replayed out of the database.
+class _Database(object):
+  """Base class for database connections in this module.
+
+  The table is created by a primary thread (the python main thread)
+  but all future lookups and updates are performed by a secondary
+  thread.
   """
 
-  def __init__(self, db_filename, commit_periodicity=100):
-    """Initialize the ProgressDatabase instance.
+  SIGNATURE_TABLE_NAME = 'bulkloader_database_signature'
+
+  def __init__(self,
+               db_filename,
+               create_table,
+               signature,
+               index=None,
+               commit_periodicity=100):
+    """Initialize the _Database instance.
 
     Args:
-      db_filename: The name of the SQLite database to use.
-      commit_periodicity: How many operations to perform between commits.
+      db_filename: The sqlite3 file to use for the database.
+      create_table: A string containing the SQL table creation command.
+      signature: A string identifying the important invocation options,
+        used to make sure we are not using an old database.
+      index: An optional string to create an index for the database.
+      commit_periodicity: Number of operations between database commits.
     """
     self.db_filename = db_filename
 
-    logging.info('Using progress database: %s', db_filename)
+    logger.info('Opening database: %s', db_filename)
     self.primary_conn = sqlite3.connect(db_filename, isolation_level=None)
     self.primary_thread = threading.currentThread()
 
-    self.progress_conn = None
-    self.progress_thread = None
+    self.secondary_conn = None
+    self.secondary_thread = None
 
     self.operation_count = 0
     self.commit_periodicity = commit_periodicity
 
-    self.prior_key_end = None
-
     try:
-      self.primary_conn.execute(
-          """create table progress (
-          id integer primary key autoincrement,
-          state integer not null,
-          key_start integer not null,
-          key_end integer not null
-          )
-          """)
+      self.primary_conn.execute(create_table)
     except sqlite3.OperationalError, e:
       if 'already exists' not in e.message:
         raise
 
+    if index:
+      try:
+        self.primary_conn.execute(index)
+      except sqlite3.OperationalError, e:
+        if 'already exists' not in e.message:
+          raise
+
+    self.existing_table = False
+    signature_cursor = self.primary_conn.cursor()
+    create_signature = """
+      create table %s (
+      value TEXT not null)
+    """ % _Database.SIGNATURE_TABLE_NAME
     try:
-      self.primary_conn.execute('create index i_state on progress (state)')
+      self.primary_conn.execute(create_signature)
+      self.primary_conn.cursor().execute(
+          'insert into %s (value) values (?)' % _Database.SIGNATURE_TABLE_NAME,
+          (signature,))
     except sqlite3.OperationalError, e:
       if 'already exists' not in e.message:
+        logger.exception('Exception creating table:')
         raise
+      else:
+        self.existing_table = True
+        signature_cursor.execute(
+            'select * from %s' % _Database.SIGNATURE_TABLE_NAME)
+        (result,) = signature_cursor.fetchone()
+        if result and result != signature:
+          logger.error('Database signature mismatch:\n\n'
+                       'Found:\n'
+                       '%s\n\n'
+                       'Expecting:\n'
+                       '%s\n',
+                       result, signature)
+          raise ResumeError('Database signature mismatch: %s != %s' % (
+                            signature, result))
 
   def ThreadComplete(self):
-    """Finalize any operations the progress thread has performed.
+    """Finalize any operations the secondary thread has performed.
 
     The database aggregates lots of operations into a single commit, and
     this method is used to commit any pending operations as the thread
     is about to shut down.
     """
-    if self.progress_conn:
+    if self.secondary_conn:
       self._MaybeCommit(force_commit=True)
 
   def _MaybeCommit(self, force_commit=False):
@@ -1573,28 +2496,175 @@
     """
     self.operation_count += 1
     if force_commit or (self.operation_count % self.commit_periodicity) == 0:
-      self.progress_conn.commit()
-
-  def _OpenProgressConnection(self):
-    """Possibly open a database connection for the progress tracker thread.
+      self.secondary_conn.commit()
+
+  def _OpenSecondaryConnection(self):
+    """Possibly open a database connection for the secondary thread.
 
     If the connection is not open (for the calling thread, which is assumed
-    to be the progress tracker thread), then open it. We also open a couple
+    to be the unique secondary thread), then open it. We also open a couple
     cursors for later use (and reuse).
     """
-    if self.progress_conn:
+    if self.secondary_conn:
       return
 
     assert not _RunningInThread(self.primary_thread)
 
-    self.progress_thread = threading.currentThread()
-
-    self.progress_conn = sqlite3.connect(self.db_filename)
-
-    self.insert_cursor = self.progress_conn.cursor()
-    self.update_cursor = self.progress_conn.cursor()
-
-  def HasUnfinishedWork(self):
+    self.secondary_thread = threading.currentThread()
+
+    self.secondary_conn = sqlite3.connect(self.db_filename)
+
+    self.insert_cursor = self.secondary_conn.cursor()
+    self.update_cursor = self.secondary_conn.cursor()
+
+
+class ResultDatabase(_Database):
+  """Persistently record all the entities downloaded during an export.
+
+  The entities are held in the database by their unique datastore key
+  in order to avoid duplication if an export is restarted.
+  """
+
+  def __init__(self, db_filename, signature, commit_periodicity=1):
+    """Initialize a ResultDatabase object.
+
+    Args:
+      db_filename: The name of the SQLite database to use.
+      signature: A string identifying the important invocation options,
+        used to make sure we are not using an old database.
+      commit_periodicity: How many operations to perform between commits.
+    """
+    self.complete = False
+    create_table = ('create table result (\n'
+                    'id TEXT primary key,\n'
+                    'value BLOB not null)')
+
+    _Database.__init__(self,
+                       db_filename,
+                       create_table,
+                       signature,
+                       commit_periodicity=commit_periodicity)
+    if self.existing_table:
+      cursor = self.primary_conn.cursor()
+      cursor.execute('select count(*) from result')
+      self.existing_count = int(cursor.fetchone()[0])
+    else:
+      self.existing_count = 0
+    self.count = self.existing_count
+
+  def _StoreEntity(self, entity_id, value):
+    """Store an entity in the result database.
+
+    Args:
+      entity_id: A db.Key for the entity.
+      value: A string of the contents of the entity.
+
+    Returns:
+      True if this entities is not already present in the result database.
+    """
+
+    assert _RunningInThread(self.secondary_thread)
+    assert isinstance(entity_id, db.Key)
+
+    entity_id = entity_id.id_or_name()
+    self.insert_cursor.execute(
+        'select count(*) from result where id = ?', (unicode(entity_id),))
+    already_present = self.insert_cursor.fetchone()[0]
+    result = True
+    if already_present:
+      result = False
+      self.insert_cursor.execute('delete from result where id = ?',
+                                 (unicode(entity_id),))
+    else:
+      self.count += 1
+    self.insert_cursor.execute(
+        'insert into result (id, value) values (?, ?)',
+        (unicode(entity_id), buffer(value)))
+    return result
+
+  def StoreEntities(self, keys, entities):
+    """Store a group of entities in the result database.
+
+    Args:
+      keys: A list of entity keys.
+      entities: A list of entities.
+
+    Returns:
+      The number of new entities stored in the result database.
+    """
+    self._OpenSecondaryConnection()
+    t = time.time()
+    count = 0
+    for entity_id, value in zip(keys,
+                                entities):
+      if self._StoreEntity(entity_id, value):
+        count += 1
+    logger.debug('%s insert: delta=%.3f',
+                 self.db_filename,
+                 time.time() - t)
+    logger.debug('Entities transferred total: %s', self.count)
+    self._MaybeCommit()
+    return count
+
+  def ResultsComplete(self):
+    """Marks the result database as containing complete results."""
+    self.complete = True
+
+  def AllEntities(self):
+    """Yields all pairs of (id, value) from the result table."""
+    conn = sqlite3.connect(self.db_filename, isolation_level=None)
+    cursor = conn.cursor()
+
+    cursor.execute(
+        'select id, value from result order by id')
+
+    for unused_entity_id, entity in cursor:
+      yield cPickle.loads(str(entity))
+
+
+class _ProgressDatabase(_Database):
+  """Persistently record all progress information during an upload.
+
+  This class wraps a very simple SQLite database which records each of
+  the relevant details from a chunk of work. If the loader is
+  resumed, then data is replayed out of the database.
+  """
+
+  def __init__(self,
+               db_filename,
+               sql_type,
+               py_type,
+               signature,
+               commit_periodicity=100):
+    """Initialize the ProgressDatabase instance.
+
+    Args:
+      db_filename: The name of the SQLite database to use.
+      sql_type: A string of the SQL type to use for entity keys.
+      py_type: The python type of entity keys.
+      signature: A string identifying the important invocation options,
+        used to make sure we are not using an old database.
+      commit_periodicity: How many operations to perform between commits.
+    """
+    self.prior_key_end = None
+
+    create_table = ('create table progress (\n'
+                    'id integer primary key autoincrement,\n'
+                    'state integer not null,\n'
+                    'key_start %s,\n'
+                    'key_end %s)'
+                    % (sql_type, sql_type))
+    self.py_type = py_type
+
+    index = 'create index i_state on progress (state)'
+    _Database.__init__(self,
+                       db_filename,
+                       create_table,
+                       signature,
+                       index=index,
+                       commit_periodicity=commit_periodicity)
+
+  def UseProgressData(self):
     """Returns True if the database has progress information.
 
     Note there are two basic cases for progress information:
@@ -1605,10 +2675,10 @@
        data.
 
     Returns:
-      True if the database has progress information, False otherwise.
+      True: if the database has progress information.
 
     Raises:
-      ResumeError: If there is an error reading the progress database.
+      ResumeError: if there is an error retrieving rows from the database.
     """
     assert _RunningInThread(self.primary_thread)
 
@@ -1616,7 +2686,7 @@
     cursor.execute('select count(*) from progress')
     row = cursor.fetchone()
     if row is None:
-      raise ResumeError('Error reading progress information.')
+      raise ResumeError('Cannot retrieve progress information from database.')
 
     return row[0] != 0
 
@@ -1642,20 +2712,18 @@
     Returns:
       A string to later be used as a unique key to update this state.
     """
-    self._OpenProgressConnection()
-
-    assert _RunningInThread(self.progress_thread)
-    assert isinstance(key_start, int)
-    assert isinstance(key_end, int)
-    assert key_start <= key_end
-
-    if self.prior_key_end is not None:
-      assert key_start > self.prior_key_end
-    self.prior_key_end = key_end
+    self._OpenSecondaryConnection()
+
+    assert _RunningInThread(self.secondary_thread)
+    assert not key_start or isinstance(key_start, self.py_type)
+    assert not key_end or isinstance(key_end, self.py_type), '%s is a %s' % (
+        key_end, key_end.__class__)
+    assert KeyLEQ(key_start, key_end), '%s not less than %s' % (
+        repr(key_start), repr(key_end))
 
     self.insert_cursor.execute(
         'insert into progress (state, key_start, key_end) values (?, ?, ?)',
-        (STATE_READ, key_start, key_end))
+        (STATE_READ, unicode(key_start), unicode(key_end)))
 
     progress_key = self.insert_cursor.lastrowid
 
@@ -1670,9 +2738,9 @@
       key: The key for this progress record, returned from StoreKeys
       new_state: The new state to associate with this progress record.
     """
-    self._OpenProgressConnection()
-
-    assert _RunningInThread(self.progress_thread)
+    self._OpenSecondaryConnection()
+
+    assert _RunningInThread(self.secondary_thread)
     assert isinstance(new_state, int)
 
     self.update_cursor.execute('update progress set state=? where id=?',
@@ -1680,8 +2748,22 @@
 
     self._MaybeCommit()
 
+  def DeleteKey(self, progress_key):
+    """Delete the entities with the given key from the result database."""
+    self._OpenSecondaryConnection()
+
+    assert _RunningInThread(self.secondary_thread)
+
+    t = time.time()
+    self.insert_cursor.execute(
+        'delete from progress where rowid = ?', (progress_key,))
+
+    logger.debug('delete: delta=%.3f', time.time() - t)
+
+    self._MaybeCommit()
+
   def GetProgressStatusGenerator(self):
-    """Get a generator which returns progress information.
+    """Get a generator which yields progress information.
 
     The returned generator will yield a series of 4-tuples that specify
     progress information about a prior run of the uploader. The 4-tuples
@@ -1706,16 +2788,19 @@
     The caller should begin uploading records which occur after key_end.
 
     Yields:
-      Progress information as tuples (progress_key, state, key_start, key_end).
+      Four-tuples of (progress_key, state, key_start, key_end)
     """
     conn = sqlite3.connect(self.db_filename, isolation_level=None)
     cursor = conn.cursor()
 
-    cursor.execute('select max(id) from progress')
-    batch_id = cursor.fetchone()[0]
-
-    cursor.execute('select key_end from progress where id = ?', (batch_id,))
-    key_end = cursor.fetchone()[0]
+    cursor.execute('select max(key_end) from progress')
+
+    result = cursor.fetchone()
+    if result is not None:
+      key_end = result[0]
+    else:
+      logger.debug('No rows in progress database.')
+      return
 
     self.prior_key_end = key_end
 
@@ -1730,16 +2815,43 @@
     for row in rows:
       if row is None:
         break
-
-      yield row
+      progress_key, state, key_start, key_end = row
+
+      yield progress_key, state, key_start, key_end
 
     yield None, DATA_CONSUMED_TO_HERE, None, key_end
 
 
+def ProgressDatabase(db_filename, signature):
+  """Returns a database to store upload progress information."""
+  return _ProgressDatabase(db_filename, 'INTEGER', int, signature)
+
+
+class ExportProgressDatabase(_ProgressDatabase):
+  """A database to store download progress information."""
+
+  def __init__(self, db_filename, signature):
+    """Initialize an ExportProgressDatabase."""
+    _ProgressDatabase.__init__(self,
+                               db_filename,
+                               'TEXT',
+                               db.Key,
+                               signature,
+                               commit_periodicity=1)
+
+  def UseProgressData(self):
+    """Check if the progress database contains progress data.
+
+    Returns:
+      True: if the database contains progress data.
+    """
+    return self.existing_table
+
+
 class StubProgressDatabase(object):
   """A stub implementation of ProgressDatabase which does nothing."""
 
-  def HasUnfinishedWork(self):
+  def UseProgressData(self):
     """Whether the stub database has progress information (it doesn't)."""
     return False
 
@@ -1756,7 +2868,7 @@
     pass
 
 
-class ProgressTrackerThread(_ThreadBase):
+class _ProgressThreadBase(_ThreadBase):
   """A thread which records progress information for the upload process.
 
   The progress information is stored into the provided progress database.
@@ -1779,7 +2891,23 @@
 
     self.progress_queue = progress_queue
     self.db = progress_db
-    self.entities_sent = 0
+    self.entities_transferred = 0
+
+  def EntitiesTransferred(self):
+    """Return the total number of unique entities transferred."""
+    return self.entities_transferred
+
+  def UpdateProgress(self, item):
+    """Updates the progress information for the given item.
+
+    Args:
+      item: A work item whose new state will be recorded
+    """
+    raise NotImplementedError()
+
+  def WorkFinished(self):
+    """Performs final actions after the entity transfer is complete."""
+    raise NotImplementedError()
 
   def PerformWork(self):
     """Performs the work of a ProgressTrackerThread."""
@@ -1795,10 +2923,7 @@
         item.progress_key = self.db.StoreKeys(item.key_start, item.key_end)
       else:
         assert item.progress_key is not None
-
-        self.db.UpdateState(item.progress_key, item.state)
-        if item.state == STATE_SENT:
-          self.entities_sent += item.count
+        self.UpdateProgress(item)
 
       item.progress_event.set()
 
@@ -1808,6 +2933,106 @@
 
 
 
+class ProgressTrackerThread(_ProgressThreadBase):
+  """A thread which records progress information for the upload process.
+
+  The progress information is stored into the provided progress database.
+  This class is not responsible for replaying a prior run's progress
+  information out of the database. Separate mechanisms must be used to
+  resume a prior upload attempt.
+  """
+  NAME = 'progress tracking thread'
+
+  def __init__(self, progress_queue, progress_db):
+    """Initialize the ProgressTrackerThread instance.
+
+    Args:
+      progress_queue: A Queue used for tracking progress information.
+      progress_db: The database for tracking progress information; should
+        be an instance of ProgressDatabase.
+    """
+    _ProgressThreadBase.__init__(self, progress_queue, progress_db)
+
+  def UpdateProgress(self, item):
+    """Update the state of the given WorkItem.
+
+    Args:
+      item: A WorkItem instance.
+    """
+    self.db.UpdateState(item.progress_key, item.state)
+    if item.state == STATE_SENT:
+      self.entities_transferred += item.count
+
+  def WorkFinished(self):
+    """Performs final actions after the entity transfer is complete."""
+    pass
+
+
+class ExportProgressThread(_ProgressThreadBase):
+  """A thread to record progress information and write record data for exports.
+
+  The progress information is stored into a provided progress database.
+  Exported results are stored in the result database and dumped to an output
+  file at the end of the download.
+  """
+
+  def __init__(self, kind, progress_queue, progress_db, result_db):
+    """Initialize the ExportProgressThread instance.
+
+    Args:
+      kind: The kind of entities being stored in the database.
+      progress_queue: A Queue used for tracking progress information.
+      progress_db: The database for tracking progress information; should
+        be an instance of ProgressDatabase.
+      result_db: The database for holding exported entities; should be an
+        instance of ResultDatabase.
+    """
+    _ProgressThreadBase.__init__(self, progress_queue, progress_db)
+
+    self.kind = kind
+    self.existing_count = result_db.existing_count
+    self.result_db = result_db
+
+  def EntitiesTransferred(self):
+    """Return the total number of unique entities transferred."""
+    return self.result_db.count
+
+  def WorkFinished(self):
+    """Write the contents of the result database."""
+    exporter = Exporter.RegisteredExporter(self.kind)
+    exporter.output_entities(self.result_db.AllEntities())
+
+  def UpdateProgress(self, item):
+    """Update the state of the given KeyRange.
+
+    Args:
+      item: A KeyRange instance.
+    """
+    if item.state == STATE_GOT:
+      count = self.result_db.StoreEntities(item.export_result.keys,
+                                           item.export_result.entities)
+      self.db.DeleteKey(item.progress_key)
+      self.entities_transferred += count
+    else:
+      self.db.UpdateState(item.progress_key, item.state)
+
+
+def ParseKey(key_string):
+  """Turn a key stored in the database into a db.Key or None.
+
+  Args:
+    key_string: The string representation of a db.Key.
+
+  Returns:
+    A db.Key instance or None
+  """
+  if not key_string:
+    return None
+  if key_string == 'None':
+    return None
+  return db.Key(encoded=key_string)
+
+
 def Validate(value, typ):
   """Checks that value is non-empty and of the right type.
 
@@ -1816,9 +3041,8 @@
     typ: a type or tuple of types
 
   Raises:
-    ValueError if value is None or empty.
-    TypeError if it's not the given type.
-
+    ValueError: if value is None or empty.
+    TypeError: if it's not the given type.
   """
   if not value:
     raise ValueError('Value should not be empty; received %s.' % value)
@@ -1827,6 +3051,22 @@
                     (typ, value, value.__class__))
 
 
+def CheckFile(filename):
+  """Check that the given file exists and can be opened for reading.
+
+  Args:
+    filename: The name of the file.
+
+  Raises:
+    FileNotFoundError: if the given filename is not found
+    FileNotReadableError: if the given filename is not readable.
+  """
+  if not os.path.exists(filename):
+    raise FileNotFoundError('%s: file not found' % filename)
+  elif not os.access(filename, os.R_OK):
+    raise FileNotReadableError('%s: file not readable' % filename)
+
+
 class Loader(object):
   """A base class for creating datastore entities from input data.
 
@@ -1836,14 +3076,14 @@
 
   If you need to run extra code to convert entities from the input
   data, create new properties, or otherwise modify the entities before
-  they're inserted, override HandleEntity.
-
-  See the CreateEntity method for the creation of entities from the
+  they're inserted, override handle_entity.
+
+  See the create_entity method for the creation of entities from the
   (parsed) input data.
   """
 
   __loaders = {}
-  __kind = None
+  kind = None
   __properties = None
 
   def __init__(self, kind, properties):
@@ -1858,11 +3098,11 @@
 
       properties: list of (name, converter) tuples.
 
-        This is used to automatically convert the CSV columns into
+        This is used to automatically convert the input columns into
         properties.  The converter should be a function that takes one
-        argument, a string value from the CSV file, and returns a
+        argument, a string value from the input file, and returns a
         correctly typed property value that should be inserted. The
-        tuples in this list should match the columns in your CSV file,
+        tuples in this list should match the columns in your input file,
         in order.
 
         For example:
@@ -1874,10 +3114,12 @@
            ('description', datastore_types.Text),
            ]
     """
-    Validate(kind, basestring)
-    self.__kind = kind
-
-    db.class_for_kind(kind)
+    Validate(kind, (basestring, tuple))
+    self.kind = kind
+    self.__openfile = open
+    self.__create_csv_reader = csv.reader
+
+    GetImplementationClass(kind)
 
     Validate(properties, list)
     for name, fn in properties:
@@ -1890,49 +3132,66 @@
   @staticmethod
   def RegisterLoader(loader):
 
-    Loader.__loaders[loader.__kind] = loader
-
-  def kind(self):
-    """ Return the entity kind that this Loader handes.
-    """
-    return self.__kind
-
-  def CreateEntity(self, values, key_name=None):
+    Loader.__loaders[loader.kind] = loader
+
+  def alias_old_names(self):
+    """Aliases method names so that Loaders defined with old names work."""
+    aliases = (
+        ('CreateEntity', 'create_entity'),
+        ('HandleEntity', 'handle_entity'),
+        ('GenerateKey', 'generate_key'),
+        )
+    for old_name, new_name in aliases:
+      setattr(Loader, old_name, getattr(Loader, new_name))
+      if hasattr(self.__class__, old_name) and not (
+          getattr(self.__class__, old_name).im_func ==
+          getattr(Loader, new_name).im_func):
+        if hasattr(self.__class__, new_name) and not (
+            getattr(self.__class__, new_name).im_func ==
+            getattr(Loader, new_name).im_func):
+          raise NameClashError(old_name, new_name, self.__class__)
+        setattr(self, new_name, getattr(self, old_name))
+
+  def create_entity(self, values, key_name=None, parent=None):
     """Creates a entity from a list of property values.
 
     Args:
       values: list/tuple of str
       key_name: if provided, the name for the (single) resulting entity
+      parent: A db.Key instance for the parent, or None
 
     Returns:
       list of db.Model
 
       The returned entities are populated with the property values from the
       argument, converted to native types using the properties map given in
-      the constructor, and passed through HandleEntity. They're ready to be
+      the constructor, and passed through handle_entity. They're ready to be
       inserted.
 
     Raises:
-      AssertionError if the number of values doesn't match the number
+      AssertionError: if the number of values doesn't match the number
         of properties in the properties map.
-      ValueError if any element of values is None or empty.
-      TypeError if values is not a list or tuple.
+      ValueError: if any element of values is None or empty.
+      TypeError: if values is not a list or tuple.
     """
     Validate(values, (list, tuple))
     assert len(values) == len(self.__properties), (
-      'Expected %d CSV columns, found %d.' %
-      (len(self.__properties), len(values)))
-
-    model_class = db.class_for_kind(self.__kind)
-
-    properties = {'key_name': key_name}
+        'Expected %d columns, found %d.' %
+        (len(self.__properties), len(values)))
+
+    model_class = GetImplementationClass(self.kind)
+
+    properties = {
+        'key_name': key_name,
+        'parent': parent,
+        }
     for (name, converter), val in zip(self.__properties, values):
       if converter is bool and val.lower() in ('0', 'false', 'no'):
-          val = False
+        val = False
       properties[name] = converter(val)
 
     entity = model_class(**properties)
-    entities = self.HandleEntity(entity)
+    entities = self.handle_entity(entity)
 
     if entities:
       if not isinstance(entities, (list, tuple)):
@@ -1945,7 +3204,7 @@
 
     return entities
 
-  def GenerateKey(self, i, values):
+  def generate_key(self, i, values):
     """Generates a key_name to be used in creating the underlying object.
 
     The default implementation returns None.
@@ -1953,8 +3212,9 @@
     This method can be overridden to control the key generation for
     uploaded entities. The value returned should be None (to use a
     server generated numeric key), or a string which neither starts
-    with a digit nor has the form __*__. (See
-    http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html)
+    with a digit nor has the form __*__ (see
+    http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html),
+    or a db.Key instance.
 
     If you generate your own string keys, keep in mind:
 
@@ -1972,16 +3232,16 @@
     """
     return None
 
-  def HandleEntity(self, entity):
+  def handle_entity(self, entity):
     """Subclasses can override this to add custom entity conversion code.
 
-    This is called for each entity, after its properties are populated from
-    CSV but before it is stored. Subclasses can override this to add custom
-    entity handling code.
-
-    The entity to be inserted should be returned. If multiple entities should
-    be inserted, return a list of entities. If no entities should be inserted,
-    return None or [].
+    This is called for each entity, after its properties are populated
+    from the input but before it is stored. Subclasses can override
+    this to add custom entity handling code.
+
+    The entity to be inserted should be returned. If multiple entities
+    should be inserted, return a list of entities. If no entities
+    should be inserted, return None or [].
 
     Args:
       entity: db.Model
@@ -1991,12 +3251,213 @@
     """
     return entity
 
+  def initialize(self, filename, loader_opts):
+    """Performs initialization and validation of the input file.
+
+    This implementation checks that the input file exists and can be
+    opened for reading.
+
+    Args:
+      filename: The string given as the --filename flag argument.
+      loader_opts: The string given as the --loader_opts flag argument.
+    """
+    CheckFile(filename)
+
+  def finalize(self):
+    """Performs finalization actions after the upload completes."""
+    pass
+
+  def generate_records(self, filename):
+    """Subclasses can override this to add custom data input code.
+
+    This method must yield fixed-length lists of strings.
+
+    The default implementation uses csv.reader to read CSV rows
+    from filename.
+
+    Args:
+      filename: The string input for the --filename option.
+
+    Yields:
+      Lists of strings.
+    """
+    csv_generator = CSVGenerator(filename, openfile=self.__openfile,
+                                 create_csv_reader=self.__create_csv_reader
+                                ).Records()
+    return csv_generator
 
   @staticmethod
   def RegisteredLoaders():
-    """Returns a list of the Loader instances that have been created.
+    """Returns a dict of the Loader instances that have been created."""
+    return dict(Loader.__loaders)
+
+  @staticmethod
+  def RegisteredLoader(kind):
+    """Returns the loader instance for the given kind if it exists."""
+    return Loader.__loaders[kind]
+
+
+class Exporter(object):
+  """A base class for serializing datastore entities.
+
+  To add a handler for exporting an entity kind from your datastore,
+  write a subclass of this class that calls Exporter.__init__ from your
+  class's __init__.
+
+  If you need to run extra code to convert entities from the input
+  data, create new properties, or otherwise modify the entities before
+  they're inserted, override handle_entity.
+
+  See the output_entities method for the writing of data from entities.
+  """
+
+  __exporters = {}
+  kind = None
+  __properties = None
+
+  def __init__(self, kind, properties):
+    """Constructor.
+
+    Populates this Exporters's kind and properties map. Also registers
+    it so that all you need to do is instantiate your Exporter, and
+    the bulkload handler will automatically use it.
+
+    Args:
+      kind: a string containing the entity kind that this exporter handles
+
+      properties: list of (name, converter, default) tuples.
+
+      This is used to automatically convert the entities to strings.
+      The converter should be a function that takes one argument, a property
+      value of the appropriate type, and returns a str or unicode.  The default
+      is a string to be used if the property is not present, or None to fail
+      with an error if the property is missing.
+
+      For example:
+        [('name', str, None),
+         ('id_number', str, None),
+         ('email', str, ''),
+         ('user', str, None),
+         ('birthdate',
+          lambda x: str(datetime.datetime.fromtimestamp(float(x))),
+          None),
+         ('description', str, ''),
+         ]
     """
-    return dict(Loader.__loaders)
+    Validate(kind, basestring)
+    self.kind = kind
+
+    GetImplementationClass(kind)
+
+    Validate(properties, list)
+    for name, fn, default in properties:
+      Validate(name, basestring)
+      assert callable(fn), (
+          'Conversion function %s for property %s is not callable.' % (
+              fn, name))
+      if default:
+        Validate(default, basestring)
+
+    self.__properties = properties
+
+  @staticmethod
+  def RegisterExporter(exporter):
+
+    Exporter.__exporters[exporter.kind] = exporter
+
+  def __ExtractProperties(self, entity):
+    """Converts an entity into a list of string values.
+
+    Args:
+      entity: An entity to extract the properties from.
+
+    Returns:
+      A list of the properties of the entity.
+
+    Raises:
+      MissingPropertyError: if an expected field on the entity is missing.
+    """
+    encoding = []
+    for name, fn, default in self.__properties:
+      try:
+        encoding.append(fn(getattr(entity, name)))
+      except AttributeError:
+        if default is None:
+          raise MissingPropertyError(name)
+        else:
+          encoding.append(default)
+    return encoding
+
+  def __EncodeEntity(self, entity):
+    """Convert the given entity into CSV string.
+
+    Args:
+      entity: The entity to encode.
+
+    Returns:
+      A CSV string.
+    """
+    output = StringIO.StringIO()
+    writer = csv.writer(output, lineterminator='')
+    writer.writerow(self.__ExtractProperties(entity))
+    return output.getvalue()
+
+  def __SerializeEntity(self, entity):
+    """Creates a string representation of an entity.
+
+    Args:
+      entity: The entity to serialize.
+
+    Returns:
+      A serialized representation of an entity.
+    """
+    encoding = self.__EncodeEntity(entity)
+    if not isinstance(encoding, unicode):
+      encoding = unicode(encoding, 'utf-8')
+    encoding = encoding.encode('utf-8')
+    return encoding
+
+  def output_entities(self, entity_generator):
+    """Outputs the downloaded entities.
+
+    This implementation writes CSV.
+
+    Args:
+      entity_generator: A generator that yields the downloaded entities
+        in key order.
+    """
+    CheckOutputFile(self.output_filename)
+    output_file = open(self.output_filename, 'w')
+    logger.debug('Export complete, writing to file')
+    output_file.writelines(self.__SerializeEntity(entity) + '\n'
+                           for entity in entity_generator)
+
+  def initialize(self, filename, exporter_opts):
+    """Performs initialization and validation of the output file.
+
+    This implementation checks that the input file exists and can be
+    opened for writing.
+
+    Args:
+      filename: The string given as the --filename flag argument.
+      exporter_opts: The string given as the --exporter_opts flag argument.
+    """
+    CheckOutputFile(filename)
+    self.output_filename = filename
+
+  def finalize(self):
+    """Performs finalization actions after the download completes."""
+    pass
+
+  @staticmethod
+  def RegisteredExporters():
+    """Returns a dictionary of the exporter instances that have been created."""
+    return dict(Exporter.__exporters)
+
+  @staticmethod
+  def RegisteredExporter(kind):
+    """Returns an exporter instance for the given kind if it exists."""
+    return Exporter.__exporters[kind]
 
 
 class QueueJoinThread(threading.Thread):
@@ -2045,7 +3506,7 @@
     if not thread.isAlive():
       return True
     if thread_local.shut_down:
-      logging.debug('Queue join interrupted')
+      logger.debug('Queue join interrupted')
       return False
     for worker_thread in thread_gate.Threads():
       if not worker_thread.isAlive():
@@ -2060,7 +3521,7 @@
     work_queue: The work queue.
     thread_gate: A ThreadGate instance with workers registered.
   """
-  logging.info('An error occurred. Shutting down...')
+  logger.info('An error occurred. Shutting down...')
 
   data_source_thread.exit_flag = True
 
@@ -2072,8 +3533,8 @@
 
   data_source_thread.join(timeout=3.0)
   if data_source_thread.isAlive():
-    logging.warn('%s hung while trying to exit',
-                 data_source_thread.GetFriendlyName())
+    logger.warn('%s hung while trying to exit',
+                data_source_thread.GetFriendlyName())
 
   while not work_queue.empty():
     try:
@@ -2083,175 +3544,255 @@
       pass
 
 
-def PerformBulkUpload(app_id,
-                      post_url,
-                      kind,
-                      workitem_generator_factory,
-                      num_threads,
-                      throttle,
-                      progress_db,
-                      max_queue_size=DEFAULT_QUEUE_SIZE,
-                      request_manager_factory=RequestManager,
-                      bulkloaderthread_factory=BulkLoaderThread,
-                      progresstrackerthread_factory=ProgressTrackerThread,
-                      datasourcethread_factory=DataSourceThread,
-                      work_queue_factory=ReQueue,
-                      progress_queue_factory=Queue.Queue):
-  """Uploads data into an application using a series of HTTP POSTs.
-
-  This function will spin up a number of threads to read entities from
-  the data source, pass those to a number of worker ("uploader") threads
-  for sending to the application, and track all of the progress in a
-  small database in case an error or pause/termination requires a
-  restart/resumption of the upload process.
-
-  Args:
-    app_id: String containing application id.
-    post_url: URL to post the Entity data to.
-    kind: Kind of the Entity records being posted.
-    workitem_generator_factory: A factory that creates a WorkItem generator.
-    num_threads: How many uploader threads should be created.
-    throttle: A Throttle instance.
-    progress_db: The database to use for replaying/recording progress.
-    max_queue_size: Maximum size of the queues before they should block.
-    request_manager_factory: Used for dependency injection.
-    bulkloaderthread_factory: Used for dependency injection.
-    progresstrackerthread_factory: Used for dependency injection.
-    datasourcethread_factory: Used for dependency injection.
-    work_queue_factory: Used for dependency injection.
-    progress_queue_factory: Used for dependency injection.
-
-  Raises:
-    AuthenticationError: If authentication is required and fails.
-  """
-  thread_gate = ThreadGate(True)
-
-  (unused_scheme,
-   host_port, url_path,
-   unused_query, unused_fragment) = urlparse.urlsplit(post_url)
-
-  work_queue = work_queue_factory(max_queue_size)
-  progress_queue = progress_queue_factory(max_queue_size)
-  request_manager = request_manager_factory(app_id,
-                                            host_port,
-                                            url_path,
-                                            kind,
-                                            throttle)
-
-  throttle.Register(threading.currentThread())
-  try:
-    request_manager.Authenticate()
-  except Exception, e:
-    logging.exception(e)
-    raise AuthenticationError('Authentication failed')
-  if (request_manager.credentials is not None and
-      not request_manager.authenticated):
-    raise AuthenticationError('Authentication failed')
-
-  for unused_idx in range(num_threads):
-    thread = bulkloaderthread_factory(work_queue,
-                                      throttle,
-                                      thread_gate,
-                                      request_manager)
-    throttle.Register(thread)
-    thread_gate.Register(thread)
-
-  progress_thread = progresstrackerthread_factory(progress_queue, progress_db)
-
-  if progress_db.HasUnfinishedWork():
-    logging.debug('Restarting upload using progress database')
-    progress_generator_factory = progress_db.GetProgressStatusGenerator
-  else:
-    progress_generator_factory = None
-
-  data_source_thread = datasourcethread_factory(work_queue,
-                                                progress_queue,
-                                                workitem_generator_factory,
-                                                progress_generator_factory)
-
-  thread_local = threading.local()
-  thread_local.shut_down = False
-
-  def Interrupt(unused_signum, unused_frame):
-    """Shutdown gracefully in response to a signal."""
-    thread_local.shut_down = True
-
-  signal.signal(signal.SIGINT, Interrupt)
-
-  progress_thread.start()
-  data_source_thread.start()
-  for thread in thread_gate.Threads():
-    thread.start()
-
-
-  while not thread_local.shut_down:
-    data_source_thread.join(timeout=0.25)
-
-    if data_source_thread.isAlive():
-      for thread in list(thread_gate.Threads()) + [progress_thread]:
-        if not thread.isAlive():
-          logging.info('Unexpected thread death: %s', thread.getName())
-          thread_local.shut_down = True
-          break
+class BulkTransporterApp(object):
+  """Class to wrap bulk transport application functionality."""
+
+  def __init__(self,
+               arg_dict,
+               input_generator_factory,
+               throttle,
+               progress_db,
+               workerthread_factory,
+               progresstrackerthread_factory,
+               max_queue_size=DEFAULT_QUEUE_SIZE,
+               request_manager_factory=RequestManager,
+               datasourcethread_factory=DataSourceThread,
+               work_queue_factory=ReQueue,
+               progress_queue_factory=Queue.Queue):
+    """Instantiate a BulkTransporterApp.
+
+    Uploads or downloads data to or from application using HTTP requests.
+    When run, the class will spin up a number of threads to read entities
+    from the data source, pass those to a number of worker threads
+    for sending to the application, and track all of the progress in a
+    small database in case an error or pause/termination requires a
+    restart/resumption of the upload process.
+
+    Args:
+      arg_dict: Dictionary of command line options.
+      input_generator_factory: A factory that creates a WorkItem generator.
+      throttle: A Throttle instance.
+      progress_db: The database to use for replaying/recording progress.
+      workerthread_factory: A factory for worker threads.
+      progresstrackerthread_factory: Used for dependency injection.
+      max_queue_size: Maximum size of the queues before they should block.
+      request_manager_factory: Used for dependency injection.
+      datasourcethread_factory: Used for dependency injection.
+      work_queue_factory: Used for dependency injection.
+      progress_queue_factory: Used for dependency injection.
+    """
+    self.app_id = arg_dict['app_id']
+    self.post_url = arg_dict['url']
+    self.kind = arg_dict['kind']
+    self.batch_size = arg_dict['batch_size']
+    self.input_generator_factory = input_generator_factory
+    self.num_threads = arg_dict['num_threads']
+    self.email = arg_dict['email']
+    self.passin = arg_dict['passin']
+    self.throttle = throttle
+    self.progress_db = progress_db
+    self.workerthread_factory = workerthread_factory
+    self.progresstrackerthread_factory = progresstrackerthread_factory
+    self.max_queue_size = max_queue_size
+    self.request_manager_factory = request_manager_factory
+    self.datasourcethread_factory = datasourcethread_factory
+    self.work_queue_factory = work_queue_factory
+    self.progress_queue_factory = progress_queue_factory
+    (scheme,
+     self.host_port, self.url_path,
+     unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
+    self.secure = (scheme == 'https')
+
+  def Run(self):
+    """Perform the work of the BulkTransporterApp.
+
+    Raises:
+      AuthenticationError: If authentication is required and fails.
+
+    Returns:
+      Error code suitable for sys.exit, e.g. 0 on success, 1 on failure.
+    """
+    thread_gate = ThreadGate(True)
+
+    self.throttle.Register(threading.currentThread())
+    threading.currentThread().exit_flag = False
+
+    work_queue = self.work_queue_factory(self.max_queue_size)
+
+    progress_queue = self.progress_queue_factory(self.max_queue_size)
+    request_manager = self.request_manager_factory(self.app_id,
+                                                   self.host_port,
+                                                   self.url_path,
+                                                   self.kind,
+                                                   self.throttle,
+                                                   self.batch_size,
+                                                   self.secure,
+                                                   self.email,
+                                                   self.passin)
+    try:
+      request_manager.Authenticate()
+    except Exception, e:
+      if not isinstance(e, urllib2.HTTPError) or (
+          e.code != 302 and e.code != 401):
+        logger.exception('Exception during authentication')
+      raise AuthenticationError()
+    if (request_manager.auth_called and
+        not request_manager.authenticated):
+      raise AuthenticationError('Authentication failed')
+
+    for unused_idx in xrange(self.num_threads):
+      thread = self.workerthread_factory(work_queue,
+                                         self.throttle,
+                                         thread_gate,
+                                         request_manager,
+                                         self.num_threads,
+                                         self.batch_size)
+      self.throttle.Register(thread)
+      thread_gate.Register(thread)
+
+    self.progress_thread = self.progresstrackerthread_factory(
+        progress_queue, self.progress_db)
+
+    if self.progress_db.UseProgressData():
+      logger.debug('Restarting upload using progress database')
+      progress_generator_factory = self.progress_db.GetProgressStatusGenerator
     else:
-      break
-
-  if thread_local.shut_down:
-    ShutdownThreads(data_source_thread, work_queue, thread_gate)
-
-  def _Join(ob, msg):
-    logging.debug('Waiting for %s...', msg)
-    if isinstance(ob, threading.Thread):
-      ob.join(timeout=3.0)
-      if ob.isAlive():
-        logging.debug('Joining %s failed', ob.GetFriendlyName())
+      progress_generator_factory = None
+
+    self.data_source_thread = (
+        self.datasourcethread_factory(work_queue,
+                                      progress_queue,
+                                      self.input_generator_factory,
+                                      progress_generator_factory))
+
+    thread_local = threading.local()
+    thread_local.shut_down = False
+
+    def Interrupt(unused_signum, unused_frame):
+      """Shutdown gracefully in response to a signal."""
+      thread_local.shut_down = True
+
+    signal.signal(signal.SIGINT, Interrupt)
+
+    self.progress_thread.start()
+    self.data_source_thread.start()
+    for thread in thread_gate.Threads():
+      thread.start()
+
+
+    while not thread_local.shut_down:
+      self.data_source_thread.join(timeout=0.25)
+
+      if self.data_source_thread.isAlive():
+        for thread in list(thread_gate.Threads()) + [self.progress_thread]:
+          if not thread.isAlive():
+            logger.info('Unexpected thread death: %s', thread.getName())
+            thread_local.shut_down = True
+            break
       else:
-        logging.debug('... done.')
-    elif isinstance(ob, (Queue.Queue, ReQueue)):
-      if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
-        ShutdownThreads(data_source_thread, work_queue, thread_gate)
+        break
+
+    if thread_local.shut_down:
+      ShutdownThreads(self.data_source_thread, work_queue, thread_gate)
+
+    def _Join(ob, msg):
+      logger.debug('Waiting for %s...', msg)
+      if isinstance(ob, threading.Thread):
+        ob.join(timeout=3.0)
+        if ob.isAlive():
+          logger.debug('Joining %s failed', ob.GetFriendlyName())
+        else:
+          logger.debug('... done.')
+      elif isinstance(ob, (Queue.Queue, ReQueue)):
+        if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
+          ShutdownThreads(self.data_source_thread, work_queue, thread_gate)
+      else:
+        ob.join()
+        logger.debug('... done.')
+
+    _Join(work_queue, 'work_queue to flush')
+
+    for unused_thread in thread_gate.Threads():
+      work_queue.put(_THREAD_SHOULD_EXIT)
+
+    for unused_thread in thread_gate.Threads():
+      thread_gate.EnableThread()
+
+    for thread in thread_gate.Threads():
+      _Join(thread, 'thread [%s] to terminate' % thread.getName())
+
+      thread.CheckError()
+
+    if self.progress_thread.isAlive():
+      _Join(progress_queue, 'progress_queue to finish')
     else:
-      ob.join()
-      logging.debug('... done.')
-
-  _Join(work_queue, 'work_queue to flush')
-
-  for unused_thread in thread_gate.Threads():
-    work_queue.put(_THREAD_SHOULD_EXIT)
-
-  for unused_thread in thread_gate.Threads():
-    thread_gate.EnableThread()
-
-  for thread in thread_gate.Threads():
-    _Join(thread, 'thread [%s] to terminate' % thread.getName())
-
-    thread.CheckError()
-
-  if progress_thread.isAlive():
-    _Join(progress_queue, 'progress_queue to finish')
-  else:
-    logging.warn('Progress thread exited prematurely')
-
-  progress_queue.put(_THREAD_SHOULD_EXIT)
-  _Join(progress_thread, 'progress_thread to terminate')
-  progress_thread.CheckError()
-
-  data_source_thread.CheckError()
-
-  total_up, duration = throttle.TotalTransferred(BANDWIDTH_UP)
-  s_total_up, unused_duration = throttle.TotalTransferred(HTTPS_BANDWIDTH_UP)
-  total_up += s_total_up
-  logging.info('%d entites read, %d previously transferred',
-               data_source_thread.read_count,
-               data_source_thread.sent_count)
-  logging.info('%d entities (%d bytes) transferred in %.1f seconds',
-               progress_thread.entities_sent, total_up, duration)
-  if (data_source_thread.read_all and
-      progress_thread.entities_sent + data_source_thread.sent_count >=
-      data_source_thread.read_count):
-    logging.info('All entities successfully uploaded')
-  else:
-    logging.info('Some entities not successfully uploaded')
+      logger.warn('Progress thread exited prematurely')
+
+    progress_queue.put(_THREAD_SHOULD_EXIT)
+    _Join(self.progress_thread, 'progress_thread to terminate')
+    self.progress_thread.CheckError()
+    if not thread_local.shut_down:
+      self.progress_thread.WorkFinished()
+
+    self.data_source_thread.CheckError()
+
+    return self.ReportStatus()
+
+  def ReportStatus(self):
+    """Display a message reporting the final status of the transfer."""
+    raise NotImplementedError()
+
+
+class BulkUploaderApp(BulkTransporterApp):
+  """Class to encapsulate bulk uploader functionality."""
+
+  def __init__(self, *args, **kwargs):
+    BulkTransporterApp.__init__(self, *args, **kwargs)
+
+  def ReportStatus(self):
+    """Display a message reporting the final status of the transfer."""
+    total_up, duration = self.throttle.TotalTransferred(BANDWIDTH_UP)
+    s_total_up, unused_duration = self.throttle.TotalTransferred(
+        HTTPS_BANDWIDTH_UP)
+    total_up += s_total_up
+    total = total_up
+    logger.info('%d entites total, %d previously transferred',
+                self.data_source_thread.read_count,
+                self.data_source_thread.xfer_count)
+    transfer_count = self.progress_thread.EntitiesTransferred()
+    logger.info('%d entities (%d bytes) transferred in %.1f seconds',
+                transfer_count, total, duration)
+    if (self.data_source_thread.read_all and
+        transfer_count +
+        self.data_source_thread.xfer_count >=
+        self.data_source_thread.read_count):
+      logger.info('All entities successfully transferred')
+      return 0
+    else:
+      logger.info('Some entities not successfully transferred')
+      return 1
+
+
+class BulkDownloaderApp(BulkTransporterApp):
+  """Class to encapsulate bulk downloader functionality."""
+
+  def __init__(self, *args, **kwargs):
+    BulkTransporterApp.__init__(self, *args, **kwargs)
+
+  def ReportStatus(self):
+    """Display a message reporting the final status of the transfer."""
+    total_down, duration = self.throttle.TotalTransferred(BANDWIDTH_DOWN)
+    s_total_down, unused_duration = self.throttle.TotalTransferred(
+        HTTPS_BANDWIDTH_DOWN)
+    total_down += s_total_down
+    total = total_down
+    existing_count = self.progress_thread.existing_count
+    xfer_count = self.progress_thread.EntitiesTransferred()
+    logger.info('Have %d entities, %d previously transferred',
+                xfer_count + existing_count, existing_count)
+    logger.info('%d entities (%d bytes) transferred in %.1f seconds',
+                xfer_count, total, duration)
+    return 0
 
 
 def PrintUsageExit(code):
@@ -2266,6 +3807,35 @@
   sys.exit(code)
 
 
+REQUIRED_OPTION = object()
+
+
+FLAG_SPEC = ['debug',
+             'help',
+             'url=',
+             'filename=',
+             'batch_size=',
+             'kind=',
+             'num_threads=',
+             'bandwidth_limit=',
+             'rps_limit=',
+             'http_limit=',
+             'db_filename=',
+             'app_id=',
+             'config_file=',
+             'has_header',
+             'csv_has_header',
+             'auth_domain=',
+             'result_db_filename=',
+             'download',
+             'loader_opts=',
+             'exporter_opts=',
+             'log_file=',
+             'email=',
+             'passin',
+             ]
+
+
 def ParseArguments(argv):
   """Parses command-line arguments.
 
@@ -2275,87 +3845,98 @@
     argv: List of command-line arguments.
 
   Returns:
-    Tuple (url, filename, cookie, batch_size, kind) containing the values from
-    each corresponding command-line flag.
+    A dictionary containing the value of command-line options.
   """
   opts, unused_args = getopt.getopt(
       argv[1:],
       'h',
-      ['debug',
-       'help',
-       'url=',
-       'filename=',
-       'batch_size=',
-       'kind=',
-       'num_threads=',
-       'bandwidth_limit=',
-       'rps_limit=',
-       'http_limit=',
-       'db_filename=',
-       'app_id=',
-       'config_file=',
-       'auth_domain=',
-      ])
-
-  url = None
-  filename = None
-  batch_size = DEFAULT_BATCH_SIZE
-  kind = None
-  num_threads = DEFAULT_THREAD_COUNT
-  bandwidth_limit = DEFAULT_BANDWIDTH_LIMIT
-  rps_limit = DEFAULT_RPS_LIMIT
-  http_limit = DEFAULT_REQUEST_LIMIT
-  db_filename = None
-  app_id = None
-  config_file = None
-  auth_domain = 'gmail.com'
+      FLAG_SPEC)
+
+  arg_dict = {}
+
+  arg_dict['url'] = REQUIRED_OPTION
+  arg_dict['filename'] = REQUIRED_OPTION
+  arg_dict['config_file'] = REQUIRED_OPTION
+  arg_dict['kind'] = REQUIRED_OPTION
+
+  arg_dict['batch_size'] = DEFAULT_BATCH_SIZE
+  arg_dict['num_threads'] = DEFAULT_THREAD_COUNT
+  arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT
+  arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT
+  arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT
+
+  arg_dict['db_filename'] = None
+  arg_dict['app_id'] = ''
+  arg_dict['auth_domain'] = 'gmail.com'
+  arg_dict['has_header'] = False
+  arg_dict['result_db_filename'] = None
+  arg_dict['download'] = False
+  arg_dict['loader_opts'] = None
+  arg_dict['exporter_opts'] = None
+  arg_dict['debug'] = False
+  arg_dict['log_file'] = None
+  arg_dict['email'] = None
+  arg_dict['passin'] = False
+
+  def ExpandFilename(filename):
+    """Expand shell variables and ~usernames in filename."""
+    return os.path.expandvars(os.path.expanduser(filename))
 
   for option, value in opts:
     if option == '--debug':
-      logging.getLogger().setLevel(logging.DEBUG)
+      arg_dict['debug'] = True
     elif option in ('-h', '--help'):
       PrintUsageExit(0)
     elif option == '--url':
-      url = value
+      arg_dict['url'] = value
     elif option == '--filename':
-      filename = value
+      arg_dict['filename'] = ExpandFilename(value)
     elif option == '--batch_size':
-      batch_size = int(value)
+      arg_dict['batch_size'] = int(value)
     elif option == '--kind':
-      kind = value
+      arg_dict['kind'] = value
     elif option == '--num_threads':
-      num_threads = int(value)
+      arg_dict['num_threads'] = int(value)
     elif option == '--bandwidth_limit':
-      bandwidth_limit = int(value)
+      arg_dict['bandwidth_limit'] = int(value)
     elif option == '--rps_limit':
-      rps_limit = int(value)
+      arg_dict['rps_limit'] = int(value)
     elif option == '--http_limit':
-      http_limit = int(value)
+      arg_dict['http_limit'] = int(value)
     elif option == '--db_filename':
-      db_filename = value
+      arg_dict['db_filename'] = ExpandFilename(value)
     elif option == '--app_id':
-      app_id = value
+      arg_dict['app_id'] = value
     elif option == '--config_file':
-      config_file = value
+      arg_dict['config_file'] = ExpandFilename(value)
     elif option == '--auth_domain':
-      auth_domain = value
-
-  return ProcessArguments(app_id=app_id,
-                          url=url,
-                          filename=filename,
-                          batch_size=batch_size,
-                          kind=kind,
-                          num_threads=num_threads,
-                          bandwidth_limit=bandwidth_limit,
-                          rps_limit=rps_limit,
-                          http_limit=http_limit,
-                          db_filename=db_filename,
-                          config_file=config_file,
-                          auth_domain=auth_domain,
-                          die_fn=lambda: PrintUsageExit(1))
+      arg_dict['auth_domain'] = value
+    elif option == '--has_header':
+      arg_dict['has_header'] = True
+    elif option == '--csv_has_header':
+      print >>sys.stderr, ('--csv_has_header is deprecated, please use '
+                           '--has_header.')
+      arg_dict['has_header'] = True
+    elif option == '--result_db_filename':
+      arg_dict['result_db_filename'] = ExpandFilename(value)
+    elif option == '--download':
+      arg_dict['download'] = True
+    elif option == '--loader_opts':
+      arg_dict['loader_opts'] = value
+    elif option == '--exporter_opts':
+      arg_dict['exporter_opts'] = value
+    elif option == '--log_file':
+      arg_dict['log_file'] = value
+    elif option == '--email':
+      arg_dict['email'] = value
+    elif option == '--passin':
+      arg_dict['passin'] = True
+
+  return ProcessArguments(arg_dict, die_fn=lambda: PrintUsageExit(1))
 
 
 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
+  """Return a dictionary indicating the throttle options."""
   return {
       BANDWIDTH_UP: bandwidth_limit,
       BANDWIDTH_DOWN: bandwidth_limit,
@@ -2367,221 +3948,414 @@
   }
 
 
-def LoadConfig(config_file):
-  """Loads a config file and registers any Loader classes present."""
-  if config_file:
-    global_dict = dict(globals())
-    execfile(config_file, global_dict)
-    for cls in Loader.__subclasses__():
-      Loader.RegisterLoader(cls())
-
-
-def _MissingArgument(arg_name, die_fn):
-  """Print error message about missing argument and die."""
-  print >>sys.stderr, '%s argument required' % arg_name
-  die_fn()
-
-
-def ProcessArguments(app_id=None,
-                     url=None,
-                     filename=None,
-                     batch_size=DEFAULT_BATCH_SIZE,
-                     kind=None,
-                     num_threads=DEFAULT_THREAD_COUNT,
-                     bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
-                     rps_limit=DEFAULT_RPS_LIMIT,
-                     http_limit=DEFAULT_REQUEST_LIMIT,
-                     db_filename=None,
-                     config_file=None,
-                     auth_domain='gmail.com',
+def CheckOutputFile(filename):
+  """Check that the given file does not exist and can be opened for writing.
+
+  Args:
+    filename: The name of the file.
+
+  Raises:
+    FileExistsError: if the given filename is not found
+    FileNotWritableError: if the given filename is not readable.
+  """
+  if os.path.exists(filename):
+    raise FileExistsError('%s: output file exists' % filename)
+  elif not os.access(os.path.dirname(filename), os.W_OK):
+    raise FileNotWritableError(
+        '%s: not writable' % os.path.dirname(filename))
+
+
+def LoadConfig(config_file_name, exit_fn=sys.exit):
+  """Loads a config file and registers any Loader classes present.
+
+  Args:
+    config_file_name: The name of the configuration file.
+    exit_fn: Used for dependency injection.
+  """
+  if config_file_name:
+    config_file = open(config_file_name, 'r')
+    try:
+      bulkloader_config = imp.load_module(
+          'bulkloader_config', config_file, config_file_name,
+          ('', 'r', imp.PY_SOURCE))
+      sys.modules['bulkloader_config'] = bulkloader_config
+
+      if hasattr(bulkloader_config, 'loaders'):
+        for cls in bulkloader_config.loaders:
+          Loader.RegisterLoader(cls())
+
+      if hasattr(bulkloader_config, 'exporters'):
+        for cls in bulkloader_config.exporters:
+          Exporter.RegisterExporter(cls())
+    except NameError, e:
+      m = re.search(r"[^']*'([^']*)'.*", str(e))
+      if m.groups() and m.group(1) == 'Loader':
+        print >>sys.stderr, """
+The config file format has changed and you appear to be using an old-style
+config file.  Please make the following changes:
+
+1. At the top of the file, add this:
+
+from google.appengine.tools import bulkloader.Loader
+
+2. For each of your Loader subclasses add the following at the end of the
+   __init__ definitioion:
+
+self.alias_old_names()
+
+3. At the bottom of the file, add this:
+
+loaders = [MyLoader1,...,MyLoaderN]
+
+Where MyLoader1,...,MyLoaderN are the Loader subclasses you want the bulkloader
+to have access to.
+"""
+        exit_fn(1)
+      else:
+        raise
+    except Exception, e:
+      if isinstance(e, NameClashError) or 'bulkloader_config' in vars() and (
+          hasattr(bulkloader_config, 'bulkloader') and
+          isinstance(e, bulkloader_config.bulkloader.NameClashError)):
+        print >> sys.stderr, (
+            'Found both %s and %s while aliasing old names on %s.'%
+            (e.old_name, e.new_name, e.klass))
+        exit_fn(1)
+      else:
+        raise
+
+def GetArgument(kwargs, name, die_fn):
+  """Get the value of the key name in kwargs, or die with die_fn.
+
+  Args:
+    kwargs: A dictionary containing the options for the bulkloader.
+    name: The name of a bulkloader option.
+    die_fn: The function to call to exit the program.
+
+  Returns:
+    The value of kwargs[name] is name in kwargs
+  """
+  if name in kwargs:
+    return kwargs[name]
+  else:
+    print >>sys.stderr, '%s argument required' % name
+    die_fn()
+
+
+def _MakeSignature(app_id=None,
+                   url=None,
+                   kind=None,
+                   db_filename=None,
+                   download=None,
+                   has_header=None,
+                   result_db_filename=None):
+  """Returns a string that identifies the important options for the database."""
+  if download:
+    result_db_line = 'result_db: %s' % result_db_filename
+  else:
+    result_db_line = ''
+  return u"""
+  app_id: %s
+  url: %s
+  kind: %s
+  download: %s
+  progress_db: %s
+  has_header: %s
+  %s
+  """ % (app_id, url, kind, download, db_filename, has_header, result_db_line)
+
+
+def ProcessArguments(arg_dict,
                      die_fn=lambda: sys.exit(1)):
-  """Processes non command-line input arguments."""
+  """Processes non command-line input arguments.
+
+  Args:
+    arg_dict: Dictionary containing the values of bulkloader options.
+    die_fn: Function to call in case of an error during argument processing.
+
+  Returns:
+    A dictionary of bulkloader options.
+  """
+  app_id = GetArgument(arg_dict, 'app_id', die_fn)
+  url = GetArgument(arg_dict, 'url', die_fn)
+  filename = GetArgument(arg_dict, 'filename', die_fn)
+  batch_size = GetArgument(arg_dict, 'batch_size', die_fn)
+  kind = GetArgument(arg_dict, 'kind', die_fn)
+  db_filename = GetArgument(arg_dict, 'db_filename', die_fn)
+  config_file = GetArgument(arg_dict, 'config_file', die_fn)
+  result_db_filename = GetArgument(arg_dict, 'result_db_filename', die_fn)
+  download = GetArgument(arg_dict, 'download', die_fn)
+  log_file = GetArgument(arg_dict, 'log_file', die_fn)
+
+  unused_passin = GetArgument(arg_dict, 'passin', die_fn)
+  unused_email = GetArgument(arg_dict, 'email', die_fn)
+  unused_debug = GetArgument(arg_dict, 'debug', die_fn)
+  unused_num_threads = GetArgument(arg_dict, 'num_threads', die_fn)
+  unused_bandwidth_limit = GetArgument(arg_dict, 'bandwidth_limit', die_fn)
+  unused_rps_limit = GetArgument(arg_dict, 'rps_limit', die_fn)
+  unused_http_limit = GetArgument(arg_dict, 'http_limit', die_fn)
+  unused_auth_domain = GetArgument(arg_dict, 'auth_domain', die_fn)
+  unused_has_headers = GetArgument(arg_dict, 'has_header', die_fn)
+  unused_loader_opts = GetArgument(arg_dict, 'loader_opts', die_fn)
+  unused_exporter_opts = GetArgument(arg_dict, 'exporter_opts', die_fn)
+
+  errors = []
+
   if db_filename is None:
-    db_filename = time.strftime('bulkloader-progress-%Y%m%d.%H%M%S.sql3')
+    arg_dict['db_filename'] = time.strftime(
+        'bulkloader-progress-%Y%m%d.%H%M%S.sql3')
+
+  if result_db_filename is None:
+    arg_dict['result_db_filename'] = time.strftime(
+        'bulkloader-results-%Y%m%d.%H%M%S.sql3')
+
+  if log_file is None:
+    arg_dict['log_file'] = time.strftime('bulkloader-log-%Y%m%d.%H%M%S')
 
   if batch_size <= 0:
-    print >>sys.stderr, 'batch_size must be 1 or larger'
-    die_fn()
-
-  if url is None:
-    _MissingArgument('url', die_fn)
-
-  if filename is None:
-    _MissingArgument('filename', die_fn)
-
-  if kind is None:
-    _MissingArgument('kind', die_fn)
-
-  if config_file is None:
-    _MissingArgument('config_file', die_fn)
-
-  if app_id is None:
+    errors.append('batch_size must be at least 1')
+
+  required = '%s argument required'
+
+  if url is REQUIRED_OPTION:
+    errors.append(required % 'url')
+
+  if filename is REQUIRED_OPTION:
+    errors.append(required % 'filename')
+
+  if kind is REQUIRED_OPTION:
+    errors.append(required % 'kind')
+
+  if config_file is REQUIRED_OPTION:
+    errors.append(required % 'config_file')
+
+  if download:
+    if result_db_filename is REQUIRED_OPTION:
+      errors.append(required % 'result_db_filename')
+
+  if not app_id:
     (unused_scheme, host_port, unused_url_path,
      unused_query, unused_fragment) = urlparse.urlsplit(url)
     suffix_idx = host_port.find('.appspot.com')
     if suffix_idx > -1:
-      app_id = host_port[:suffix_idx]
+      arg_dict['app_id'] = host_port[:suffix_idx]
     elif host_port.split(':')[0].endswith('google.com'):
-      app_id = host_port.split('.')[0]
+      arg_dict['app_id'] = host_port.split('.')[0]
     else:
-      print >>sys.stderr, 'app_id required for non appspot.com domains'
-      die_fn()
-
-  return (app_id, url, filename, batch_size, kind, num_threads,
-          bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
-          auth_domain)
-
-
-def _PerformBulkload(app_id=None,
-                     url=None,
-                     filename=None,
-                     batch_size=DEFAULT_BATCH_SIZE,
-                     kind=None,
-                     num_threads=DEFAULT_THREAD_COUNT,
-                     bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
-                     rps_limit=DEFAULT_RPS_LIMIT,
-                     http_limit=DEFAULT_REQUEST_LIMIT,
-                     db_filename=None,
-                     config_file=None,
-                     auth_domain='gmail.com'):
-  """Runs the bulkloader, given the options as keyword arguments.
+      errors.append('app_id argument required for non appspot.com domains')
+
+  if errors:
+    print >>sys.stderr, '\n'.join(errors)
+    die_fn()
+
+  return arg_dict
+
+
+def ParseKind(kind):
+  if kind and kind[0] == '(' and kind[-1] == ')':
+    return tuple(kind[1:-1].split(','))
+  else:
+    return kind
+
+
+def _PerformBulkload(arg_dict,
+                     check_file=CheckFile,
+                     check_output_file=CheckOutputFile):
+  """Runs the bulkloader, given the command line options.
 
   Args:
-    app_id: The application id.
-    url: The url of the remote_api endpoint.
-    filename: The name of the file containing the CSV data.
-    batch_size: The number of records to send per request.
-    kind: The kind of entity to transfer.
-    num_threads: The number of threads to use to transfer data.
-    bandwidth_limit: Maximum bytes/second to transfers.
-    rps_limit: Maximum records/second to transfer.
-    http_limit: Maximum requests/second for transfers.
-    db_filename: The name of the SQLite3 progress database file.
-    config_file: The name of the configuration file.
-    auth_domain: The auth domain to use for logins and UserProperty.
+    arg_dict: Dictionary of bulkloader options.
+    check_file: Used for dependency injection.
+    check_output_file: Used for dependency injection.
 
   Returns:
     An exit code.
+
+  Raises:
+    ConfigurationError: if inconsistent options are passed.
   """
+  app_id = arg_dict['app_id']
+  url = arg_dict['url']
+  filename = arg_dict['filename']
+  batch_size = arg_dict['batch_size']
+  kind = arg_dict['kind']
+  num_threads = arg_dict['num_threads']
+  bandwidth_limit = arg_dict['bandwidth_limit']
+  rps_limit = arg_dict['rps_limit']
+  http_limit = arg_dict['http_limit']
+  db_filename = arg_dict['db_filename']
+  config_file = arg_dict['config_file']
+  auth_domain = arg_dict['auth_domain']
+  has_header = arg_dict['has_header']
+  download = arg_dict['download']
+  result_db_filename = arg_dict['result_db_filename']
+  loader_opts = arg_dict['loader_opts']
+  exporter_opts = arg_dict['exporter_opts']
+  email = arg_dict['email']
+  passin = arg_dict['passin']
+
   os.environ['AUTH_DOMAIN'] = auth_domain
+
+  kind = ParseKind(kind)
+
+  check_file(config_file)
+  if not download:
+    check_file(filename)
+  else:
+    check_output_file(filename)
+
   LoadConfig(config_file)
 
+  os.environ['APPLICATION_ID'] = app_id
+
   throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
 
   throttle = Throttle(layout=throttle_layout)
-
-
-  workitem_generator_factory = GetCSVGeneratorFactory(filename, batch_size)
+  signature = _MakeSignature(app_id=app_id,
+                             url=url,
+                             kind=kind,
+                             db_filename=db_filename,
+                             download=download,
+                             has_header=has_header,
+                             result_db_filename=result_db_filename)
+
+
+  max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5)
 
   if db_filename == 'skip':
     progress_db = StubProgressDatabase()
+  elif not download:
+    progress_db = ProgressDatabase(db_filename, signature)
   else:
-    progress_db = ProgressDatabase(db_filename)
-
-
-  max_queue_size = max(DEFAULT_QUEUE_SIZE, 2 * num_threads + 5)
-
-  PerformBulkUpload(app_id,
-                    url,
-                    kind,
-                    workitem_generator_factory,
-                    num_threads,
-                    throttle,
-                    progress_db,
-                    max_queue_size=max_queue_size)
-
-  return 0
-
-
-def Run(app_id=None,
-        url=None,
-        filename=None,
-        batch_size=DEFAULT_BATCH_SIZE,
-        kind=None,
-        num_threads=DEFAULT_THREAD_COUNT,
-        bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
-        rps_limit=DEFAULT_RPS_LIMIT,
-        http_limit=DEFAULT_REQUEST_LIMIT,
-        db_filename=None,
-        auth_domain='gmail.com',
-        config_file=None):
+    progress_db = ExportProgressDatabase(db_filename, signature)
+
+  if download:
+    result_db = ResultDatabase(result_db_filename, signature)
+
+  return_code = 1
+
+  if not download:
+    loader = Loader.RegisteredLoader(kind)
+    try:
+      loader.initialize(filename, loader_opts)
+      workitem_generator_factory = GetCSVGeneratorFactory(
+          kind, filename, batch_size, has_header)
+
+      app = BulkUploaderApp(arg_dict,
+                            workitem_generator_factory,
+                            throttle,
+                            progress_db,
+                            BulkLoaderThread,
+                            ProgressTrackerThread,
+                            max_queue_size,
+                            RequestManager,
+                            DataSourceThread,
+                            ReQueue,
+                            Queue.Queue)
+      try:
+        return_code = app.Run()
+      except AuthenticationError:
+        logger.info('Authentication Failed')
+    finally:
+      loader.finalize()
+  else:
+    exporter = Exporter.RegisteredExporter(kind)
+    try:
+      exporter.initialize(filename, exporter_opts)
+
+      def KeyRangeGeneratorFactory(progress_queue, progress_gen):
+        return KeyRangeGenerator(kind, progress_queue, progress_gen)
+
+      def ExportProgressThreadFactory(progress_queue, progress_db):
+        return ExportProgressThread(kind,
+                                    progress_queue,
+                                    progress_db,
+                                    result_db)
+      app = BulkDownloaderApp(arg_dict,
+                              KeyRangeGeneratorFactory,
+                              throttle,
+                              progress_db,
+                              BulkExporterThread,
+                              ExportProgressThreadFactory,
+                              0,
+                              RequestManager,
+                              DataSourceThread,
+                              ReQueue,
+                              Queue.Queue)
+      try:
+        return_code = app.Run()
+      except AuthenticationError:
+        logger.info('Authentication Failed')
+    finally:
+      exporter.finalize()
+  return return_code
+
+
+def SetupLogging(arg_dict):
+  """Sets up logging for the bulkloader.
+
+  Args:
+    arg_dict: Dictionary mapping flag names to their arguments.
+  """
+  format = '[%(levelname)-8s %(asctime)s %(filename)s] %(message)s'
+  debug = arg_dict['debug']
+  log_file = arg_dict['log_file']
+
+  logger.setLevel(logging.DEBUG)
+
+  logger.propagate = False
+
+  file_handler = logging.FileHandler(log_file, 'w')
+  file_handler.setLevel(logging.DEBUG)
+  file_formatter = logging.Formatter(format)
+  file_handler.setFormatter(file_formatter)
+  logger.addHandler(file_handler)
+
+  console = logging.StreamHandler()
+  level = logging.INFO
+  if debug:
+    level = logging.DEBUG
+  console.setLevel(level)
+  console_format = '[%(levelname)-8s] %(message)s'
+  formatter = logging.Formatter(console_format)
+  console.setFormatter(formatter)
+  logger.addHandler(console)
+
+  logger.info('Logging to %s', log_file)
+
+  appengine_rpc.logger.setLevel(logging.WARN)
+
+
+def Run(arg_dict):
   """Sets up and runs the bulkloader, given the options as keyword arguments.
 
   Args:
-    app_id: The application id.
-    url: The url of the remote_api endpoint.
-    filename: The name of the file containing the CSV data.
-    batch_size: The number of records to send per request.
-    kind: The kind of entity to transfer.
-    num_threads: The number of threads to use to transfer data.
-    bandwidth_limit: Maximum bytes/second to transfers.
-    rps_limit: Maximum records/second to transfer.
-    http_limit: Maximum requests/second for transfers.
-    db_filename: The name of the SQLite3 progress database file.
-    config_file: The name of the configuration file.
-    auth_domain: The auth domain to use for logins and UserProperty.
+    arg_dict: Dictionary of bulkloader options
 
   Returns:
     An exit code.
   """
-  logging.basicConfig(
-      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
-  args = ProcessArguments(app_id=app_id,
-                          url=url,
-                          filename=filename,
-                          batch_size=batch_size,
-                          kind=kind,
-                          num_threads=num_threads,
-                          bandwidth_limit=bandwidth_limit,
-                          rps_limit=rps_limit,
-                          http_limit=http_limit,
-                          db_filename=db_filename,
-                          config_file=config_file)
-
-  (app_id, url, filename, batch_size, kind, num_threads, bandwidth_limit,
-   rps_limit, http_limit, db_filename, config_file, auth_domain) = args
-
-  return _PerformBulkload(app_id=app_id,
-                          url=url,
-                          filename=filename,
-                          batch_size=batch_size,
-                          kind=kind,
-                          num_threads=num_threads,
-                          bandwidth_limit=bandwidth_limit,
-                          rps_limit=rps_limit,
-                          http_limit=http_limit,
-                          db_filename=db_filename,
-                          config_file=config_file,
-                          auth_domain=auth_domain)
+  arg_dict = ProcessArguments(arg_dict)
+
+  SetupLogging(arg_dict)
+
+  return _PerformBulkload(arg_dict)
 
 
 def main(argv):
   """Runs the importer from the command line."""
-  logging.basicConfig(
-      level=logging.INFO,
-      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
-
-  args = ParseArguments(argv)
-  if None in args:
-    print >>sys.stderr, 'Invalid arguments'
+
+  arg_dict = ParseArguments(argv)
+
+  errors = ['%s argument required' % key
+            for (key, value) in arg_dict.iteritems()
+            if value is REQUIRED_OPTION]
+  if errors:
+    print >>sys.stderr, '\n'.join(errors)
     PrintUsageExit(1)
 
-  (app_id, url, filename, batch_size, kind, num_threads,
-   bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
-   auth_domain) = args
-
-  return _PerformBulkload(app_id=app_id,
-                          url=url,
-                          filename=filename,
-                          batch_size=batch_size,
-                          kind=kind,
-                          num_threads=num_threads,
-                          bandwidth_limit=bandwidth_limit,
-                          rps_limit=rps_limit,
-                          http_limit=http_limit,
-                          db_filename=db_filename,
-                          config_file=config_file,
-                          auth_domain=auth_domain)
+  SetupLogging(arg_dict)
+  return _PerformBulkload(arg_dict)
 
 
 if __name__ == '__main__':