thirdparty/google_appengine/google/appengine/tools/bulkload_client.py
author Todd Larsen <tlarsen@google.com>
Tue, 26 Aug 2008 21:49:54 +0000
changeset 109 620f9b141567
permissions -rwxr-xr-x
Load ../../google_appengine into trunk/thirdparty/google_appengine.

#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Imports CSV data over HTTP.

Usage:
  %s [flags]

    --debug             Show debugging information. (Optional)
    --cookie=<string>   Whole Cookie header to supply to the server, including
                        the parameter name (e.g., "ACSID=..."). (Optional)
    --url=<string>      URL endpoint to post to for importing data. (Required)
    --batch_size=<int>  Number of Entity objects to include in each post to
                        the URL endpoint. The more data per row/Entity, the
                        smaller the batch size should be. (Default 10)
    --filename=<path>   Path to the CSV file to import. (Required)
    --kind=<string>     Name of the Entity object kind to put in the datastore.
                        (Required)

The exit status will be 0 on success, non-zero on import failure.

Works with the bulkload mix-in library for google.appengine.ext.bulkload.
Please look there for documentation about how to setup the server side.
"""


import StringIO
import httplib
import logging
import csv
import getopt
import socket
import sys
import urllib
import urlparse

from google.appengine.ext.bulkload import constants



class Error(Exception):
  """Base-class for exceptions in this module."""


class PostError(Error):
  """An error has occured while trying to post data to the server."""


class BadServerStatusError(PostError):
  """The server has returned an error while importing data."""


def ContentGenerator(csv_file,
                     batch_size,
                     create_csv_reader=csv.reader,
                     create_csv_writer=csv.writer):
  """Retrieves CSV data up to a batch size at a time.

  Args:
    csv_file: A file-like object for reading CSV data.
    batch_size: Maximum number of CSV rows to yield on each iteration.
    create_csv_reader, create_csv_writer: Used for dependency injection.

  Yields:
    Tuple (entity_count, csv_content) where:
      entity_count: Number of entities contained in the csv_content. Will be
        less than or equal to the batch_size and greater than 0.
      csv_content: String containing the CSV content containing the next
        entity_count entities.
  """
  try:
    csv.field_size_limit(800000)
  except AttributeError:
    pass

  reader = create_csv_reader(csv_file, skipinitialspace=True)
  exhausted = False

  while not exhausted:
    rows_written = 0
    content = StringIO.StringIO()
    writer = create_csv_writer(content)
    try:
      for i in xrange(batch_size):
        row = reader.next()
        writer.writerow(row)
        rows_written += 1
    except StopIteration:
      exhausted = True

    if rows_written > 0:
      yield rows_written, content.getvalue()


def PostEntities(host_port, uri, cookie, kind, content):
  """Posts Entity records to a remote endpoint over HTTP.

  Args:
   host_port: String containing the "host:port" pair; the port is optional.
   uri: Relative URI to access on the remote host (e.g., '/bulkload').
   cookie: String containing the Cookie header to use, if any.
   kind: Kind of the Entity records being posted.
   content: String containing the CSV data for the entities.

  Raises:
    BadServerStatusError if the server was contactable but returns an error.
    PostError If an error occurred while connecting to the server or reading
    or writing data.
  """
  logging.debug('Connecting to %s', host_port)
  try:
    body = urllib.urlencode({
      constants.KIND_PARAM: kind,
      constants.CSV_PARAM: content,
    })
    headers = {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Content-Length': len(body),
      'Cookie': cookie,
    }

    logging.debug('Posting %d bytes to http://%s%s', len(body), host_port, uri)
    connection = httplib.HTTPConnection(host_port)
    try:
      connection.request('POST', uri, body, headers)
      response = connection.getresponse()

      status = response.status
      reason = response.reason
      content = response.read()
      logging.debug('Received response code %d: %s', status, reason)
      if status != httplib.OK:
        raise BadServerStatusError('Received code %d: %s\n%s' % (
                                   status, reason, content))
    finally:
      connection.close()
  except (IOError, httplib.HTTPException, socket.error), e:
    logging.debug('Encountered exception accessing HTTP server: %s', e)
    raise PostError(e)


def SplitURL(url):
  """Splits an HTTP URL into pieces.

  Args:
    url: String containing a full URL string (e.g.,
      'http://blah.com:8080/stuff?param=1#foo')

  Returns:
    Tuple (netloc, uri) where:
      netloc: String containing the host/port combination from the URL. The
        port is optional. (e.g., 'blah.com:8080').
      uri: String containing the relative URI of the URL. (e.g., '/stuff').
  """
  scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
  return netloc, path


def ImportCSV(filename,
              post_url,
              cookie,
              batch_size,
              kind,
              split_url=SplitURL,
              openfile=file,
              create_content_generator=ContentGenerator,
              post_entities=PostEntities):
  """Imports CSV data using a series of HTTP posts.

  Args:
    filename: File on disk containing CSV data.
    post_url: URL to post the Entity data to.
    cookie: Full cookie header to use while connecting.
    batch_size: Maximum number of Entity objects to post with each request.
    kind: Entity kind of the objects being posted.
    split_url, openfile, create_content_generator, post_entities: Used for
      dependency injection.

  Returns:
    True if all entities were imported successfully; False otherwise.
  """
  host_port, uri = split_url(post_url)
  csv_file = openfile(filename, 'r')
  try:
    content_gen = create_content_generator(csv_file, batch_size)
    logging.info('Starting import; maximum %d entities per post', batch_size)
    for num_entities, content in content_gen:
      logging.info('Importing %d entities in %d bytes',
                   num_entities, len(content))
      try:
        content = post_entities(host_port, uri, cookie, kind, content)
      except PostError, e:
        logging.error('An error occurred while importing: %s', e)
        return False
  finally:
    csv_file.close()
  return True


def PrintUsageExit(code):
  """Prints usage information and exits with a status code.

  Args:
    code: Status code to pass to sys.exit() after displaying usage information.
  """
  print sys.modules['__main__'].__doc__ % sys.argv[0]
  sys.stdout.flush()
  sys.stderr.flush()
  sys.exit(code)


def ParseArguments(argv):
  """Parses command-line arguments.

  Prints out a help message if -h or --help is supplied.

  Args:
    argv: List of command-line arguments.

  Returns:
    Tuple (url, filename, cookie, batch_size, kind) containing the values from
    each corresponding command-line flag.
  """
  opts, args = getopt.getopt(
    argv[1:],
    'h',
    ['debug',
     'help',
     'url=',
     'filename=',
     'cookie=',
     'batch_size=',
     'kind='])

  url = None
  filename = None
  cookie = ''
  batch_size = 10
  kind = None
  encoding = None

  for option, value in opts:
    if option == '--debug':
      logging.getLogger().setLevel(logging.DEBUG)
    if option in ('-h', '--help'):
      PrintUsageExit(0)
    if option == '--url':
      url = value
    if option == '--filename':
      filename = value
    if option == '--cookie':
      cookie = value
    if option == '--batch_size':
      batch_size = int(value)
      if batch_size <= 0:
        print >>sys.stderr, 'batch_size must be 1 or larger'
        PrintUsageExit(1)
    if option == '--kind':
      kind = value

  return (url, filename, cookie, batch_size, kind)


def main(argv):
  """Runs the importer."""
  logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')

  args = ParseArguments(argv)
  if [arg for arg in args if arg is None]:
    print >>sys.stderr, 'Invalid arguments'
    PrintUsageExit(1)

  url, filename, cookie, batch_size, kind = args
  if ImportCSV(filename, url, cookie, batch_size, kind):
    logging.info('Import succcessful')
    return 0
  logging.error('Import failed')
  return 1


if __name__ == '__main__':
  sys.exit(main(sys.argv))