thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 3031 7678f72140e6
parent 2864 2e0b0af889be
equal deleted inserted replaced
3030:09cae668b536 3031:7678f72140e6
   393     while self.line_number <= key_end:
   393     while self.line_number <= key_end:
   394       row = self.reader.next()
   394       row = self.reader.next()
   395       self.row_count += 1
   395       self.row_count += 1
   396       if self.column_count is None:
   396       if self.column_count is None:
   397         self.column_count = len(row)
   397         self.column_count = len(row)
   398       else:
       
   399         if self.column_count != len(row):
       
   400           raise ResumeError('Column count mismatch, %d: %s' %
       
   401                             (self.column_count, str(row)))
       
   402       self.read_rows.append((self.line_number, row))
   398       self.read_rows.append((self.line_number, row))
   403       self.line_number += 1
   399       self.line_number += 1
   404 
   400 
   405   def _MakeItem(self, key_start, key_end, rows, progress_key=None):
   401   def _MakeItem(self, key_start, key_end, rows, progress_key=None):
   406     """Makes a UploadWorkItem containing the given rows, with the given keys.
   402     """Makes a UploadWorkItem containing the given rows, with the given keys.
  1184       password = None
  1180       password = None
  1185 
  1181 
  1186     self.auth_called = True
  1182     self.auth_called = True
  1187     return (email, password)
  1183     return (email, password)
  1188 
  1184 
       
  def IncrementId(self, ancestor_path, kind, high_id):
    """Increment the unique id counter associated with ancestor_path and kind.

    Ids are requested through datastore.AllocateIds and then discarded; the
    only purpose of the calls is to move the counter forward.

    Args:
      ancestor_path: A list (or any other sequence) encoding the path of a key.
      kind: The string name of a kind.
      high_id: The int value to which to increment the unique id counter.

    Raises:
      AssertionError: If the counter is still below high_id after allocating.
    """
    # list() lets callers hand in a tuple path as well as a list.
    model_key = datastore.Key.from_path(*(list(ancestor_path) + [kind, 1]))
    # Request a single id first; `end` is then compared against high_id to
    # decide whether a second, larger allocation is needed.
    unused_start, end = datastore.AllocateIds(model_key, 1)
    if end < high_id:
      # Ask for the whole remaining gap in one request.
      unused_start, end = datastore.AllocateIds(model_key, high_id - end)
    # Raised explicitly rather than with `assert` so that the check is not
    # stripped when Python runs with -O.
    if end < high_id:
      raise AssertionError(
          'Id counter for kind %r only reached %d, expected at least %d' %
          (kind, end, high_id))
       
  1198 
  1189   def EncodeContent(self, rows, loader=None):
  1199   def EncodeContent(self, rows, loader=None):
  1190     """Encodes row data to the wire format.
  1200     """Encodes row data to the wire format.
  1191 
  1201 
  1192     Args:
  1202     Args:
  1193       rows: A list of pairs of a line number and a list of column values.
  1203       rows: A list of pairs of a line number and a list of column values.
  2366     Args:
  2376     Args:
  2367       loader: A Loader instance.
  2377       loader: A Loader instance.
  2368     """
  2378     """
  2369     Loader.__loaders[loader.kind] = loader
  2379     Loader.__loaders[loader.kind] = loader
  2370 
  2380 
       
  def get_high_ids(self):
    """Reports the high id values this loader wants reserved.

    This default implementation reports none.

    Returns:
      A dict of the form {ancestor_path : {kind : id}}. It is used to
      increment the id counter associated with each ancestor_path and kind
      so that the counter is at least id. Always empty here.
    """
    return dict()
       
  2388 
  2371   def alias_old_names(self):
  2389   def alias_old_names(self):
  2372     """Aliases method names so that Loaders defined with old names work."""
  2390     """Aliases method names so that Loaders defined with old names work."""
  2373     aliases = (
  2391     aliases = (
  2374         ('CreateEntity', 'create_entity'),
  2392         ('CreateEntity', 'create_entity'),
  2375         ('HandleEntity', 'handle_entity'),
  2393         ('HandleEntity', 'handle_entity'),
  def PerformWork(self):
    """Puts every value stored in the result table onto self.queue.

    Each row's `value` column is queued on its own; the `id` column is read
    but not forwarded. After the last row, RestoreThread._ENTITIES_DONE is
    queued so the consumer can tell that no more records will follow.
    """
    db_conn = sqlite3.connect(self.filename)
    try:
      cursor = db_conn.cursor()
      cursor.execute('select id, value from result')
      for unused_entity_id, value in cursor:
        self.queue.put(value, block=True)
    finally:
      # Release the database handle whether or not reading/queueing failed.
      db_conn.close()
    # Only reached when every row was queued; a failure above propagates
    # without signalling completion.
    self.queue.put(RestoreThread._ENTITIES_DONE, block=True)
  2551 
  2569 
  2552 
  2570 
  class RestoreLoader(Loader):
    """A Loader which imports protobuffers from a file."""

    def __init__(self, kind, app_id):
      """Constructor.

      Args:
        kind: The kind this loader is registered under.
        app_id: The application id written into the key and the reference
          values of every restored entity.
      """
      self.kind = kind
      self.app_id = app_id

    def initialize(self, filename, loader_opts):
      """Computes the high id table, then queues the file again for loading.

      The file is read twice through the same queue: once to find the
      highest ids in use, and once more so that later calls to
      generate_records() see every record again.

      Args:
        filename: The file holding the entities to restore.
        loader_opts: Not used by this loader.
      """
      CheckFile(filename)
      self.queue = Queue.Queue(1000)
      # First pass: _find_high_id() drains the queue completely.
      restore_thread = RestoreThread(self.queue, filename)
      restore_thread.start()
      self.high_id_table = self._find_high_id(self.generate_records(filename))
      # Second pass: refill the queue for the actual upload.
      restore_thread = RestoreThread(self.queue, filename)
      restore_thread.start()

    def get_high_ids(self):
      """Returns a shallow copy of the table built by initialize()."""
      return dict(self.high_id_table)

    def _find_high_id(self, record_generator):
      """Find the highest numeric id used for each ancestor-path, kind pair.

      Entities whose key has no numeric id are ignored.

      Args:
        record_generator: A generator of entity_encoding strings.

      Returns:
        A map from ancestor-path to maps from kind to id. {path : {kind : id}}
        Each path is a tuple; entities without a parent use the empty tuple.
      """
      high_id = {}
      for values in record_generator:
        key = self.create_entity(values).key()
        numeric_id = key.id()
        if not numeric_id:
          continue
        parent = key.parent()
        if parent:
          ancestor_path = tuple(parent.to_path())
        else:
          ancestor_path = ()
        kind_map = high_id.setdefault(ancestor_path, {})
        kind = key.kind()
        if kind not in kind_map or kind_map[kind] < numeric_id:
          kind_map[kind] = numeric_id
      return high_id

    def generate_records(self, filename):
      """Yields records from the queue until the end marker is received.

      Args:
        filename: Not used; records are taken from self.queue.

      Yields:
        Each queued record, excluding RestoreThread._ENTITIES_DONE.
      """
      while True:
        record = self.queue.get(block=True)
        # The end marker is recognised by identity, not by value.
        if record is RestoreThread._ENTITIES_DONE:
          break
        yield record

    def create_entity(self, values, key_name=None, parent=None):
      """Builds a datastore Entity from one encoded entity record.

      Args:
        values: The encoded contents of a single EntityProto.
        key_name: Not used by this loader.
        parent: Not used by this loader.

      Returns:
        A datastore.Entity built from the record after its key and reference
        values have been pointed at self.app_id.
      """
      entity_proto = entity_pb.EntityProto(contents=str(values))
      fixed_entity_proto = self._translate_entity_proto(entity_proto)
      return datastore.Entity._FromPb(fixed_entity_proto)

    def rewrite_reference_proto(self, reference_proto):
      """Transform the Reference protobuffer which underlies keys and references.

      Args:
        reference_proto: A Onestore Reference proto; its app is set to
          self.app_id in place.
      """
      reference_proto.set_app(self.app_id)

    def _translate_entity_proto(self, entity_proto):
      """Transform the ReferenceProperties of the given entity to fix app_id.

      Args:
        entity_proto: An EntityProto; it is modified in place.

      Returns:
        The same entity_proto, with self.app_id set on its key and every
        reference value in its regular and raw property lists rewritten.
      """
      entity_proto.mutable_key().set_app(self.app_id)
      property_lists = (entity_proto.property_list(),
                        entity_proto.raw_property_list())
      for props in property_lists:
        for prop in props:
          prop_value = prop.mutable_value()
          if prop_value.has_referencevalue():
            self.rewrite_reference_proto(prop_value.mutable_referencevalue())
      return entity_proto
  2577 
  2651 
  2578 
  2652 
  2579 class Exporter(object):
  2653 class Exporter(object):
  2580   """A base class for serializing datastore entities.
  2654   """A base class for serializing datastore entities.
  2581 
  2655 
  2660     """
  2734     """
  2661     encoding = []
  2735     encoding = []
  2662     for name, fn, default in self.__properties:
  2736     for name, fn, default in self.__properties:
  2663       try:
  2737       try:
  2664         encoding.append(fn(entity[name]))
  2738         encoding.append(fn(entity[name]))
  2665       except AttributeError:
  2739       except KeyError:
  2666         if default is None:
  2740         if default is None:
  2667           raise MissingPropertyError(name)
  2741           raise MissingPropertyError(name)
  2668         else:
  2742         else:
  2669           encoding.append(default)
  2743           encoding.append(default)
  2670     return encoding
  2744     return encoding
  2952     (scheme,
  3026     (scheme,
  2953      self.host_port, self.url_path,
  3027      self.host_port, self.url_path,
  2954      unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
  3028      unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
  2955     self.secure = (scheme == 'https')
  3029     self.secure = (scheme == 'https')
  2956 
  3030 
       
  def RunPostAuthentication(self):
    """Hook that Run() calls once authentication has succeeded.

    Does nothing in this class; subclasses may override it to perform work
    that needs the authenticated self.request_manager.
    """
       
  3034 
  2957   def Run(self):
  3035   def Run(self):
  2958     """Perform the work of the BulkTransporterApp.
  3036     """Perform the work of the BulkTransporterApp.
  2959 
  3037 
  2960     Raises:
  3038     Raises:
  2961       AuthenticationError: If authentication is required and fails.
  3039       AuthenticationError: If authentication is required and fails.
  2969 
  3047 
  2970     self.throttle.Register(threading.currentThread())
  3048     self.throttle.Register(threading.currentThread())
  2971     threading.currentThread().exit_flag = False
  3049     threading.currentThread().exit_flag = False
  2972 
  3050 
  2973     progress_queue = self.progress_queue_factory(self.max_queue_size)
  3051     progress_queue = self.progress_queue_factory(self.max_queue_size)
  2974     request_manager = self.request_manager_factory(self.app_id,
  3052     self.request_manager = self.request_manager_factory(self.app_id,
  2975                                                    self.host_port,
  3053                                                         self.host_port,
  2976                                                    self.url_path,
  3054                                                         self.url_path,
  2977                                                    self.kind,
  3055                                                         self.kind,
  2978                                                    self.throttle,
  3056                                                         self.throttle,
  2979                                                    self.batch_size,
  3057                                                         self.batch_size,
  2980                                                    self.secure,
  3058                                                         self.secure,
  2981                                                    self.email,
  3059                                                         self.email,
  2982                                                    self.passin,
  3060                                                         self.passin,
  2983                                                    self.dry_run)
  3061                                                         self.dry_run)
  2984     try:
  3062     try:
  2985       request_manager.Authenticate()
  3063       self.request_manager.Authenticate()
  2986     except Exception, e:
  3064     except Exception, e:
  2987       self.error = True
  3065       self.error = True
  2988       if not isinstance(e, urllib2.HTTPError) or (
  3066       if not isinstance(e, urllib2.HTTPError) or (
  2989           e.code != 302 and e.code != 401):
  3067           e.code != 302 and e.code != 401):
  2990         logger.exception('Exception during authentication')
  3068         logger.exception('Exception during authentication')
  2991       raise AuthenticationError()
  3069       raise AuthenticationError()
  2992     if (request_manager.auth_called and
  3070     if (self.request_manager.auth_called and
  2993         not request_manager.authenticated):
  3071         not self.request_manager.authenticated):
  2994       self.error = True
  3072       self.error = True
  2995       raise AuthenticationError('Authentication failed')
  3073       raise AuthenticationError('Authentication failed')
       
  3074 
       
  3075     self.RunPostAuthentication()
  2996 
  3076 
  2997     for thread in thread_pool.Threads():
  3077     for thread in thread_pool.Threads():
  2998       self.throttle.Register(thread)
  3078       self.throttle.Register(thread)
  2999 
  3079 
  3000     self.progress_thread = self.progresstrackerthread_factory(
  3080     self.progress_thread = self.progresstrackerthread_factory(
  3005       progress_generator_factory = self.progress_db.GetProgressStatusGenerator
  3085       progress_generator_factory = self.progress_db.GetProgressStatusGenerator
  3006     else:
  3086     else:
  3007       progress_generator_factory = None
  3087       progress_generator_factory = None
  3008 
  3088 
  3009     self.data_source_thread = (
  3089     self.data_source_thread = (
  3010         self.datasourcethread_factory(request_manager,
  3090         self.datasourcethread_factory(self.request_manager,
  3011                                       thread_pool,
  3091                                       thread_pool,
  3012                                       progress_queue,
  3092                                       progress_queue,
  3013                                       self.input_generator_factory,
  3093                                       self.input_generator_factory,
  3014                                       progress_generator_factory))
  3094                                       progress_generator_factory))
  3015 
  3095 
  3089 class BulkUploaderApp(BulkTransporterApp):
  3169 class BulkUploaderApp(BulkTransporterApp):
  3090   """Class to encapsulate bulk uploader functionality."""
  3170   """Class to encapsulate bulk uploader functionality."""
  3091 
  3171 
  def __init__(self, *args, **kwargs):
    """Constructor; forwards every argument to BulkTransporterApp.__init__."""
    BulkTransporterApp.__init__(self, *args, **kwargs)
       
  3174 
       
  def RunPostAuthentication(self):
    """Increments the id counters reported by the registered loader.

    Looks up the Loader registered for self.kind, asks it for its
    {ancestor_path : {kind : id}} table and calls
    self.request_manager.IncrementId once for every entry.
    """
    registered_loader = Loader.RegisteredLoader(self.kind)
    for path, ids_by_kind in registered_loader.get_high_ids().iteritems():
      for entity_kind, top_id in ids_by_kind.iteritems():
        # The table is keyed by tuples; IncrementId is handed a list.
        self.request_manager.IncrementId(list(path), entity_kind, top_id)
  3094 
  3181 
  3095   def ReportStatus(self):
  3182   def ReportStatus(self):
  3096     """Display a message reporting the final status of the transfer."""
  3183     """Display a message reporting the final status of the transfer."""
  3097     total_up, duration = self.throttle.TotalTransferred(
  3184     total_up, duration = self.throttle.TotalTransferred(
  3098         remote_api_throttle.BANDWIDTH_UP)
  3185         remote_api_throttle.BANDWIDTH_UP)
  3623     check_file(filename)
  3710     check_file(filename)
  3624 
  3711 
  3625   if dump:
  3712   if dump:
  3626     Exporter.RegisterExporter(DumpExporter(kind, result_db_filename))
  3713     Exporter.RegisterExporter(DumpExporter(kind, result_db_filename))
  3627   elif restore:
  3714   elif restore:
  3628     Loader.RegisterLoader(RestoreLoader(kind))
  3715     Loader.RegisterLoader(RestoreLoader(kind, app_id))
  3629   else:
  3716   else:
  3630     LoadConfig(config_file)
  3717     LoadConfig(config_file)
  3631 
  3718 
  3632   os.environ['APPLICATION_ID'] = app_id
  3719   os.environ['APPLICATION_ID'] = app_id
  3633 
  3720