393 while self.line_number <= key_end: |
393 while self.line_number <= key_end: |
394 row = self.reader.next() |
394 row = self.reader.next() |
395 self.row_count += 1 |
395 self.row_count += 1 |
396 if self.column_count is None: |
396 if self.column_count is None: |
397 self.column_count = len(row) |
397 self.column_count = len(row) |
398 else: |
|
399 if self.column_count != len(row): |
|
400 raise ResumeError('Column count mismatch, %d: %s' % |
|
401 (self.column_count, str(row))) |
|
402 self.read_rows.append((self.line_number, row)) |
398 self.read_rows.append((self.line_number, row)) |
403 self.line_number += 1 |
399 self.line_number += 1 |
404 |
400 |
405 def _MakeItem(self, key_start, key_end, rows, progress_key=None): |
401 def _MakeItem(self, key_start, key_end, rows, progress_key=None): |
406 """Makes a UploadWorkItem containing the given rows, with the given keys. |
402 """Makes a UploadWorkItem containing the given rows, with the given keys. |
1184 password = None |
1180 password = None |
1185 |
1181 |
1186 self.auth_called = True |
1182 self.auth_called = True |
1187 return (email, password) |
1183 return (email, password) |
1188 |
1184 |
|
def IncrementId(self, ancestor_path, kind, high_id):
  """Increment the unique id counter associated with ancestor_path and kind.

  Args:
    ancestor_path: A list encoding the path of a key.
    kind: The string name of a kind.
    high_id: The int value to which to increment the unique id counter.

  Raises:
    AssertionError: If the end of the allocated range is still below
      high_id after the second allocation.
  """
  # The trailing id of 1 only completes the key path; the key is used to
  # select which counter AllocateIds operates on.
  model_key = datastore.Key.from_path(*(ancestor_path + [kind, 1]))
  # Allocate a single id first; the end of the returned range is compared
  # against high_id to decide whether more ids must be allocated.
  _, end = datastore.AllocateIds(model_key, 1)
  if end < high_id:
    _, end = datastore.AllocateIds(model_key, high_id - end)
  # Raised explicitly (rather than via an assert statement) so the check is
  # not stripped when Python runs with -O.
  if end < high_id:
    raise AssertionError(
        'Id counter for kind %r only reached %d, expected at least %d' %
        (kind, end, high_id))
|
1198 |
1189 def EncodeContent(self, rows, loader=None): |
1199 def EncodeContent(self, rows, loader=None): |
1190 """Encodes row data to the wire format. |
1200 """Encodes row data to the wire format. |
1191 |
1201 |
1192 Args: |
1202 Args: |
1193 rows: A list of pairs of a line number and a list of column values. |
1203 rows: A list of pairs of a line number and a list of column values. |
2366 Args: |
2376 Args: |
2367 loader: A Loader instance. |
2377 loader: A Loader instance. |
2368 """ |
2378 """ |
2369 Loader.__loaders[loader.kind] = loader |
2379 Loader.__loaders[loader.kind] = loader |
2370 |
2380 |
|
def get_high_ids(self):
  """Returns dict {ancestor_path : {kind : id}} with high id values.

  The returned dictionary is used to increment the id counters
  associated with each ancestor_path and kind to be at least id.
  This implementation reports no ids: it returns a new empty dict.
  """
  return dict()
|
2388 |
2371 def alias_old_names(self): |
2389 def alias_old_names(self): |
2372 """Aliases method names so that Loaders defined with old names work.""" |
2390 """Aliases method names so that Loaders defined with old names work.""" |
2373 aliases = ( |
2391 aliases = ( |
2374 ('CreateEntity', 'create_entity'), |
2392 ('CreateEntity', 'create_entity'), |
2375 ('HandleEntity', 'handle_entity'), |
2393 ('HandleEntity', 'handle_entity'), |
def PerformWork(self):
  """Feed every stored value from the result database into self.queue.

  Opens the SQLite file named by self.filename, selects the id and value
  columns of the result table and puts each row's value on self.queue,
  blocking while the queue is full.  Once all rows have been queued,
  RestoreThread._ENTITIES_DONE is put on the queue to mark the end.
  """
  db_conn = sqlite3.connect(self.filename)
  try:
    cursor = db_conn.cursor()
    cursor.execute('select id, value from result')
    for _, value in cursor:
      self.queue.put(value, block=True)
  finally:
    # Always release the database connection, even if the query or a
    # queue put raises.
    db_conn.close()
  self.queue.put(RestoreThread._ENTITIES_DONE, block=True)
2551 |
2569 |
2552 |
2570 |
class RestoreLoader(Loader):
  """A Loader which imports protobuffers from a file."""

  def __init__(self, kind, app_id):
    """Constructor.

    Args:
      kind: The kind this loader is registered for.
      app_id: The app id written into every entity key and reference value
        that this loader produces.
    """
    self.kind = kind
    self.app_id = app_id
    # Populated by initialize(); starts empty so get_high_ids() is safe to
    # call before initialize() has run.
    self.high_id_table = {}

  def initialize(self, filename, loader_opts):
    """Scan the file once for high ids, then start streaming it again.

    Args:
      filename: Path of the file to restore from; checked with CheckFile.
      loader_opts: Unused by this loader.
    """
    CheckFile(filename)
    self.queue = Queue.Queue(1000)
    # First pass: drain every record to compute the high id table.
    restore_thread = RestoreThread(self.queue, filename)
    restore_thread.start()
    self.high_id_table = self._find_high_id(self.generate_records(filename))
    # Second pass: a fresh thread refills the queue for the records that
    # generate_records() will hand out afterwards.
    restore_thread = RestoreThread(self.queue, filename)
    restore_thread.start()

  def get_high_ids(self):
    """Returns a shallow copy of the {ancestor_path : {kind : id}} table."""
    return dict(self.high_id_table)

  def _find_high_id(self, record_generator):
    """Find the highest numeric id used for each ancestor-path, kind pair.

    Args:
      record_generator: A generator of entity_encoding strings.

    Returns:
      A map from ancestor-path to maps from kind to id. {path : {kind : id}}
      Each ancestor-path is a tuple; entities without a parent use ().
    """
    high_id = {}
    for values in record_generator:
      key = self.create_entity(values).key()
      entity_id = key.id()
      if not entity_id:
        # Keys without a numeric id do not affect the id counters.
        continue
      parent = key.parent()
      ancestor_path = tuple(parent.to_path()) if parent else ()
      kind_map = high_id.setdefault(ancestor_path, {})
      kind = key.kind()
      if kind not in kind_map or kind_map[kind] < entity_id:
        kind_map[kind] = entity_id
    return high_id

  def generate_records(self, filename):
    """Yield records from self.queue until the end marker is received.

    Args:
      filename: Unused; records come from the queue filled by RestoreThread.

    Yields:
      Each record taken from the queue, in order.
    """
    while True:
      record = self.queue.get(block=True)
      # The end marker is recognised by identity, not by value.
      if record is RestoreThread._ENTITIES_DONE:
        break
      yield record

  def create_entity(self, values, key_name=None, parent=None):
    """Decode one record into a datastore Entity with the app id rewritten.

    Args:
      values: An encoded EntityProto, as yielded by generate_records.
      key_name: Unused.
      parent: Unused.

    Returns:
      A datastore.Entity built from the translated protobuffer.
    """
    entity_proto = entity_pb.EntityProto(contents=str(values))
    fixed_entity_proto = self._translate_entity_proto(entity_proto)
    return datastore.Entity._FromPb(fixed_entity_proto)

  def rewrite_reference_proto(self, reference_proto):
    """Transform the Reference protobuffer which underlies keys and references.

    Args:
      reference_proto: A Onestore Reference proto
    """
    reference_proto.set_app(self.app_id)

  def _translate_entity_proto(self, entity_proto):
    """Transform the ReferenceProperties of the given entity to fix app_id.

    The protobuffer is modified in place: its key and every property or raw
    property holding a reference value get self.app_id.

    Args:
      entity_proto: The EntityProto to rewrite.

    Returns:
      The same entity_proto object that was passed in.
    """
    entity_proto.mutable_key().set_app(self.app_id)
    # Call each accessor only when its turn comes, so the raw properties
    # are fetched after the regular properties have been rewritten.
    for get_props in (entity_proto.property_list,
                      entity_proto.raw_property_list):
      for prop in get_props():
        prop_value = prop.mutable_value()
        if prop_value.has_referencevalue():
          self.rewrite_reference_proto(prop_value.mutable_referencevalue())
    return entity_proto
2577 |
2651 |
2578 |
2652 |
2579 class Exporter(object): |
2653 class Exporter(object): |
2580 """A base class for serializing datastore entities. |
2654 """A base class for serializing datastore entities. |
2581 |
2655 |
2969 |
3047 |
2970 self.throttle.Register(threading.currentThread()) |
3048 self.throttle.Register(threading.currentThread()) |
2971 threading.currentThread().exit_flag = False |
3049 threading.currentThread().exit_flag = False |
2972 |
3050 |
2973 progress_queue = self.progress_queue_factory(self.max_queue_size) |
3051 progress_queue = self.progress_queue_factory(self.max_queue_size) |
2974 request_manager = self.request_manager_factory(self.app_id, |
3052 self.request_manager = self.request_manager_factory(self.app_id, |
2975 self.host_port, |
3053 self.host_port, |
2976 self.url_path, |
3054 self.url_path, |
2977 self.kind, |
3055 self.kind, |
2978 self.throttle, |
3056 self.throttle, |
2979 self.batch_size, |
3057 self.batch_size, |
2980 self.secure, |
3058 self.secure, |
2981 self.email, |
3059 self.email, |
2982 self.passin, |
3060 self.passin, |
2983 self.dry_run) |
3061 self.dry_run) |
2984 try: |
3062 try: |
2985 request_manager.Authenticate() |
3063 self.request_manager.Authenticate() |
2986 except Exception, e: |
3064 except Exception, e: |
2987 self.error = True |
3065 self.error = True |
2988 if not isinstance(e, urllib2.HTTPError) or ( |
3066 if not isinstance(e, urllib2.HTTPError) or ( |
2989 e.code != 302 and e.code != 401): |
3067 e.code != 302 and e.code != 401): |
2990 logger.exception('Exception during authentication') |
3068 logger.exception('Exception during authentication') |
2991 raise AuthenticationError() |
3069 raise AuthenticationError() |
2992 if (request_manager.auth_called and |
3070 if (self.request_manager.auth_called and |
2993 not request_manager.authenticated): |
3071 not self.request_manager.authenticated): |
2994 self.error = True |
3072 self.error = True |
2995 raise AuthenticationError('Authentication failed') |
3073 raise AuthenticationError('Authentication failed') |
|
3074 |
|
3075 self.RunPostAuthentication() |
2996 |
3076 |
2997 for thread in thread_pool.Threads(): |
3077 for thread in thread_pool.Threads(): |
2998 self.throttle.Register(thread) |
3078 self.throttle.Register(thread) |
2999 |
3079 |
3000 self.progress_thread = self.progresstrackerthread_factory( |
3080 self.progress_thread = self.progresstrackerthread_factory( |
3089 class BulkUploaderApp(BulkTransporterApp): |
3169 class BulkUploaderApp(BulkTransporterApp): |
3090 """Class to encapsulate bulk uploader functionality.""" |
3170 """Class to encapsulate bulk uploader functionality.""" |
3091 |
3171 |
def __init__(self, *args, **kwargs):
  """Pass all positional and keyword arguments to BulkTransporterApp."""
  BulkTransporterApp.__init__(self, *args, **kwargs)
|
3174 |
|
def RunPostAuthentication(self):
  """Push the registered loader's high ids through the request manager.

  Fetches the Loader registered for self.kind, reads its
  {ancestor_path : {kind : id}} table and calls
  self.request_manager.IncrementId once per ancestor_path, kind pair.
  """
  registered_loader = Loader.RegisteredLoader(self.kind)
  ids_by_path = registered_loader.get_high_ids()
  for path_key, ids_by_kind in ids_by_path.iteritems():
    for entity_kind, top_id in ids_by_kind.iteritems():
      # IncrementId takes the path as a list; the table keys are converted
      # on every call so each call receives its own list.
      self.request_manager.IncrementId(list(path_key), entity_kind, top_id)
3094 |
3181 |
3095 def ReportStatus(self): |
3182 def ReportStatus(self): |
3096 """Display a message reporting the final status of the transfer.""" |
3183 """Display a message reporting the final status of the transfer.""" |
3097 total_up, duration = self.throttle.TotalTransferred( |
3184 total_up, duration = self.throttle.TotalTransferred( |
3098 remote_api_throttle.BANDWIDTH_UP) |
3185 remote_api_throttle.BANDWIDTH_UP) |