157 |
216 |
class AuthenticationError(Error):
  """Error while trying to authenticate with the server."""
160 |
219 |
161 |
220 |
162 def GetCSVGeneratorFactory(csv_filename, batch_size, |
class FileNotFoundError(Error):
  """A filename passed in by the user refers to a non-existent input file."""
  # NOTE(review): shadows the Python 3 builtin of the same name; this module
  # appears to target Python 2 (it uses xrange/unichr/long elsewhere), where
  # no such builtin exists.
|
223 |
|
224 |
|
class FileNotReadableError(Error):
  """A filename passed in by the user refers to a non-readable input file."""
|
227 |
|
228 |
|
class FileExistsError(Error):
  """A filename passed in by the user refers to an existing output file."""
  # NOTE(review): shadows the Python 3 builtin of the same name (and with
  # different semantics); harmless under Python 2, which this module targets.
|
231 |
|
232 |
|
class FileNotWritableError(Error):
  """A filename passed in by the user refers to a non-writable output file."""
|
235 |
|
236 |
|
class KeyRangeError(Error):
  """Error while trying to generate a KeyRange."""
  # Raised by KeyRange.MakeParallelQuery (unexpected direction) and by
  # KeyRange._AddUnfinishedRanges (no unfinished part remaining).
|
239 |
|
240 |
|
class BadStateError(Error):
  """A work item in an unexpected state was encountered."""
  # Raised by _WorkItem._AssertInState and _WorkItem._AssertProgressKey.
|
243 |
|
244 |
|
class NameClashError(Error):
  """A name clash occurred while trying to alias old method names."""

  def __init__(self, old_name, new_name, klass):
    """Initialize the error.

    Args:
      old_name: The method name being aliased.
      new_name: The new method name that clashed.
      klass: The class on which the clash occurred.
    """
    # Forward all three values to the base class so they are carried in
    # the exception's args as well as the named attributes below.
    Error.__init__(self, old_name, new_name, klass)
    self.old_name = old_name
    self.new_name = new_name
    self.klass = klass
|
252 |
|
253 |
|
def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
                           openfile=open, create_csv_reader=csv.reader):
  """Return a factory that creates a CSV-based WorkItem generator.

  Args:
    kind: The kind of the entities being uploaded.
    csv_filename: File on disk containing CSV data.
    batch_size: Maximum number of CSV rows to stash into a WorkItem.
    csv_has_header: Whether to skip the first row of the CSV.
    openfile: Used for dependency injection.
    create_csv_reader: Used for dependency injection.

  Returns:
    A callable (accepting the Progress Queue and Progress Generators
    as input) which creates the WorkItem generator.
  """
  loader = Loader.RegisteredLoader(kind)
  # Inject the file/reader hooks through the name-mangled attribute names,
  # overriding Loader's private __openfile/__create_csv_reader members.
  loader._Loader__openfile = openfile
  loader._Loader__create_csv_reader = create_csv_reader
  record_generator = loader.generate_records(csv_filename)

  def CreateGenerator(progress_queue, progress_generator):
    """Initialize a WorkItem generator linked to a progress generator and queue.

    Args:
      progress_queue: A ProgressQueue instance to send progress information.
      progress_generator: A generator of progress information or None.

    Returns:
      A WorkItemGenerator instance.
    """
    return WorkItemGenerator(progress_queue,
                             progress_generator,
                             record_generator,
                             csv_has_header,
                             batch_size)

  return CreateGenerator
193 |
292 |
194 |
293 |
195 class CSVGenerator(object): |
294 class WorkItemGenerator(object): |
196 """Reads a CSV file and generates WorkItems containing batches of records.""" |
295 """Reads rows from a row generator and generates WorkItems of batches.""" |
197 |
296 |
  def __init__(self,
               progress_queue,
               progress_generator,
               record_generator,
               skip_first,
               batch_size):
    """Initialize a WorkItemGenerator.

    Args:
      progress_queue: A progress queue with which to associate WorkItems.
      progress_generator: A generator of progress information.
      record_generator: A generator of data records.
      skip_first: Whether to skip the first data record.
      batch_size: The number of data records per WorkItem.
    """
    self.progress_queue = progress_queue
    self.progress_generator = progress_generator
    self.reader = record_generator
    self.skip_first = skip_first
    self.batch_size = batch_size
    # Read/batch bookkeeping.
    self.line_number = 1      # next record line to read (1-based)
    self.column_count = None  # set once the first record's width is known
    self.read_rows = []       # rows accumulated toward the current batch
    self.row_count = 0        # total rows read so far
    self.xfer_count = 0       # total rows transferred so far
228 |
322 |
229 def _AdvanceTo(self, line): |
323 def _AdvanceTo(self, line): |
230 """Advance the reader to the given line. |
324 """Advance the reader to the given line. |
231 |
325 |
232 Args: |
326 Args: |
699 request_manager: A RequestManager instance. |
876 request_manager: A RequestManager instance. |
700 |
877 |
701 Returns: |
878 Returns: |
702 A factory to produce a ThrottledHttpRpcServer. |
879 A factory to produce a ThrottledHttpRpcServer. |
703 """ |
880 """ |
|
881 |
  def MakeRpcServer(*args, **kwargs):
    """Factory to produce a ThrottledHttpRpcServer.

    Args:
      args: Positional args to pass to ThrottledHttpRpcServer.
      kwargs: Keyword args to pass to ThrottledHttpRpcServer.

    Returns:
      A ThrottledHttpRpcServer instance.
    """
    # 'HOSTED_OR_GOOGLE' accepts either account flavor; save_cookies
    # presumably lets later runs reuse the login — TODO confirm against
    # the appengine_rpc server's expected kwargs.
    kwargs['account_type'] = 'HOSTED_OR_GOOGLE'
    kwargs['save_cookies'] = True
    return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)
708 return MakeRpcServer |
895 return MakeRpcServer |
709 |
896 |
710 |
897 |
|
class ExportResult(object):
  """Holds the decoded content for the result of an export request."""

  def __init__(self, continued, direction, keys, entities):
    self.continued = continued
    self.direction = direction
    self.keys = keys
    self.entities = entities
    self.count = len(keys)
    assert self.count == len(entities)
    assert direction in (KeyRange.ASC, KeyRange.DESC)
    if self.count > 0:
      # Which end of the result list is the range start depends on the
      # scan direction the query used.
      if direction == KeyRange.ASC:
        self.key_start, self.key_end = keys[0], keys[-1]
      else:
        self.key_start, self.key_end = keys[-1], keys[0]

  def __str__(self):
    body = '\n'.join(self.entities)
    return 'continued = %s\n%s' % (str(self.continued), body)
|
920 |
|
921 |
|
class _WorkItem(object):
  """Holds a description of a unit of upload or download work."""

  def __init__(self, progress_queue, key_start, key_end, state_namer,
               state=STATE_READ, progress_key=None):
    """Initialize the _WorkItem instance.

    Args:
      progress_queue: A queue used for tracking progress information.
      key_start: The starting key, inclusive.
      key_end: The ending key, inclusive.
      state_namer: Function to describe work item states.
      state: The initial state of the work item.
      progress_key: If this WorkItem represents state from a prior run,
        then this will be the key within the progress database.
    """
    self.progress_queue = progress_queue
    self.key_start = key_start
    self.key_end = key_end
    self.state_namer = state_namer
    self.state = state
    self.progress_key = progress_key
    # Signaled (by whoever consumes progress_queue) to acknowledge that a
    # state transition has been recorded; see _StateTransition.
    self.progress_event = threading.Event()

  def _AssertInState(self, *states):
    """Raises an Error if the state of this range is not in states."""
    if not self.state in states:
      raise BadStateError('%s:%s not in %s' %
                          (str(self),
                           self.state_namer(self.state),
                           map(self.state_namer, states)))

  def _AssertProgressKey(self):
    """Raises an Error if the progress key is None."""
    if self.progress_key is None:
      raise BadStateError('%s: Progress key is missing' % str(self))

  def MarkAsRead(self):
    """Mark this _WorkItem as read, updating the progress database."""
    self._AssertInState(STATE_READ)
    self._StateTransition(STATE_READ, blocking=True)

  def MarkAsTransferring(self):
    """Mark this _WorkItem as transferring, updating the progress database."""
    self._AssertInState(STATE_READ, STATE_NOT_GOT)
    self._AssertProgressKey()
    self._StateTransition(STATE_GETTING, blocking=True)

  def MarkAsTransferred(self):
    """Mark this _WorkItem as transferred, updating the progress database."""
    # Subclasses define what "transferred" means (see WorkItem and KeyRange).
    raise NotImplementedError()

  def MarkAsError(self):
    """Mark this _WorkItem as failed, updating the progress database."""
    self._AssertInState(STATE_GETTING)
    self._AssertProgressKey()
    self._StateTransition(STATE_NOT_GOT, blocking=True)

  def _StateTransition(self, new_state, blocking=False):
    """Transition the work item to a new state, storing progress information.

    Args:
      new_state: The state to transition to.
      blocking: Whether to block for the progress thread to acknowledge the
        transition.
    """
    # A set event here would mean a prior transition was never acknowledged.
    assert not self.progress_event.isSet()

    self.state = new_state

    # Hand this item to the progress consumer; NOTE(review): assumes the
    # queue consumer set()s progress_event once the transition is persisted.
    self.progress_queue.put(self)

    if blocking:
      self.progress_event.wait()

      # Reset so the next transition can be acknowledged in turn.
      self.progress_event.clear()
|
998 |
|
999 |
|
1000 |
|
class WorkItem(_WorkItem):
  """Holds a unit of uploading work.

  A WorkItem represents a number of entities that need to be uploaded to
  Google App Engine. These entities are encoded in the "content" field of
  the WorkItem, and will be POST'd as-is to the server.

  The entities are identified by a range of numeric keys, inclusively. In
  the case of a resumption of an upload, or a replay to correct errors,
  these keys must be able to identify the same set of entities.

  Note that keys specify a range. The entities do not have to sequentially
  fill the entire range, they must simply bound a range of valid keys.
  """

  def __init__(self, progress_queue, rows, key_start, key_end,
               progress_key=None):
    """Initialize the WorkItem instance.

    Args:
      progress_queue: A queue used for tracking progress information.
      rows: A list of pairs of a line number and a list of column values.
      key_start: The (numeric) starting key, inclusive.
      key_end: The (numeric) ending key, inclusive.
      progress_key: If this WorkItem represents state from a prior run,
        then this will be the key within the progress database.
    """
    _WorkItem.__init__(self, progress_queue, key_start, key_end,
                       ImportStateName, state=STATE_READ,
                       progress_key=progress_key)

    # Upload keys are strictly numeric (Python 2 int or long).
    assert isinstance(key_start, (int, long))
    assert isinstance(key_end, (int, long))
    assert key_start <= key_end

    self.rows = rows
    # Serialized payload for the server; None until populated elsewhere.
    self.content = None
    self.count = len(rows)

  def __str__(self):
    return '[%s-%s]' % (self.key_start, self.key_end)

  def MarkAsTransferred(self):
    """Mark this WorkItem as successfully-sent to the server."""

    self._AssertInState(STATE_SENDING)
    self._AssertProgressKey()

    self._StateTransition(STATE_SENT, blocking=False)
|
1050 |
|
1051 |
|
def GetImplementationClass(kind_or_class_key):
  """Return the implementation class for a given kind or class key.

  Args:
    kind_or_class_key: A kind string or a tuple of kind strings.

  Returns:
    A db.Model subclass for the given kind or class key.
  """
  if not isinstance(kind_or_class_key, tuple):
    # A plain kind string maps straight through the db kind registry.
    return db.class_for_kind(kind_or_class_key)

  # A tuple of kind strings identifies a PolyModel class.
  try:
    return polymodel._class_map[kind_or_class_key]
  except KeyError:
    raise db.KindError('No implementation for class \'%s\'' %
                       kind_or_class_key)
|
1070 |
|
class EmptyQuery(db.Query):
  """A db.Query that matches nothing.

  Returned by KeyRange.FilterQuery when the key range provably excludes
  every key, so callers can get/fetch/count it uniformly.
  """

  def get(self):
    """Return None: an empty query has no first result."""
    return None

  def fetch(self, limit=1000, offset=0):
    """Return an empty result list regardless of limit/offset."""
    return []

  def count(self, limit=1000):
    """Return 0: an empty query matches no entities."""
    return 0
|
1080 |
|
1081 |
|
def KeyLEQ(key1, key2):
  """Compare two keys for less-than-or-equal-to.

  All keys with numeric ids come before all keys with names.

  Args:
    key1: An int or db.Key instance.
    key2: An int or db.Key instance.

  Returns:
    True if key1 <= key2
  """
  if isinstance(key1, int) and isinstance(key2, int):
    # Plain integers: ordinary numeric comparison.
    return key1 <= key2
  elif key1 is None or key2 is None:
    # A missing key acts as an open bound, so the comparison holds.
    return True
  elif key1.id() and not key2.id():
    # A key with a numeric id sorts before any key with a name.
    return True
  else:
    return key1.id_or_name() <= key2.id_or_name()
|
1101 |
|
1102 |
|
class KeyRange(_WorkItem):
  """Represents an item of download work.

  A KeyRange object represents a key range (key_start, key_end) and a
  scan direction (KeyRange.DESC or KeyRange.ASC). The KeyRange object
  has an associated state: STATE_READ, STATE_GETTING, STATE_GOT, and
  STATE_ERROR.

  - STATE_READ indicates the range ready to be downloaded by a worker thread.
  - STATE_GETTING indicates the range is currently being downloaded.
  - STATE_GOT indicates that the range was successfully downloaded
  - STATE_ERROR indicates that an error occurred during the last download
    attempt

  KeyRanges not in the STATE_GOT state are stored in the progress database.
  When a piece of KeyRange work is downloaded, the download may cover only
  a portion of the range. In this case, the old KeyRange is removed from
  the progress database and ranges covering the undownloaded range are
  generated and stored as STATE_READ in the export progress database.
  """

  # Scan directions.
  DESC = 0
  ASC = 1

  MAX_KEY_LEN = 500

  def __init__(self,
               progress_queue,
               kind,
               direction,
               key_start=None,
               key_end=None,
               include_start=True,
               include_end=True,
               progress_key=None,
               state=STATE_READ):
    """Initialize a KeyRange object.

    Args:
      progress_queue: A queue used for tracking progress information.
      kind: The kind of entities for this range.
      direction: The direction of the query for this range.
      key_start: The starting key for this range.
      key_end: The ending key for this range.
      include_start: Whether the start key should be included in the range.
      include_end: Whether the end key should be included in the range.
      progress_key: The key for this range within the progress database.
      state: The initial state of this range.
    """
    # NOTE(review): a prior docstring claimed KeyRangeError is raised when
    # key_start is None, but no such check exists; None bounds are accepted
    # (and are meaningful to FilterQuery/SplitRange).
    assert direction in (KeyRange.ASC, KeyRange.DESC)
    _WorkItem.__init__(self, progress_queue, key_start, key_end,
                       ExportStateName, state=state, progress_key=progress_key)
    self.kind = kind
    self.direction = direction
    self.export_result = None
    self.count = 0
    self.include_start = include_start
    self.include_end = include_end
    # The smallest possible named key for this kind; used by SplitRange as
    # the boundary between all numeric-id keys and all named keys.
    self.SPLIT_KEY = db.Key.from_path(self.kind, unichr(0))

  def __str__(self):
    return '[%s-%s]' % (PrettyKey(self.key_start), PrettyKey(self.key_end))

  def __repr__(self):
    return self.__str__()

  def MarkAsTransferred(self):
    """Mark this KeyRange as transferred, updating the progress database."""
    # Intentionally a no-op: Process performs the STATE_GOT transition.
    pass

  def Process(self, export_result, num_threads, batch_size, work_queue):
    """Mark this KeyRange as success, updating the progress database.

    Process will split this KeyRange based on the content of export_result and
    adds the unfinished ranges to the work queue.

    Args:
      export_result: An ExportResult instance.
      num_threads: The number of threads for parallel transfers.
      batch_size: The number of entities to transfer per request.
      work_queue: The work queue to add unfinished ranges to.

    Returns:
      None. Undownloaded sub-ranges are pushed onto work_queue rather
      than returned.
    """
    self._AssertInState(STATE_GETTING)
    self._AssertProgressKey()

    self.export_result = export_result
    self.count = len(export_result.keys)
    if export_result.continued:
      # Only part of the range was covered: record the completed portion
      # as GOT and requeue the remainder.
      self._FinishedRange()._StateTransition(STATE_GOT, blocking=True)
      self._AddUnfinishedRanges(num_threads, batch_size, work_queue)
    else:
      self._StateTransition(STATE_GOT, blocking=True)

  def _FinishedRange(self):
    """Returns the range completed by the export_result.

    Returns:
      A KeyRange representing a completed range.
    """
    assert self.export_result is not None

    # A continued ASC scan finished [key_start, result.key_end]; a continued
    # DESC scan finished [result.key_start, key_end].
    if self.direction == KeyRange.ASC:
      key_start = self.key_start
      if self.export_result.continued:
        key_end = self.export_result.key_end
      else:
        key_end = self.key_end
    else:
      key_end = self.key_end
      if self.export_result.continued:
        key_start = self.export_result.key_start
      else:
        key_start = self.key_start

    result = KeyRange(self.progress_queue,
                      self.kind,
                      key_start=key_start,
                      key_end=key_end,
                      direction=self.direction)

    # Carry over bookkeeping so the finished range replaces this one in
    # the progress database.
    result.progress_key = self.progress_key
    result.export_result = self.export_result
    result.state = self.state
    result.count = self.count
    return result

  def FilterQuery(self, query):
    """Add query filter to restrict to this key range.

    Args:
      query: A db.Query instance.

    Returns:
      The query with __key__ filters applied, or an EmptyQuery if the
      range excludes every key.
    """
    if self.key_start == self.key_end and not (
        self.include_start or self.include_end):
      # A single-point range with both endpoints excluded matches nothing.
      return EmptyQuery()
    if self.include_start:
      start_comparator = '>='
    else:
      start_comparator = '>'
    if self.include_end:
      end_comparator = '<='
    else:
      end_comparator = '<'
    # None bounds are treated as open-ended: no filter on that side.
    if self.key_start and self.key_end:
      query.filter('__key__ %s' % start_comparator, self.key_start)
      query.filter('__key__ %s' % end_comparator, self.key_end)
    elif self.key_start:
      query.filter('__key__ %s' % start_comparator, self.key_start)
    elif self.key_end:
      query.filter('__key__ %s' % end_comparator, self.key_end)

    return query

  def MakeParallelQuery(self):
    """Construct a query for this key range, for parallel downloading.

    Returns:
      A db.Query instance.

    Raises:
      KeyRangeError: if self.direction is not one of
        KeyRange.ASC, KeyRange.DESC
    """
    if self.direction == KeyRange.ASC:
      direction = ''
    elif self.direction == KeyRange.DESC:
      direction = '-'
    else:
      # NOTE(review): the '%s' below is never interpolated; self.direction
      # is passed as a second exception argument instead of being formatted
      # into the message (likely meant '% self.direction').
      raise KeyRangeError('KeyRange direction unexpected: %s', self.direction)
    query = db.Query(GetImplementationClass(self.kind))
    query.order('%s__key__' % direction)

    return self.FilterQuery(query)

  def MakeSerialQuery(self):
    """Construct a query for this key range without descending __key__ scan.

    Returns:
      A db.Query instance.
    """
    query = db.Query(GetImplementationClass(self.kind))
    query.order('__key__')

    return self.FilterQuery(query)

  def _BisectStringRange(self, start, end):
    """Return (start, midpoint, end) with midpoint lexically between the two.

    Copies the shared prefix, then at the first differing position takes
    the character halfway between the two codes; when the code sum is odd,
    one extra refinement character is appended.
    """
    if start == end:
      return (start, start, end)
    # Append a sentinel so indexing one position past the shorter input
    # (stripped again before returning) is safe.
    start += '\0'
    end += '\0'
    midpoint = []
    expected_max = 127
    for i in xrange(min(len(start), len(end))):
      if start[i] == end[i]:
        midpoint.append(start[i])
      else:
        # Python 2 integer division throughout.
        ord_sum = ord(start[i]) + ord(end[i])
        midpoint.append(unichr(ord_sum / 2))
        if ord_sum % 2:
          if len(start) > i + 1:
            ord_start = ord(start[i+1])
          else:
            ord_start = 0
          if ord_start < expected_max:
            ord_split = (expected_max + ord_start) / 2
          else:
            ord_split = (0xFFFF + ord_start) / 2
          midpoint.append(unichr(ord_split))
        break
    return (start[:-1], ''.join(midpoint), end[:-1])

  def SplitRange(self, key_start, include_start, key_end, include_end,
                 export_result, num_threads, batch_size, work_queue):
    """Split the key range [key_start, key_end] into a list of ranges."""
    # First shrink the range to exclude what export_result already covered.
    if export_result.direction == KeyRange.ASC:
      key_start = export_result.key_end
      include_start = False
    else:
      key_end = export_result.key_start
      include_end = False
    key_pairs = []
    if not key_start:
      # Open-ended start: keep one ascending range.
      key_pairs.append((key_start, include_start, key_end, include_end,
                        KeyRange.ASC))
    elif not key_end:
      # Open-ended end: keep one descending range.
      key_pairs.append((key_start, include_start, key_end, include_end,
                        KeyRange.DESC))
    elif work_queue.qsize() > 2 * num_threads:
      # Plenty of work already queued; skip further splitting.
      key_pairs.append((key_start, include_start, key_end, include_end,
                        KeyRange.ASC))
    elif key_start.id() and key_end.id():
      # Both bounds have numeric ids: split at the midpoint id when the
      # gap exceeds one batch.
      if key_end.id() - key_start.id() > batch_size:
        key_half = db.Key.from_path(self.kind,
                                    (key_start.id() + key_end.id()) / 2)
        key_pairs.append((key_start, include_start,
                          key_half, True,
                          KeyRange.DESC))
        key_pairs.append((key_half, False,
                          key_end, include_end,
                          KeyRange.ASC))
      else:
        key_pairs.append((key_start, include_start, key_end, include_end,
                          KeyRange.ASC))
    elif key_start.name() and key_end.name():
      # Both bounds are named keys: bisect the name strings.
      (start, middle, end) = self._BisectStringRange(key_start.name(),
                                                     key_end.name())
      key_pairs.append((key_start, include_start,
                        db.Key.from_path(self.kind, middle), True,
                        KeyRange.DESC))
      key_pairs.append((db.Key.from_path(self.kind, middle), False,
                        key_end, include_end,
                        KeyRange.ASC))
    else:
      # Mixed bounds (numeric id start, named end): split at SPLIT_KEY,
      # the boundary between all id keys and all name keys.
      assert key_start.id() and key_end.name()
      key_pairs.append((key_start, include_start,
                        self.SPLIT_KEY, False,
                        KeyRange.DESC))
      key_pairs.append((self.SPLIT_KEY, True,
                        key_end, include_end,
                        KeyRange.ASC))

    ranges = [KeyRange(self.progress_queue,
                       self.kind,
                       key_start=start,
                       include_start=include_start,
                       key_end=end,
                       include_end=include_end,
                       direction=direction)
              for (start, include_start, end, include_end, direction)
              in key_pairs]

    for key_range in ranges:
      key_range.MarkAsRead()
      work_queue.put(key_range, block=True)

  def _AddUnfinishedRanges(self, num_threads, batch_size, work_queue):
    """Adds incomplete KeyRanges to the work_queue.

    Args:
      num_threads: The number of threads for parallel transfers.
      batch_size: The number of entities to transfer per request.
      work_queue: The work queue to add unfinished ranges to.

    Returns:
      None. The incomplete ranges are enqueued on work_queue by
      SplitRange rather than returned.

    Raises:
      KeyRangeError: if this key range has already been completely transferred.
    """
    assert self.export_result is not None
    if self.export_result.continued:
      self.SplitRange(self.key_start, self.include_start, self.key_end,
                      self.include_end, self.export_result,
                      num_threads, batch_size, work_queue)
    else:
      raise KeyRangeError('No unfinished part of key range.')
|
1405 |
|
1406 |
711 class RequestManager(object): |
1407 class RequestManager(object): |
712 """A class which wraps a connection to the server.""" |
1408 """A class which wraps a connection to the server.""" |
713 |
|
714 source = 'google-bulkloader-%s' % UPLOADER_VERSION |
|
715 user_agent = source |
|
716 |
1409 |
  def __init__(self,
               app_id,
               host_port,
               url_path,
               kind,
               throttle,
               batch_size,
               secure,
               email,
               passin):
    """Initialize a RequestManager object.

    Args:
      app_id: String containing the application id for requests.
      host_port: String containing the "host:port" pair; the port is optional.
      url_path: partial URL (path) to post entity data to.
      kind: Kind of the Entity records being posted.
      throttle: A Throttle instance.
      batch_size: The number of entities to transfer per request.
      secure: Use SSL when communicating with server.
      email: If not none, the username to log in with.
      passin: If True, the password will be read from standard in.
    """
    self.app_id = app_id
    self.host_port = host_port
    self.host = host_port.split(':')[0]
    # Normalize url_path so it always begins with a slash.
    if url_path and url_path[0] != '/':
      url_path = '/' + url_path
    self.url_path = url_path
    self.kind = kind
    self.throttle = throttle
    self.batch_size = batch_size
    self.secure = secure
    self.authenticated = False
    self.auth_called = False
    # Assume a descending __key__ index exists until a query proves
    # otherwise (GetEntities clears this on NeedIndexError).
    self.parallel_download = True
    self.email = email
    self.passin = passin
    throttled_rpc_server_factory = ThrottledHttpRpcServerFactory(
        self.throttle, self)
    logger.debug('Configuring remote_api. url_path = %s, '
                 'servername = %s' % (url_path, host_port))
    remote_api_stub.ConfigureRemoteDatastore(
        app_id,
        url_path,
        self.AuthFunction,
        servername=host_port,
        rpc_server_factory=throttled_rpc_server_factory,
        secure=self.secure)
    logger.debug('Bulkloader using app_id: %s', os.environ['APPLICATION_ID'])
752 |
1460 |
  def Authenticate(self):
    """Invoke authentication if necessary."""
    logger.info('Connecting to %s', self.url_path)
    # An empty-payload request exercises the login path, per the docstring.
    # NOTE(review): self.rpc_server is not assigned in __init__ here; it is
    # presumably attached by the throttled RPC server factory — confirm.
    self.rpc_server.Send(self.url_path, payload=None)
    self.authenticated = True
757 |
1466 |
758 def AuthFunction(self, |
1467 def AuthFunction(self, |
759 raw_input_fn=raw_input, |
1468 raw_input_fn=raw_input, |
824 from the server as a str. |
1544 from the server as a str. |
825 """ |
1545 """ |
826 entities = item.content |
1546 entities = item.content |
827 db.put(entities) |
1547 db.put(entities) |
828 |
1548 |
829 |
  def GetEntities(self, key_range):
    """Gets Entity records from a remote endpoint over HTTP.

    Args:
      key_range: Range of keys to get.

    Returns:
      An ExportResult instance.

    Raises:
      ConfigurationError: if no Exporter is defined for self.kind
    """
    try:
      Exporter.RegisteredExporter(self.kind)
    except KeyError:
      raise ConfigurationError('No Exporter defined for kind %s.' % self.kind)

    keys = []
    entities = []

    if self.parallel_download:
      query = key_range.MakeParallelQuery()
      try:
        results = query.fetch(self.batch_size)
      except datastore_errors.NeedIndexError:
        logger.info('%s: No descending index on __key__, '
                    'performing serial download', self.kind)
        # Instance-wide fallback: once the descending-index query fails,
        # all later calls on this RequestManager also use serial download.
        self.parallel_download = False

    if not self.parallel_download:
      # Serial scan is always ascending.
      key_range.direction = KeyRange.ASC
      query = key_range.MakeSerialQuery()
      results = query.fetch(self.batch_size)

    size = len(results)

    for model in results:
      key = model.key()
      # Entities travel as pickled model instances.
      entities.append(cPickle.dumps(model))
      keys.append(key)

    # A full batch implies more entities may remain past this range.
    continued = (size == self.batch_size)
    key_range.count = size

    return ExportResult(continued, key_range.direction, keys, entities)
874 |
|
875 def MarkAsRead(self): |
|
876 """Mark this WorkItem as read/consumed from the data source.""" |
|
877 |
|
878 assert self.state == STATE_READ |
|
879 |
|
880 self._StateTransition(STATE_READ, blocking=True) |
|
881 |
|
882 assert self.progress_key is not None |
|
883 |
|
884 def MarkAsSending(self): |
|
885 """Mark this WorkItem as in-process on being uploaded to the server.""" |
|
886 |
|
887 assert self.state == STATE_READ or self.state == STATE_NOT_SENT |
|
888 assert self.progress_key is not None |
|
889 |
|
890 self._StateTransition(STATE_SENDING, blocking=True) |
|
891 |
|
892 def MarkAsSent(self): |
|
893 """Mark this WorkItem as successfully-sent to the server.""" |
|
894 |
|
895 assert self.state == STATE_SENDING |
|
896 assert self.progress_key is not None |
|
897 |
|
898 self._StateTransition(STATE_SENT, blocking=False) |
|
899 |
|
900 def MarkAsError(self): |
|
901 """Mark this WorkItem as requiring manual error recovery.""" |
|
902 |
|
903 assert self.state == STATE_SENDING |
|
904 assert self.progress_key is not None |
|
905 |
|
906 self._StateTransition(STATE_NOT_SENT, blocking=True) |
|
907 |
|
908 def _StateTransition(self, new_state, blocking=False): |
|
909 """Transition the work item to a new state, storing progress information. |
|
910 |
|
911 Args: |
|
912 new_state: The state to transition to. |
|
913 blocking: Whether to block for the progress thread to acknowledge the |
|
914 transition. |
|
915 """ |
|
916 logging.debug('[%s-%s] %s' % |
|
917 (self.key_start, self.key_end, StateMessage(self.state))) |
|
918 assert not self.progress_event.isSet() |
|
919 |
|
920 self.state = new_state |
|
921 |
|
922 self.progress_queue.put(self) |
|
923 |
|
924 if blocking: |
|
925 self.progress_event.wait() |
|
926 |
|
927 self.progress_event.clear() |
|
928 |
|
929 |
1594 |
930 |
1595 |
931 def InterruptibleSleep(sleep_time): |
1596 def InterruptibleSleep(sleep_time): |
932 """Puts thread to sleep, checking this thread's exit_flag twice a second. |
1597 """Puts thread to sleep, checking this thread's exit_flag twice a second. |
933 |
1598 |
1297 self.exit_flag = False |
1987 self.exit_flag = False |
1298 self.error = None |
1988 self.error = None |
1299 |
1989 |
1300 def run(self): |
1990 def run(self): |
1301 """Perform the work of the thread.""" |
1991 """Perform the work of the thread.""" |
1302 logging.info('[%s] %s: started', self.getName(), self.__class__.__name__) |
1992 logger.info('[%s] %s: started', self.getName(), self.__class__.__name__) |
1303 |
1993 |
1304 try: |
1994 try: |
1305 self.PerformWork() |
1995 self.PerformWork() |
1306 except: |
1996 except: |
1307 self.error = sys.exc_info()[1] |
1997 self.error = sys.exc_info()[1] |
1308 logging.exception('[%s] %s:', self.getName(), self.__class__.__name__) |
1998 logger.exception('[%s] %s:', self.getName(), self.__class__.__name__) |
1309 |
1999 |
1310 logging.info('[%s] %s: exiting', self.getName(), self.__class__.__name__) |
2000 logger.info('[%s] %s: exiting', self.getName(), self.__class__.__name__) |
1311 |
2001 |
1312 def PerformWork(self): |
2002 def PerformWork(self): |
1313 """Perform the thread-specific work.""" |
2003 """Perform the thread-specific work.""" |
1314 raise NotImplementedError() |
2004 raise NotImplementedError() |
1315 |
2005 |
1316 def CheckError(self): |
2006 def CheckError(self): |
1317 """If an error is present, then log it.""" |
2007 """If an error is present, then log it.""" |
1318 if self.error: |
2008 if self.error: |
1319 logging.error('Error in %s: %s', self.GetFriendlyName(), self.error) |
2009 logger.error('Error in %s: %s', self.GetFriendlyName(), self.error) |
1320 |
2010 |
1321 def GetFriendlyName(self): |
2011 def GetFriendlyName(self): |
1322 """Returns a human-friendly description of the thread.""" |
2012 """Returns a human-friendly description of the thread.""" |
1323 if hasattr(self, 'NAME'): |
2013 if hasattr(self, 'NAME'): |
1324 return self.NAME |
2014 return self.NAME |
1325 return 'unknown thread' |
2015 return 'unknown thread' |
1326 |
2016 |
1327 |
2017 |
1328 class BulkLoaderThread(_ThreadBase): |
2018 non_fatal_error_codes = set([errno.EAGAIN, |
1329 """A thread which transmits entities to the server application. |
2019 errno.ENETUNREACH, |
|
2020 errno.ENETRESET, |
|
2021 errno.ECONNRESET, |
|
2022 errno.ETIMEDOUT, |
|
2023 errno.EHOSTUNREACH]) |
|
2024 |
|
2025 |
|
2026 def IsURLErrorFatal(error): |
|
2027 """Returns False if the given URLError may be from a transient failure. |
|
2028 |
|
2029 Args: |
|
2030 error: A urllib2.URLError instance. |
|
2031 """ |
|
2032 assert isinstance(error, urllib2.URLError) |
|
2033 if not hasattr(error, 'reason'): |
|
2034 return True |
|
2035 if not isinstance(error.reason[0], int): |
|
2036 return True |
|
2037 return error.reason[0] not in non_fatal_error_codes |
|
2038 |
|
2039 |
|
2040 def PrettyKey(key): |
|
2041 """Returns a nice string representation of the given key.""" |
|
2042 if key is None: |
|
2043 return None |
|
2044 elif isinstance(key, db.Key): |
|
2045 return repr(key.id_or_name()) |
|
2046 return str(key) |
|
2047 |
|
2048 |
|
2049 class _BulkWorkerThread(_ThreadBase): |
|
2050 """A base class for worker threads. |
1330 |
2051 |
1331 This thread will read WorkItem instances from the work_queue and upload |
2052 This thread will read WorkItem instances from the work_queue and upload |
1332 the entities to the server application. Progress information will be |
2053 the entities to the server application. Progress information will be |
1333 pushed into the progress_queue as the work is being performed. |
2054 pushed into the progress_queue as the work is being performed. |
1334 |
2055 |
1335 If a BulkLoaderThread encounters a transient error, the entities will be |
2056 If a _BulkWorkerThread encounters a transient error, the entities will be |
1336 resent, if a fatal error is encountered the BulkLoaderThread exits. |
2057 resent, if a fatal error is encountered the BulkWorkerThread exits. |
|
2058 |
|
2059 Subclasses must provide implementations for PreProcessItem, TransferItem, |
|
2060 and ProcessResponse. |
1337 """ |
2061 """ |
1338 |
2062 |
1339 def __init__(self, |
2063 def __init__(self, |
1340 work_queue, |
2064 work_queue, |
1341 throttle, |
2065 throttle, |
1342 thread_gate, |
2066 thread_gate, |
1343 request_manager): |
2067 request_manager, |
|
2068 num_threads, |
|
2069 batch_size, |
|
2070 state_message, |
|
2071 get_time): |
1344 """Initialize the BulkLoaderThread instance. |
2072 """Initialize the BulkLoaderThread instance. |
1345 |
2073 |
1346 Args: |
2074 Args: |
1347 work_queue: A queue containing WorkItems for processing. |
2075 work_queue: A queue containing WorkItems for processing. |
1348 throttle: A Throttle to control upload bandwidth. |
2076 throttle: A Throttle to control upload bandwidth. |
1349 thread_gate: A ThreadGate to control number of simultaneous uploads. |
2077 thread_gate: A ThreadGate to control number of simultaneous uploads. |
1350 request_manager: A RequestManager instance. |
2078 request_manager: A RequestManager instance. |
|
2079 num_threads: The number of threads for parallel transfers. |
|
2080 batch_size: The number of entities to transfer per request. |
|
2081 state_message: Used for dependency injection. |
|
2082 get_time: Used for dependency injection. |
1351 """ |
2083 """ |
1352 _ThreadBase.__init__(self) |
2084 _ThreadBase.__init__(self) |
1353 |
2085 |
1354 self.work_queue = work_queue |
2086 self.work_queue = work_queue |
1355 self.throttle = throttle |
2087 self.throttle = throttle |
1356 self.thread_gate = thread_gate |
2088 self.thread_gate = thread_gate |
1357 |
|
1358 self.request_manager = request_manager |
2089 self.request_manager = request_manager |
|
2090 self.num_threads = num_threads |
|
2091 self.batch_size = batch_size |
|
2092 self.state_message = state_message |
|
2093 self.get_time = get_time |
|
2094 |
|
2095 def PreProcessItem(self, item): |
|
2096 """Performs pre transfer processing on a work item.""" |
|
2097 raise NotImplementedError() |
|
2098 |
|
2099 def TransferItem(self, item): |
|
2100 """Transfers the entities associated with an item. |
|
2101 |
|
2102 Args: |
|
2103 item: An item of upload (WorkItem) or download (KeyRange) work. |
|
2104 |
|
2105 Returns: |
|
2106 A tuple of (estimated transfer size, response) |
|
2107 """ |
|
2108 raise NotImplementedError() |
|
2109 |
|
2110 def ProcessResponse(self, item, result): |
|
2111 """Processes the response from the server application.""" |
|
2112 raise NotImplementedError() |
1359 |
2113 |
1360 def PerformWork(self): |
2114 def PerformWork(self): |
1361 """Perform the work of a BulkLoaderThread.""" |
2115 """Perform the work of a _BulkWorkerThread.""" |
1362 while not self.exit_flag: |
2116 while not self.exit_flag: |
1363 success = False |
2117 transferred = False |
1364 self.thread_gate.StartWork() |
2118 self.thread_gate.StartWork() |
1365 try: |
2119 try: |
1366 try: |
2120 try: |
1367 item = self.work_queue.get(block=True, timeout=1.0) |
2121 item = self.work_queue.get(block=True, timeout=1.0) |
1368 except Queue.Empty: |
2122 except Queue.Empty: |
1369 continue |
2123 continue |
1370 if item == _THREAD_SHOULD_EXIT: |
2124 if item == _THREAD_SHOULD_EXIT: |
1371 break |
2125 break |
1372 |
2126 |
1373 logging.debug('[%s] Got work item [%d-%d]', |
2127 logger.debug('[%s] Got work item %s', self.getName(), item) |
1374 self.getName(), item.key_start, item.key_end) |
|
1375 |
2128 |
1376 try: |
2129 try: |
1377 |
2130 |
1378 item.MarkAsSending() |
2131 item.MarkAsTransferring() |
|
2132 self.PreProcessItem(item) |
|
2133 response = None |
1379 try: |
2134 try: |
1380 if item.content is None: |
|
1381 item.content = self.request_manager.EncodeContent(item.rows) |
|
1382 try: |
2135 try: |
1383 self.request_manager.PostEntities(item) |
2136 t = self.get_time() |
1384 success = True |
2137 response = self.TransferItem(item) |
1385 logging.debug( |
2138 status = 200 |
1386 '[%d-%d] Sent %d entities', |
2139 transferred = True |
1387 item.key_start, item.key_end, item.count) |
2140 transfer_time = self.get_time() - t |
|
2141 logger.debug('[%s] %s Transferred %d entities', self.getName(), |
|
2142 item, item.count) |
1388 self.throttle.AddTransfer(RECORDS, item.count) |
2143 self.throttle.AddTransfer(RECORDS, item.count) |
1389 except (db.InternalError, db.NotSavedError, db.Timeout), e: |
2144 except (db.InternalError, db.NotSavedError, db.Timeout, |
1390 logging.debug('Caught non-fatal error: %s', e) |
2145 apiproxy_errors.OverQuotaError, |
|
2146 apiproxy_errors.DeadlineExceededError), e: |
|
2147 logger.exception('Caught non-fatal datastore error: %s', e) |
1391 except urllib2.HTTPError, e: |
2148 except urllib2.HTTPError, e: |
1392 if e.code == 403 or (e.code >= 500 and e.code < 600): |
2149 status = e.code |
1393 logging.debug('Caught HTTP error %d', e.code) |
2150 if status == 403 or (status >= 500 and status < 600): |
1394 logging.debug('%s', e.read()) |
2151 logger.exception('Caught non-fatal HTTP error: %d %s', |
|
2152 status, e.msg) |
1395 else: |
2153 else: |
1396 raise e |
2154 raise e |
|
2155 except urllib2.URLError, e: |
|
2156 if IsURLErrorFatal(e): |
|
2157 raise e |
|
2158 else: |
|
2159 logger.exception('Caught non-fatal URL error: %s', e.reason) |
|
2160 |
|
2161 self.ProcessResponse(item, response) |
1397 |
2162 |
1398 except: |
2163 except: |
1399 self.error = sys.exc_info()[1] |
2164 self.error = sys.exc_info()[1] |
1400 logging.exception('[%s] %s: caught exception %s', self.getName(), |
2165 logger.exception('[%s] %s: caught exception %s', self.getName(), |
1401 self.__class__.__name__, str(sys.exc_info())) |
2166 self.__class__.__name__, str(sys.exc_info())) |
1402 raise |
2167 raise |
1403 |
2168 |
1404 finally: |
2169 finally: |
1405 if success: |
2170 if transferred: |
1406 item.MarkAsSent() |
2171 item.MarkAsTransferred() |
1407 self.thread_gate.IncreaseWorkers() |
2172 self.thread_gate.TransferSuccess(transfer_time) |
1408 self.work_queue.task_done() |
2173 self.work_queue.task_done() |
1409 else: |
2174 else: |
1410 item.MarkAsError() |
2175 item.MarkAsError() |
1411 self.thread_gate.DecreaseWorkers() |
|
1412 try: |
2176 try: |
1413 self.work_queue.reput(item, block=False) |
2177 self.work_queue.reput(item, block=False) |
1414 except Queue.Full: |
2178 except Queue.Full: |
1415 logging.error('[%s] Failed to reput work item.', self.getName()) |
2179 logger.error('[%s] Failed to reput work item.', self.getName()) |
1416 raise Error('Failed to reput work item') |
2180 raise Error('Failed to reput work item') |
1417 logging.info('[%d-%d] %s', |
2181 self.thread_gate.DecreaseWorkers() |
1418 item.key_start, item.key_end, StateMessage(item.state)) |
2182 logger.info('%s %s', |
|
2183 item, |
|
2184 self.state_message(item.state)) |
1419 |
2185 |
1420 finally: |
2186 finally: |
1421 self.thread_gate.FinishWork() |
2187 self.thread_gate.FinishWork() |
1422 |
2188 |
1423 |
2189 |
1424 def GetFriendlyName(self): |
2190 def GetFriendlyName(self): |
1425 """Returns a human-friendly name for this thread.""" |
2191 """Returns a human-friendly name for this thread.""" |
1426 return 'worker [%s]' % self.getName() |
2192 return 'worker [%s]' % self.getName() |
|
2193 |
|
2194 |
|
2195 class BulkLoaderThread(_BulkWorkerThread): |
|
2196 """A thread which transmits entities to the server application. |
|
2197 |
|
2198 This thread will read WorkItem instances from the work_queue and upload |
|
2199 the entities to the server application. Progress information will be |
|
2200 pushed into the progress_queue as the work is being performed. |
|
2201 |
|
2202 If a BulkLoaderThread encounters a transient error, the entities will be |
|
2203 resent, if a fatal error is encountered the BulkLoaderThread exits. |
|
2204 """ |
|
2205 |
|
2206 def __init__(self, |
|
2207 work_queue, |
|
2208 throttle, |
|
2209 thread_gate, |
|
2210 request_manager, |
|
2211 num_threads, |
|
2212 batch_size, |
|
2213 get_time=time.time): |
|
2214 """Initialize the BulkLoaderThread instance. |
|
2215 |
|
2216 Args: |
|
2217 work_queue: A queue containing WorkItems for processing. |
|
2218 throttle: A Throttle to control upload bandwidth. |
|
2219 thread_gate: A ThreadGate to control number of simultaneous uploads. |
|
2220 request_manager: A RequestManager instance. |
|
2221 num_threads: The number of threads for parallel transfers. |
|
2222 batch_size: The number of entities to transfer per request. |
|
2223 get_time: Used for dependency injection. |
|
2224 """ |
|
2225 _BulkWorkerThread.__init__(self, |
|
2226 work_queue, |
|
2227 throttle, |
|
2228 thread_gate, |
|
2229 request_manager, |
|
2230 num_threads, |
|
2231 batch_size, |
|
2232 ImportStateMessage, |
|
2233 get_time) |
|
2234 |
|
2235 def PreProcessItem(self, item): |
|
2236 """Performs pre transfer processing on a work item.""" |
|
2237 if item and not item.content: |
|
2238 item.content = self.request_manager.EncodeContent(item.rows) |
|
2239 |
|
2240 def TransferItem(self, item): |
|
2241 """Transfers the entities associated with an item. |
|
2242 |
|
2243 Args: |
|
2244 item: An item of upload (WorkItem) work. |
|
2245 |
|
2246 Returns: |
|
2247 A tuple of (estimated transfer size, response) |
|
2248 """ |
|
2249 return self.request_manager.PostEntities(item) |
|
2250 |
|
2251 def ProcessResponse(self, item, response): |
|
2252 """Processes the response from the server application.""" |
|
2253 pass |
|
2254 |
|
2255 |
|
2256 class BulkExporterThread(_BulkWorkerThread): |
|
2257 """A thread which receives entities from the server application. |
|
2258 |
|
2259 This thread will read KeyRange instances from the work_queue and export |
|
2260 the entities from the server application. Progress information will be |
|
2261 pushed into the progress_queue as the work is being performed. |
|
2262 |
|
2263 If a BulkExporterThread encounters an error when trying to post data, |
|
2264 the thread will exit and cause the application to terminate. |
|
2265 """ |
|
2266 |
|
2267 def __init__(self, |
|
2268 work_queue, |
|
2269 throttle, |
|
2270 thread_gate, |
|
2271 request_manager, |
|
2272 num_threads, |
|
2273 batch_size, |
|
2274 get_time=time.time): |
|
2275 |
|
2276 """Initialize the BulkExporterThread instance. |
|
2277 |
|
2278 Args: |
|
2279 work_queue: A queue containing KeyRanges for processing. |
|
2280 throttle: A Throttle to control upload bandwidth. |
|
2281 thread_gate: A ThreadGate to control number of simultaneous uploads. |
|
2282 request_manager: A RequestManager instance. |
|
2283 num_threads: The number of threads for parallel transfers. |
|
2284 batch_size: The number of entities to transfer per request. |
|
2285 get_time: Used for dependency injection. |
|
2286 """ |
|
2287 _BulkWorkerThread.__init__(self, |
|
2288 work_queue, |
|
2289 throttle, |
|
2290 thread_gate, |
|
2291 request_manager, |
|
2292 num_threads, |
|
2293 batch_size, |
|
2294 ExportStateMessage, |
|
2295 get_time) |
|
2296 |
|
2297 def PreProcessItem(self, unused_item): |
|
2298 """Performs pre transfer processing on a work item.""" |
|
2299 pass |
|
2300 |
|
2301 def TransferItem(self, item): |
|
2302 """Transfers the entities associated with an item. |
|
2303 |
|
2304 Args: |
|
2305 item: An item of download (KeyRange) work. |
|
2306 |
|
2307 Returns: |
|
2308 A tuple of (estimated transfer size, response) |
|
2309 """ |
|
2310 return self.request_manager.GetEntities(item) |
|
2311 |
|
2312 def ProcessResponse(self, item, export_result): |
|
2313 """Processes the response from the server application.""" |
|
2314 if export_result: |
|
2315 item.Process(export_result, self.num_threads, self.batch_size, |
|
2316 self.work_queue) |
1427 |
2317 |
1428 |
2318 |
1429 class DataSourceThread(_ThreadBase): |
2319 class DataSourceThread(_ThreadBase): |
1430 """A thread which reads WorkItems and pushes them into queue. |
2320 """A thread which reads WorkItems and pushes them into queue. |
1431 |
2321 |
1490 break |
2380 break |
1491 |
2381 |
1492 if not self.exit_flag: |
2382 if not self.exit_flag: |
1493 self.read_all = True |
2383 self.read_all = True |
1494 self.read_count = content_gen.row_count |
2384 self.read_count = content_gen.row_count |
1495 self.sent_count = content_gen.sent_count |
2385 self.xfer_count = content_gen.xfer_count |
1496 |
2386 |
1497 |
2387 |
1498 |
2388 |
1499 def _RunningInThread(thread): |
2389 def _RunningInThread(thread): |
1500 """Return True if we are running within the specified thread.""" |
2390 """Return True if we are running within the specified thread.""" |
1501 return threading.currentThread().getName() == thread.getName() |
2391 return threading.currentThread().getName() == thread.getName() |
1502 |
2392 |
1503 |
2393 |
1504 class ProgressDatabase(object): |
2394 class _Database(object): |
1505 """Persistently record all progress information during an upload. |
2395 """Base class for database connections in this module. |
1506 |
2396 |
1507 This class wraps a very simple SQLite database which records each of |
2397 The table is created by a primary thread (the python main thread) |
1508 the relevant details from the WorkItem instances. If the uploader is |
2398 but all future lookups and updates are performed by a secondary |
1509 resumed, then data is replayed out of the database. |
2399 thread. |
1510 """ |
2400 """ |
1511 |
2401 |
1512 def __init__(self, db_filename, commit_periodicity=100): |
2402 SIGNATURE_TABLE_NAME = 'bulkloader_database_signature' |
1513 """Initialize the ProgressDatabase instance. |
2403 |
1514 |
2404 def __init__(self, |
1515 Args: |
2405 db_filename, |
1516 db_filename: The name of the SQLite database to use. |
2406 create_table, |
1517 commit_periodicity: How many operations to perform between commits. |
2407 signature, |
|
2408 index=None, |
|
2409 commit_periodicity=100): |
|
2410 """Initialize the _Database instance. |
|
2411 |
|
2412 Args: |
|
2413 db_filename: The sqlite3 file to use for the database. |
|
2414 create_table: A string containing the SQL table creation command. |
|
2415 signature: A string identifying the important invocation options, |
|
2416 used to make sure we are not using an old database. |
|
2417 index: An optional string to create an index for the database. |
|
2418 commit_periodicity: Number of operations between database commits. |
1518 """ |
2419 """ |
1519 self.db_filename = db_filename |
2420 self.db_filename = db_filename |
1520 |
2421 |
1521 logging.info('Using progress database: %s', db_filename) |
2422 logger.info('Opening database: %s', db_filename) |
1522 self.primary_conn = sqlite3.connect(db_filename, isolation_level=None) |
2423 self.primary_conn = sqlite3.connect(db_filename, isolation_level=None) |
1523 self.primary_thread = threading.currentThread() |
2424 self.primary_thread = threading.currentThread() |
1524 |
2425 |
1525 self.progress_conn = None |
2426 self.secondary_conn = None |
1526 self.progress_thread = None |
2427 self.secondary_thread = None |
1527 |
2428 |
1528 self.operation_count = 0 |
2429 self.operation_count = 0 |
1529 self.commit_periodicity = commit_periodicity |
2430 self.commit_periodicity = commit_periodicity |
1530 |
2431 |
1531 self.prior_key_end = None |
|
1532 |
|
1533 try: |
2432 try: |
1534 self.primary_conn.execute( |
2433 self.primary_conn.execute(create_table) |
1535 """create table progress ( |
|
1536 id integer primary key autoincrement, |
|
1537 state integer not null, |
|
1538 key_start integer not null, |
|
1539 key_end integer not null |
|
1540 ) |
|
1541 """) |
|
1542 except sqlite3.OperationalError, e: |
2434 except sqlite3.OperationalError, e: |
1543 if 'already exists' not in e.message: |
2435 if 'already exists' not in e.message: |
1544 raise |
2436 raise |
1545 |
2437 |
|
2438 if index: |
|
2439 try: |
|
2440 self.primary_conn.execute(index) |
|
2441 except sqlite3.OperationalError, e: |
|
2442 if 'already exists' not in e.message: |
|
2443 raise |
|
2444 |
|
2445 self.existing_table = False |
|
2446 signature_cursor = self.primary_conn.cursor() |
|
2447 create_signature = """ |
|
2448 create table %s ( |
|
2449 value TEXT not null) |
|
2450 """ % _Database.SIGNATURE_TABLE_NAME |
1546 try: |
2451 try: |
1547 self.primary_conn.execute('create index i_state on progress (state)') |
2452 self.primary_conn.execute(create_signature) |
|
2453 self.primary_conn.cursor().execute( |
|
2454 'insert into %s (value) values (?)' % _Database.SIGNATURE_TABLE_NAME, |
|
2455 (signature,)) |
1548 except sqlite3.OperationalError, e: |
2456 except sqlite3.OperationalError, e: |
1549 if 'already exists' not in e.message: |
2457 if 'already exists' not in e.message: |
|
2458 logger.exception('Exception creating table:') |
1550 raise |
2459 raise |
|
2460 else: |
|
2461 self.existing_table = True |
|
2462 signature_cursor.execute( |
|
2463 'select * from %s' % _Database.SIGNATURE_TABLE_NAME) |
|
2464 (result,) = signature_cursor.fetchone() |
|
2465 if result and result != signature: |
|
2466 logger.error('Database signature mismatch:\n\n' |
|
2467 'Found:\n' |
|
2468 '%s\n\n' |
|
2469 'Expecting:\n' |
|
2470 '%s\n', |
|
2471 result, signature) |
|
2472 raise ResumeError('Database signature mismatch: %s != %s' % ( |
|
2473 signature, result)) |
1551 |
2474 |
1552 def ThreadComplete(self): |
2475 def ThreadComplete(self): |
1553 """Finalize any operations the progress thread has performed. |
2476 """Finalize any operations the secondary thread has performed. |
1554 |
2477 |
1555 The database aggregates lots of operations into a single commit, and |
2478 The database aggregates lots of operations into a single commit, and |
1556 this method is used to commit any pending operations as the thread |
2479 this method is used to commit any pending operations as the thread |
1557 is about to shut down. |
2480 is about to shut down. |
1558 """ |
2481 """ |
1559 if self.progress_conn: |
2482 if self.secondary_conn: |
1560 self._MaybeCommit(force_commit=True) |
2483 self._MaybeCommit(force_commit=True) |
1561 |
2484 |
1562 def _MaybeCommit(self, force_commit=False): |
2485 def _MaybeCommit(self, force_commit=False): |
1563 """Periodically commit changes into the SQLite database. |
2486 """Periodically commit changes into the SQLite database. |
1564 |
2487 |
1571 force_commit: Pass True in order for a commit to occur regardless |
2494 force_commit: Pass True in order for a commit to occur regardless |
1572 of the current operation count. |
2495 of the current operation count. |
1573 """ |
2496 """ |
1574 self.operation_count += 1 |
2497 self.operation_count += 1 |
1575 if force_commit or (self.operation_count % self.commit_periodicity) == 0: |
2498 if force_commit or (self.operation_count % self.commit_periodicity) == 0: |
1576 self.progress_conn.commit() |
2499 self.secondary_conn.commit() |
1577 |
2500 |
1578 def _OpenProgressConnection(self): |
2501 def _OpenSecondaryConnection(self): |
1579 """Possibly open a database connection for the progress tracker thread. |
2502 """Possibly open a database connection for the secondary thread. |
1580 |
2503 |
1581 If the connection is not open (for the calling thread, which is assumed |
2504 If the connection is not open (for the calling thread, which is assumed |
1582 to be the progress tracker thread), then open it. We also open a couple |
2505 to be the unique secondary thread), then open it. We also open a couple |
1583 cursors for later use (and reuse). |
2506 cursors for later use (and reuse). |
1584 """ |
2507 """ |
1585 if self.progress_conn: |
2508 if self.secondary_conn: |
1586 return |
2509 return |
1587 |
2510 |
1588 assert not _RunningInThread(self.primary_thread) |
2511 assert not _RunningInThread(self.primary_thread) |
1589 |
2512 |
1590 self.progress_thread = threading.currentThread() |
2513 self.secondary_thread = threading.currentThread() |
1591 |
2514 |
1592 self.progress_conn = sqlite3.connect(self.db_filename) |
2515 self.secondary_conn = sqlite3.connect(self.db_filename) |
1593 |
2516 |
1594 self.insert_cursor = self.progress_conn.cursor() |
2517 self.insert_cursor = self.secondary_conn.cursor() |
1595 self.update_cursor = self.progress_conn.cursor() |
2518 self.update_cursor = self.secondary_conn.cursor() |
1596 |
2519 |
1597 def HasUnfinishedWork(self): |
2520 |
|
2521 class ResultDatabase(_Database): |
|
2522 """Persistently record all the entities downloaded during an export. |
|
2523 |
|
2524 The entities are held in the database by their unique datastore key |
|
2525 in order to avoid duplication if an export is restarted. |
|
2526 """ |
|
2527 |
|
2528 def __init__(self, db_filename, signature, commit_periodicity=1): |
|
2529 """Initialize a ResultDatabase object. |
|
2530 |
|
2531 Args: |
|
2532 db_filename: The name of the SQLite database to use. |
|
2533 signature: A string identifying the important invocation options, |
|
2534 used to make sure we are not using an old database. |
|
2535 commit_periodicity: How many operations to perform between commits. |
|
2536 """ |
|
2537 self.complete = False |
|
2538 create_table = ('create table result (\n' |
|
2539 'id TEXT primary key,\n' |
|
2540 'value BLOB not null)') |
|
2541 |
|
2542 _Database.__init__(self, |
|
2543 db_filename, |
|
2544 create_table, |
|
2545 signature, |
|
2546 commit_periodicity=commit_periodicity) |
|
2547 if self.existing_table: |
|
2548 cursor = self.primary_conn.cursor() |
|
2549 cursor.execute('select count(*) from result') |
|
2550 self.existing_count = int(cursor.fetchone()[0]) |
|
2551 else: |
|
2552 self.existing_count = 0 |
|
2553 self.count = self.existing_count |
|
2554 |
|
2555 def _StoreEntity(self, entity_id, value): |
|
2556 """Store an entity in the result database. |
|
2557 |
|
2558 Args: |
|
2559 entity_id: A db.Key for the entity. |
|
2560 value: A string of the contents of the entity. |
|
2561 |
|
2562 Returns: |
|
2563 True if this entity is not already present in the result database. |
|
2564 """ |
|
2565 |
|
2566 assert _RunningInThread(self.secondary_thread) |
|
2567 assert isinstance(entity_id, db.Key) |
|
2568 |
|
2569 entity_id = entity_id.id_or_name() |
|
2570 self.insert_cursor.execute( |
|
2571 'select count(*) from result where id = ?', (unicode(entity_id),)) |
|
2572 already_present = self.insert_cursor.fetchone()[0] |
|
2573 result = True |
|
2574 if already_present: |
|
2575 result = False |
|
2576 self.insert_cursor.execute('delete from result where id = ?', |
|
2577 (unicode(entity_id),)) |
|
2578 else: |
|
2579 self.count += 1 |
|
2580 self.insert_cursor.execute( |
|
2581 'insert into result (id, value) values (?, ?)', |
|
2582 (unicode(entity_id), buffer(value))) |
|
2583 return result |
|
2584 |
|
2585 def StoreEntities(self, keys, entities): |
|
2586 """Store a group of entities in the result database. |
|
2587 |
|
2588 Args: |
|
2589 keys: A list of entity keys. |
|
2590 entities: A list of entities. |
|
2591 |
|
2592 Returns: |
|
2593 The number of new entities stored in the result database. |
|
2594 """ |
|
2595 self._OpenSecondaryConnection() |
|
2596 t = time.time() |
|
2597 count = 0 |
|
2598 for entity_id, value in zip(keys, |
|
2599 entities): |
|
2600 if self._StoreEntity(entity_id, value): |
|
2601 count += 1 |
|
2602 logger.debug('%s insert: delta=%.3f', |
|
2603 self.db_filename, |
|
2604 time.time() - t) |
|
2605 logger.debug('Entities transferred total: %s', self.count) |
|
2606 self._MaybeCommit() |
|
2607 return count |
|
2608 |
|
2609 def ResultsComplete(self): |
|
2610 """Marks the result database as containing complete results.""" |
|
2611 self.complete = True |
|
2612 |
|
2613 def AllEntities(self): |
|
2614 """Yields all pairs of (id, value) from the result table.""" |
|
2615 conn = sqlite3.connect(self.db_filename, isolation_level=None) |
|
2616 cursor = conn.cursor() |
|
2617 |
|
2618 cursor.execute( |
|
2619 'select id, value from result order by id') |
|
2620 |
|
2621 for unused_entity_id, entity in cursor: |
|
2622 yield cPickle.loads(str(entity)) |
|
2623 |
|
2624 |
|
class _ProgressDatabase(_Database):
  """Persistently record all progress information during an upload.

  This class wraps a very simple SQLite database which records each of
  the relevant details from a chunk of work. If the loader is
  resumed, then data is replayed out of the database.
  """

  def __init__(self,
               db_filename,
               sql_type,
               py_type,
               signature,
               commit_periodicity=100):
    """Initialize the ProgressDatabase instance.

    Args:
      db_filename: The name of the SQLite database to use.
      sql_type: A string of the SQL type to use for entity keys.
      py_type: The python type of entity keys.
      signature: A string identifying the important invocation options,
        used to make sure we are not using an old database.
      commit_periodicity: How many operations to perform between commits.
    """
    self.prior_key_end = None
    self.py_type = py_type

    # Key columns are typed via sql_type so callers can pick an
    # appropriate representation for their key space.
    table_ddl = ('create table progress (\n'
                 'id integer primary key autoincrement,\n'
                 'state integer not null,\n'
                 'key_start %s,\n'
                 'key_end %s)'
                 % (sql_type, sql_type))

    state_index_ddl = 'create index i_state on progress (state)'
    _Database.__init__(self,
                       db_filename,
                       table_ddl,
                       signature,
                       index=state_index_ddl,
                       commit_periodicity=commit_periodicity)
|
2666 |
|
2667 def UseProgressData(self): |
1598 """Returns True if the database has progress information. |
2668 """Returns True if the database has progress information. |
1599 |
2669 |
1600 Note there are two basic cases for progress information: |
2670 Note there are two basic cases for progress information: |
1601 1) All saved records indicate a successful upload. In this case, we |
2671 1) All saved records indicate a successful upload. In this case, we |
1602 need to skip everything transmitted so far and then send the rest. |
2672 need to skip everything transmitted so far and then send the rest. |
1603 2) Some records for incomplete transfer are present. These need to be |
2673 2) Some records for incomplete transfer are present. These need to be |
1604 sent again, and then we resume sending after all the successful |
2674 sent again, and then we resume sending after all the successful |
1605 data. |
2675 data. |
1606 |
2676 |
1607 Returns: |
2677 Returns: |
1608 True if the database has progress information, False otherwise. |
2678 True: if the database has progress information. |
1609 |
2679 |
1610 Raises: |
2680 Raises: |
1611 ResumeError: If there is an error reading the progress database. |
2681 ResumeError: if there is an error retrieving rows from the database. |
1612 """ |
2682 """ |
1613 assert _RunningInThread(self.primary_thread) |
2683 assert _RunningInThread(self.primary_thread) |
1614 |
2684 |
1615 cursor = self.primary_conn.cursor() |
2685 cursor = self.primary_conn.cursor() |
1616 cursor.execute('select count(*) from progress') |
2686 cursor.execute('select count(*) from progress') |
1617 row = cursor.fetchone() |
2687 row = cursor.fetchone() |
1618 if row is None: |
2688 if row is None: |
1619 raise ResumeError('Error reading progress information.') |
2689 raise ResumeError('Cannot retrieve progress information from database.') |
1620 |
2690 |
1621 return row[0] != 0 |
2691 return row[0] != 0 |
1622 |
2692 |
1623 def StoreKeys(self, key_start, key_end): |
2693 def StoreKeys(self, key_start, key_end): |
1624 """Record a new progress record, returning a key for later updates. |
2694 """Record a new progress record, returning a key for later updates. |
1793 |
2921 |
1794 if item.state == STATE_READ and item.progress_key is None: |
2922 if item.state == STATE_READ and item.progress_key is None: |
1795 item.progress_key = self.db.StoreKeys(item.key_start, item.key_end) |
2923 item.progress_key = self.db.StoreKeys(item.key_start, item.key_end) |
1796 else: |
2924 else: |
1797 assert item.progress_key is not None |
2925 assert item.progress_key is not None |
1798 |
2926 self.UpdateProgress(item) |
1799 self.db.UpdateState(item.progress_key, item.state) |
|
1800 if item.state == STATE_SENT: |
|
1801 self.entities_sent += item.count |
|
1802 |
2927 |
1803 item.progress_event.set() |
2928 item.progress_event.set() |
1804 |
2929 |
1805 self.progress_queue.task_done() |
2930 self.progress_queue.task_done() |
1806 |
2931 |
1807 self.db.ThreadComplete() |
2932 self.db.ThreadComplete() |
1808 |
2933 |
|
2934 |
|
2935 |
|
class ProgressTrackerThread(_ProgressThreadBase):
  """A thread which records progress information for the upload process.

  The progress information is stored into the provided progress database.
  This class is not responsible for replaying a prior run's progress
  information out of the database. Separate mechanisms must be used to
  resume a prior upload attempt.
  """
  # Friendly name used when reporting on this thread.
  NAME = 'progress tracking thread'

  def __init__(self, progress_queue, progress_db):
    """Initialize the ProgressTrackerThread instance.

    Args:
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
    """
    _ProgressThreadBase.__init__(self, progress_queue, progress_db)

  def UpdateProgress(self, item):
    """Record the new state of a WorkItem in the progress database.

    Args:
      item: A WorkItem instance.
    """
    self.db.UpdateState(item.progress_key, item.state)
    if item.state == STATE_SENT:
      # Count the entities only once they have been successfully sent.
      self.entities_transferred += item.count

  def WorkFinished(self):
    """Performs final actions after the entity transfer is complete."""
    pass
|
2969 |
|
2970 |
|
class ExportProgressThread(_ProgressThreadBase):
  """A thread to record progress information and write record data for exports.

  The progress information is stored into a provided progress database.
  Exported results are stored in the result database and dumped to an output
  file at the end of the download.
  """

  def __init__(self, kind, progress_queue, progress_db, result_db):
    """Initialize the ExportProgressThread instance.

    Args:
      kind: The kind of entities being stored in the database.
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
      result_db: The database for holding exported entities; should be an
        instance of ResultDatabase.
    """
    _ProgressThreadBase.__init__(self, progress_queue, progress_db)

    self.kind = kind
    self.result_db = result_db
    self.existing_count = result_db.existing_count

  def EntitiesTransferred(self):
    """Return the total number of unique entities transferred."""
    return self.result_db.count

  def WorkFinished(self):
    """Dump the contents of the result database via the registered exporter."""
    exporter = Exporter.RegisteredExporter(self.kind)
    exporter.output_entities(self.result_db.AllEntities())

  def UpdateProgress(self, item):
    """Update the state of the given KeyRange.

    Args:
      item: A KeyRange instance.
    """
    if item.state != STATE_GOT:
      self.db.UpdateState(item.progress_key, item.state)
    else:
      # Successful download: persist the entities and drop the progress row.
      stored = self.result_db.StoreEntities(item.export_result.keys,
                                            item.export_result.entities)
      self.db.DeleteKey(item.progress_key)
      self.entities_transferred += stored
|
3018 |
|
3019 |
|
def ParseKey(key_string):
  """Turn a key stored in the database into a db.Key or None.

  Args:
    key_string: The string representation of a db.Key.

  Returns:
    A db.Key instance, or None if key_string is empty or the literal 'None'.
  """
  if not key_string or key_string == 'None':
    return None
  return db.Key(encoded=key_string)
1809 |
3034 |
1810 |
3035 |
def Validate(value, typ):
  """Checks that value is non-empty and of the right type.

  Args:
    value: any value
    typ: a type or tuple of types

  Raises:
    ValueError: if value is None or empty.
    TypeError: if it's not the given type.
  """
  if not value:
    raise ValueError('Value should not be empty; received %s.' % value)
  if not isinstance(value, typ):
    raise TypeError('Expected a %s, but received %s (a %s).' %
                    (typ, value, value.__class__))
1828 |
3052 |
1829 |
3053 |
|
def CheckFile(filename):
  """Check that the given file exists and can be opened for reading.

  Args:
    filename: The name of the file.

  Raises:
    FileNotFoundError: if the given filename is not found
    FileNotReadableError: if the given filename is not readable.
  """
  if not os.path.exists(filename):
    raise FileNotFoundError('%s: file not found' % filename)
  if not os.access(filename, os.R_OK):
    raise FileNotReadableError('%s: file not readable' % filename)
|
3068 |
|
3069 |
1830 class Loader(object): |
3070 class Loader(object): |
1831 """A base class for creating datastore entities from input data. |
3071 """A base class for creating datastore entities from input data. |
1832 |
3072 |
1833 To add a handler for bulk loading a new entity kind into your datastore, |
3073 To add a handler for bulk loading a new entity kind into your datastore, |
1834 write a subclass of this class that calls Loader.__init__ from your |
3074 write a subclass of this class that calls Loader.__init__ from your |
1835 class's __init__. |
3075 class's __init__. |
1836 |
3076 |
1837 If you need to run extra code to convert entities from the input |
3077 If you need to run extra code to convert entities from the input |
1838 data, create new properties, or otherwise modify the entities before |
3078 data, create new properties, or otherwise modify the entities before |
1839 they're inserted, override HandleEntity. |
3079 they're inserted, override handle_entity. |
1840 |
3080 |
1841 See the CreateEntity method for the creation of entities from the |
3081 See the create_entity method for the creation of entities from the |
1842 (parsed) input data. |
3082 (parsed) input data. |
1843 """ |
3083 """ |
1844 |
3084 |
1845 __loaders = {} |
3085 __loaders = {} |
1846 __kind = None |
3086 kind = None |
1847 __properties = None |
3087 __properties = None |
1848 |
3088 |
1849 def __init__(self, kind, properties): |
3089 def __init__(self, kind, properties): |
1850 """Constructor. |
3090 """Constructor. |
1851 |
3091 |
1888 self.__properties = properties |
3130 self.__properties = properties |
1889 |
3131 |
1890 @staticmethod |
3132 @staticmethod |
1891 def RegisterLoader(loader): |
3133 def RegisterLoader(loader): |
1892 |
3134 |
1893 Loader.__loaders[loader.__kind] = loader |
3135 Loader.__loaders[loader.kind] = loader |
1894 |
3136 |
1895 def kind(self): |
3137 def alias_old_names(self): |
1896 """ Return the entity kind that this Loader handes. |
3138 """Aliases method names so that Loaders defined with old names work.""" |
1897 """ |
3139 aliases = ( |
1898 return self.__kind |
3140 ('CreateEntity', 'create_entity'), |
1899 |
3141 ('HandleEntity', 'handle_entity'), |
1900 def CreateEntity(self, values, key_name=None): |
3142 ('GenerateKey', 'generate_key'), |
|
3143 ) |
|
3144 for old_name, new_name in aliases: |
|
3145 setattr(Loader, old_name, getattr(Loader, new_name)) |
|
3146 if hasattr(self.__class__, old_name) and not ( |
|
3147 getattr(self.__class__, old_name).im_func == |
|
3148 getattr(Loader, new_name).im_func): |
|
3149 if hasattr(self.__class__, new_name) and not ( |
|
3150 getattr(self.__class__, new_name).im_func == |
|
3151 getattr(Loader, new_name).im_func): |
|
3152 raise NameClashError(old_name, new_name, self.__class__) |
|
3153 setattr(self, new_name, getattr(self, old_name)) |
|
3154 |
|
3155 def create_entity(self, values, key_name=None, parent=None): |
1901 """Creates a entity from a list of property values. |
3156 """Creates a entity from a list of property values. |
1902 |
3157 |
1903 Args: |
3158 Args: |
1904 values: list/tuple of str |
3159 values: list/tuple of str |
1905 key_name: if provided, the name for the (single) resulting entity |
3160 key_name: if provided, the name for the (single) resulting entity |
|
3161 parent: A db.Key instance for the parent, or None |
1906 |
3162 |
1907 Returns: |
3163 Returns: |
1908 list of db.Model |
3164 list of db.Model |
1909 |
3165 |
1910 The returned entities are populated with the property values from the |
3166 The returned entities are populated with the property values from the |
1911 argument, converted to native types using the properties map given in |
3167 argument, converted to native types using the properties map given in |
1912 the constructor, and passed through HandleEntity. They're ready to be |
3168 the constructor, and passed through handle_entity. They're ready to be |
1913 inserted. |
3169 inserted. |
1914 |
3170 |
1915 Raises: |
3171 Raises: |
1916 AssertionError if the number of values doesn't match the number |
3172 AssertionError: if the number of values doesn't match the number |
1917 of properties in the properties map. |
3173 of properties in the properties map. |
1918 ValueError if any element of values is None or empty. |
3174 ValueError: if any element of values is None or empty. |
1919 TypeError if values is not a list or tuple. |
3175 TypeError: if values is not a list or tuple. |
1920 """ |
3176 """ |
1921 Validate(values, (list, tuple)) |
3177 Validate(values, (list, tuple)) |
1922 assert len(values) == len(self.__properties), ( |
3178 assert len(values) == len(self.__properties), ( |
1923 'Expected %d CSV columns, found %d.' % |
3179 'Expected %d columns, found %d.' % |
1924 (len(self.__properties), len(values))) |
3180 (len(self.__properties), len(values))) |
1925 |
3181 |
1926 model_class = db.class_for_kind(self.__kind) |
3182 model_class = GetImplementationClass(self.kind) |
1927 |
3183 |
1928 properties = {'key_name': key_name} |
3184 properties = { |
|
3185 'key_name': key_name, |
|
3186 'parent': parent, |
|
3187 } |
1929 for (name, converter), val in zip(self.__properties, values): |
3188 for (name, converter), val in zip(self.__properties, values): |
1930 if converter is bool and val.lower() in ('0', 'false', 'no'): |
3189 if converter is bool and val.lower() in ('0', 'false', 'no'): |
1931 val = False |
3190 val = False |
1932 properties[name] = converter(val) |
3191 properties[name] = converter(val) |
1933 |
3192 |
1934 entity = model_class(**properties) |
3193 entity = model_class(**properties) |
1935 entities = self.HandleEntity(entity) |
3194 entities = self.handle_entity(entity) |
1936 |
3195 |
1937 if entities: |
3196 if entities: |
1938 if not isinstance(entities, (list, tuple)): |
3197 if not isinstance(entities, (list, tuple)): |
1939 entities = [entities] |
3198 entities = [entities] |
1940 |
3199 |
1970 Returns: |
3230 Returns: |
1971 A string to be used as the key_name for an entity. |
3231 A string to be used as the key_name for an entity. |
1972 """ |
3232 """ |
1973 return None |
3233 return None |
1974 |
3234 |
1975 def HandleEntity(self, entity): |
3235 def handle_entity(self, entity): |
1976 """Subclasses can override this to add custom entity conversion code. |
3236 """Subclasses can override this to add custom entity conversion code. |
1977 |
3237 |
1978 This is called for each entity, after its properties are populated from |
3238 This is called for each entity, after its properties are populated |
1979 CSV but before it is stored. Subclasses can override this to add custom |
3239 from the input but before it is stored. Subclasses can override |
1980 entity handling code. |
3240 this to add custom entity handling code. |
1981 |
3241 |
1982 The entity to be inserted should be returned. If multiple entities should |
3242 The entity to be inserted should be returned. If multiple entities |
1983 be inserted, return a list of entities. If no entities should be inserted, |
3243 should be inserted, return a list of entities. If no entities |
1984 return None or []. |
3244 should be inserted, return None or []. |
1985 |
3245 |
1986 Args: |
3246 Args: |
1987 entity: db.Model |
3247 entity: db.Model |
1988 |
3248 |
1989 Returns: |
3249 Returns: |
1990 db.Model or list of db.Model |
3250 db.Model or list of db.Model |
1991 """ |
3251 """ |
1992 return entity |
3252 return entity |
1993 |
3253 |
|
3254 def initialize(self, filename, loader_opts): |
|
3255 """Performs initialization and validation of the input file. |
|
3256 |
|
3257 This implementation checks that the input file exists and can be |
|
3258 opened for reading. |
|
3259 |
|
3260 Args: |
|
3261 filename: The string given as the --filename flag argument. |
|
3262 loader_opts: The string given as the --loader_opts flag argument. |
|
3263 """ |
|
3264 CheckFile(filename) |
|
3265 |
|
3266 def finalize(self): |
|
3267 """Performs finalization actions after the upload completes.""" |
|
3268 pass |
|
3269 |
|
3270 def generate_records(self, filename): |
|
3271 """Subclasses can override this to add custom data input code. |
|
3272 |
|
3273 This method must yield fixed-length lists of strings. |
|
3274 |
|
3275 The default implementation uses csv.reader to read CSV rows |
|
3276 from filename. |
|
3277 |
|
3278 Args: |
|
3279 filename: The string input for the --filename option. |
|
3280 |
|
3281 Yields: |
|
3282 Lists of strings. |
|
3283 """ |
|
3284 csv_generator = CSVGenerator(filename, openfile=self.__openfile, |
|
3285 create_csv_reader=self.__create_csv_reader |
|
3286 ).Records() |
|
3287 return csv_generator |
1994 |
3288 |
1995 @staticmethod |
3289 @staticmethod |
1996 def RegisteredLoaders(): |
3290 def RegisteredLoaders(): |
1997 """Returns a list of the Loader instances that have been created. |
3291 """Returns a dict of the Loader instances that have been created.""" |
1998 """ |
|
1999 return dict(Loader.__loaders) |
3292 return dict(Loader.__loaders) |
|
3293 |
|
3294 @staticmethod |
|
3295 def RegisteredLoader(kind): |
|
3296 """Returns the loader instance for the given kind if it exists.""" |
|
3297 return Loader.__loaders[kind] |
|
3298 |
|
3299 |
|
class Exporter(object):
  """A base class for serializing datastore entities.

  To add a handler for exporting an entity kind from your datastore,
  write a subclass of this class that calls Exporter.__init__ from your
  class's __init__.

  If you need to run extra code to convert entities from the input
  data, create new properties, or otherwise modify the entities before
  they're inserted, override handle_entity.

  See the output_entities method for the writing of data from entities.
  """

  # Registry mapping kind strings to Exporter instances.
  __exporters = {}
  kind = None
  __properties = None

  def __init__(self, kind, properties):
    """Constructor.

    Populates this Exporter's kind and properties map. Also registers
    it so that all you need to do is instantiate your Exporter, and
    the bulkload handler will automatically use it.

    Args:
      kind: a string containing the entity kind that this exporter handles

      properties: list of (name, converter, default) tuples.

      This is used to automatically convert the entities to strings.
      The converter should be a function that takes one argument, a property
      value of the appropriate type, and returns a str or unicode. The default
      is a string to be used if the property is not present, or None to fail
      with an error if the property is missing.

      For example:
        [('name', str, None),
         ('id_number', str, None),
         ('email', str, ''),
         ('user', str, None),
         ('birthdate',
          lambda x: str(datetime.datetime.fromtimestamp(float(x))),
          None),
         ('description', str, ''),
        ]
    """
    Validate(kind, basestring)
    self.kind = kind

    # Fails fast if no implementation class exists for this kind.
    GetImplementationClass(kind)

    Validate(properties, list)
    for prop_name, converter, default in properties:
      Validate(prop_name, basestring)
      assert callable(converter), (
          'Conversion function %s for property %s is not callable.' % (
              converter, prop_name))
      if default:
        Validate(default, basestring)

    self.__properties = properties

  @staticmethod
  def RegisterExporter(exporter):
    """Registers exporter as the Exporter instance for its kind."""
    Exporter.__exporters[exporter.kind] = exporter

  def __ExtractProperties(self, entity):
    """Converts an entity into a list of string values.

    Args:
      entity: An entity to extract the properties from.

    Returns:
      A list of the properties of the entity.

    Raises:
      MissingPropertyError: if an expected field on the entity is missing.
    """
    extracted = []
    for prop_name, converter, default in self.__properties:
      try:
        extracted.append(converter(getattr(entity, prop_name)))
      except AttributeError:
        if default is None:
          # No fallback was configured: the property is required.
          raise MissingPropertyError(prop_name)
        extracted.append(default)
    return extracted

  def __EncodeEntity(self, entity):
    """Convert the given entity into CSV string.

    Args:
      entity: The entity to encode.

    Returns:
      A CSV string.
    """
    row_buffer = StringIO.StringIO()
    # lineterminator='' leaves newline handling to the caller.
    csv.writer(row_buffer, lineterminator='').writerow(
        self.__ExtractProperties(entity))
    return row_buffer.getvalue()

  def __SerializeEntity(self, entity):
    """Creates a string representation of an entity.

    Args:
      entity: The entity to serialize.

    Returns:
      A serialized (UTF-8 encoded) representation of an entity.
    """
    encoded_row = self.__EncodeEntity(entity)
    if not isinstance(encoded_row, unicode):
      encoded_row = unicode(encoded_row, 'utf-8')
    return encoded_row.encode('utf-8')

  def output_entities(self, entity_generator):
    """Outputs the downloaded entities.

    This implementation writes CSV.

    Args:
      entity_generator: A generator that yields the downloaded entities
        in key order.
    """
    CheckOutputFile(self.output_filename)
    output_file = open(self.output_filename, 'w')
    logger.debug('Export complete, writing to file')
    for entity in entity_generator:
      output_file.write(self.__SerializeEntity(entity) + '\n')

  def initialize(self, filename, exporter_opts):
    """Performs initialization and validation of the output file.

    The default implementation only verifies that filename can be
    written to; exporter_opts is not used here.

    Args:
      filename: The string given as the --filename flag argument.
      exporter_opts: The string given as the --exporter_opts flag argument.
    """
    CheckOutputFile(filename)
    self.output_filename = filename

  def finalize(self):
    """Performs finalization actions after the download completes."""

  @staticmethod
  def RegisteredExporters():
    """Returns a shallow copy of the kind-to-Exporter registry."""
    return Exporter.__exporters.copy()

  @staticmethod
  def RegisteredExporter(kind):
    """Returns the exporter instance for the given kind.

    Raises:
      KeyError: if no exporter has been registered for kind.
    """
    return Exporter.__exporters[kind]
2000 |
3461 |
2001 |
3462 |
2002 class QueueJoinThread(threading.Thread): |
3463 class QueueJoinThread(threading.Thread): |
2003 """A thread that joins a queue and exits. |
3464 """A thread that joins a queue and exits. |
2004 |
3465 |
2070 for unused_thread in thread_gate.Threads(): |
3531 for unused_thread in thread_gate.Threads(): |
2071 thread_gate.EnableThread() |
3532 thread_gate.EnableThread() |
2072 |
3533 |
2073 data_source_thread.join(timeout=3.0) |
3534 data_source_thread.join(timeout=3.0) |
2074 if data_source_thread.isAlive(): |
3535 if data_source_thread.isAlive(): |
2075 logging.warn('%s hung while trying to exit', |
3536 logger.warn('%s hung while trying to exit', |
2076 data_source_thread.GetFriendlyName()) |
3537 data_source_thread.GetFriendlyName()) |
2077 |
3538 |
2078 while not work_queue.empty(): |
3539 while not work_queue.empty(): |
2079 try: |
3540 try: |
2080 unused_item = work_queue.get_nowait() |
3541 unused_item = work_queue.get_nowait() |
2081 work_queue.task_done() |
3542 work_queue.task_done() |
2082 except Queue.Empty: |
3543 except Queue.Empty: |
2083 pass |
3544 pass |
2084 |
3545 |
2085 |
3546 |
2086 def PerformBulkUpload(app_id, |
3547 class BulkTransporterApp(object): |
2087 post_url, |
3548 """Class to wrap bulk transport application functionality.""" |
2088 kind, |
3549 |
2089 workitem_generator_factory, |
3550 def __init__(self, |
2090 num_threads, |
3551 arg_dict, |
2091 throttle, |
3552 input_generator_factory, |
2092 progress_db, |
3553 throttle, |
2093 max_queue_size=DEFAULT_QUEUE_SIZE, |
3554 progress_db, |
2094 request_manager_factory=RequestManager, |
3555 workerthread_factory, |
2095 bulkloaderthread_factory=BulkLoaderThread, |
3556 progresstrackerthread_factory, |
2096 progresstrackerthread_factory=ProgressTrackerThread, |
3557 max_queue_size=DEFAULT_QUEUE_SIZE, |
2097 datasourcethread_factory=DataSourceThread, |
3558 request_manager_factory=RequestManager, |
2098 work_queue_factory=ReQueue, |
3559 datasourcethread_factory=DataSourceThread, |
2099 progress_queue_factory=Queue.Queue): |
3560 work_queue_factory=ReQueue, |
2100 """Uploads data into an application using a series of HTTP POSTs. |
3561 progress_queue_factory=Queue.Queue): |
2101 |
3562 """Instantiate a BulkTransporterApp. |
2102 This function will spin up a number of threads to read entities from |
3563 |
2103 the data source, pass those to a number of worker ("uploader") threads |
3564 Uploads or downloads data to or from application using HTTP requests. |
2104 for sending to the application, and track all of the progress in a |
3565 When run, the class will spin up a number of threads to read entities |
2105 small database in case an error or pause/termination requires a |
3566 from the data source, pass those to a number of worker threads |
2106 restart/resumption of the upload process. |
3567 for sending to the application, and track all of the progress in a |
2107 |
3568 small database in case an error or pause/termination requires a |
2108 Args: |
3569 restart/resumption of the upload process. |
2109 app_id: String containing application id. |
3570 |
2110 post_url: URL to post the Entity data to. |
3571 Args: |
2111 kind: Kind of the Entity records being posted. |
3572 arg_dict: Dictionary of command line options. |
2112 workitem_generator_factory: A factory that creates a WorkItem generator. |
3573 input_generator_factory: A factory that creates a WorkItem generator. |
2113 num_threads: How many uploader threads should be created. |
3574 throttle: A Throttle instance. |
2114 throttle: A Throttle instance. |
3575 progress_db: The database to use for replaying/recording progress. |
2115 progress_db: The database to use for replaying/recording progress. |
3576 workerthread_factory: A factory for worker threads. |
2116 max_queue_size: Maximum size of the queues before they should block. |
3577 progresstrackerthread_factory: Used for dependency injection. |
2117 request_manager_factory: Used for dependency injection. |
3578 max_queue_size: Maximum size of the queues before they should block. |
2118 bulkloaderthread_factory: Used for dependency injection. |
3579 request_manager_factory: Used for dependency injection. |
2119 progresstrackerthread_factory: Used for dependency injection. |
3580 datasourcethread_factory: Used for dependency injection. |
2120 datasourcethread_factory: Used for dependency injection. |
3581 work_queue_factory: Used for dependency injection. |
2121 work_queue_factory: Used for dependency injection. |
3582 progress_queue_factory: Used for dependency injection. |
2122 progress_queue_factory: Used for dependency injection. |
3583 """ |
2123 |
3584 self.app_id = arg_dict['app_id'] |
2124 Raises: |
3585 self.post_url = arg_dict['url'] |
2125 AuthenticationError: If authentication is required and fails. |
3586 self.kind = arg_dict['kind'] |
2126 """ |
3587 self.batch_size = arg_dict['batch_size'] |
2127 thread_gate = ThreadGate(True) |
3588 self.input_generator_factory = input_generator_factory |
2128 |
3589 self.num_threads = arg_dict['num_threads'] |
2129 (unused_scheme, |
3590 self.email = arg_dict['email'] |
2130 host_port, url_path, |
3591 self.passin = arg_dict['passin'] |
2131 unused_query, unused_fragment) = urlparse.urlsplit(post_url) |
3592 self.throttle = throttle |
2132 |
3593 self.progress_db = progress_db |
2133 work_queue = work_queue_factory(max_queue_size) |
3594 self.workerthread_factory = workerthread_factory |
2134 progress_queue = progress_queue_factory(max_queue_size) |
3595 self.progresstrackerthread_factory = progresstrackerthread_factory |
2135 request_manager = request_manager_factory(app_id, |
3596 self.max_queue_size = max_queue_size |
2136 host_port, |
3597 self.request_manager_factory = request_manager_factory |
2137 url_path, |
3598 self.datasourcethread_factory = datasourcethread_factory |
2138 kind, |
3599 self.work_queue_factory = work_queue_factory |
2139 throttle) |
3600 self.progress_queue_factory = progress_queue_factory |
2140 |
3601 (scheme, |
2141 throttle.Register(threading.currentThread()) |
3602 self.host_port, self.url_path, |
2142 try: |
3603 unused_query, unused_fragment) = urlparse.urlsplit(self.post_url) |
2143 request_manager.Authenticate() |
3604 self.secure = (scheme == 'https') |
2144 except Exception, e: |
3605 |
2145 logging.exception(e) |
3606 def Run(self): |
2146 raise AuthenticationError('Authentication failed') |
3607 """Perform the work of the BulkTransporterApp. |
2147 if (request_manager.credentials is not None and |
3608 |
2148 not request_manager.authenticated): |
3609 Raises: |
2149 raise AuthenticationError('Authentication failed') |
3610 AuthenticationError: If authentication is required and fails. |
2150 |
3611 |
2151 for unused_idx in range(num_threads): |
3612 Returns: |
2152 thread = bulkloaderthread_factory(work_queue, |
3613 Error code suitable for sys.exit, e.g. 0 on success, 1 on failure. |
2153 throttle, |
3614 """ |
2154 thread_gate, |
3615 thread_gate = ThreadGate(True) |
2155 request_manager) |
3616 |
2156 throttle.Register(thread) |
3617 self.throttle.Register(threading.currentThread()) |
2157 thread_gate.Register(thread) |
3618 threading.currentThread().exit_flag = False |
2158 |
3619 |
2159 progress_thread = progresstrackerthread_factory(progress_queue, progress_db) |
3620 work_queue = self.work_queue_factory(self.max_queue_size) |
2160 |
3621 |
2161 if progress_db.HasUnfinishedWork(): |
3622 progress_queue = self.progress_queue_factory(self.max_queue_size) |
2162 logging.debug('Restarting upload using progress database') |
3623 request_manager = self.request_manager_factory(self.app_id, |
2163 progress_generator_factory = progress_db.GetProgressStatusGenerator |
3624 self.host_port, |
2164 else: |
3625 self.url_path, |
2165 progress_generator_factory = None |
3626 self.kind, |
2166 |
3627 self.throttle, |
2167 data_source_thread = datasourcethread_factory(work_queue, |
3628 self.batch_size, |
2168 progress_queue, |
3629 self.secure, |
2169 workitem_generator_factory, |
3630 self.email, |
2170 progress_generator_factory) |
3631 self.passin) |
2171 |
3632 try: |
2172 thread_local = threading.local() |
3633 request_manager.Authenticate() |
2173 thread_local.shut_down = False |
3634 except Exception, e: |
2174 |
3635 if not isinstance(e, urllib2.HTTPError) or ( |
2175 def Interrupt(unused_signum, unused_frame): |
3636 e.code != 302 and e.code != 401): |
2176 """Shutdown gracefully in response to a signal.""" |
3637 logger.exception('Exception during authentication') |
2177 thread_local.shut_down = True |
3638 raise AuthenticationError() |
2178 |
3639 if (request_manager.auth_called and |
2179 signal.signal(signal.SIGINT, Interrupt) |
3640 not request_manager.authenticated): |
2180 |
3641 raise AuthenticationError('Authentication failed') |
2181 progress_thread.start() |
3642 |
2182 data_source_thread.start() |
3643 for unused_idx in xrange(self.num_threads): |
2183 for thread in thread_gate.Threads(): |
3644 thread = self.workerthread_factory(work_queue, |
2184 thread.start() |
3645 self.throttle, |
2185 |
3646 thread_gate, |
2186 |
3647 request_manager, |
2187 while not thread_local.shut_down: |
3648 self.num_threads, |
2188 data_source_thread.join(timeout=0.25) |
3649 self.batch_size) |
2189 |
3650 self.throttle.Register(thread) |
2190 if data_source_thread.isAlive(): |
3651 thread_gate.Register(thread) |
2191 for thread in list(thread_gate.Threads()) + [progress_thread]: |
3652 |
2192 if not thread.isAlive(): |
3653 self.progress_thread = self.progresstrackerthread_factory( |
2193 logging.info('Unexpected thread death: %s', thread.getName()) |
3654 progress_queue, self.progress_db) |
2194 thread_local.shut_down = True |
3655 |
2195 break |
3656 if self.progress_db.UseProgressData(): |
|
3657 logger.debug('Restarting upload using progress database') |
|
3658 progress_generator_factory = self.progress_db.GetProgressStatusGenerator |
2196 else: |
3659 else: |
2197 break |
3660 progress_generator_factory = None |
2198 |
3661 |
2199 if thread_local.shut_down: |
3662 self.data_source_thread = ( |
2200 ShutdownThreads(data_source_thread, work_queue, thread_gate) |
3663 self.datasourcethread_factory(work_queue, |
2201 |
3664 progress_queue, |
2202 def _Join(ob, msg): |
3665 self.input_generator_factory, |
2203 logging.debug('Waiting for %s...', msg) |
3666 progress_generator_factory)) |
2204 if isinstance(ob, threading.Thread): |
3667 |
2205 ob.join(timeout=3.0) |
3668 thread_local = threading.local() |
2206 if ob.isAlive(): |
3669 thread_local.shut_down = False |
2207 logging.debug('Joining %s failed', ob.GetFriendlyName()) |
3670 |
|
3671 def Interrupt(unused_signum, unused_frame): |
|
3672 """Shutdown gracefully in response to a signal.""" |
|
3673 thread_local.shut_down = True |
|
3674 |
|
3675 signal.signal(signal.SIGINT, Interrupt) |
|
3676 |
|
3677 self.progress_thread.start() |
|
3678 self.data_source_thread.start() |
|
3679 for thread in thread_gate.Threads(): |
|
3680 thread.start() |
|
3681 |
|
3682 |
|
3683 while not thread_local.shut_down: |
|
3684 self.data_source_thread.join(timeout=0.25) |
|
3685 |
|
3686 if self.data_source_thread.isAlive(): |
|
3687 for thread in list(thread_gate.Threads()) + [self.progress_thread]: |
|
3688 if not thread.isAlive(): |
|
3689 logger.info('Unexpected thread death: %s', thread.getName()) |
|
3690 thread_local.shut_down = True |
|
3691 break |
2208 else: |
3692 else: |
2209 logging.debug('... done.') |
3693 break |
2210 elif isinstance(ob, (Queue.Queue, ReQueue)): |
3694 |
2211 if not InterruptibleQueueJoin(ob, thread_local, thread_gate): |
3695 if thread_local.shut_down: |
2212 ShutdownThreads(data_source_thread, work_queue, thread_gate) |
3696 ShutdownThreads(self.data_source_thread, work_queue, thread_gate) |
|
3697 |
|
3698 def _Join(ob, msg): |
|
3699 logger.debug('Waiting for %s...', msg) |
|
3700 if isinstance(ob, threading.Thread): |
|
3701 ob.join(timeout=3.0) |
|
3702 if ob.isAlive(): |
|
3703 logger.debug('Joining %s failed', ob.GetFriendlyName()) |
|
3704 else: |
|
3705 logger.debug('... done.') |
|
3706 elif isinstance(ob, (Queue.Queue, ReQueue)): |
|
3707 if not InterruptibleQueueJoin(ob, thread_local, thread_gate): |
|
3708 ShutdownThreads(self.data_source_thread, work_queue, thread_gate) |
|
3709 else: |
|
3710 ob.join() |
|
3711 logger.debug('... done.') |
|
3712 |
|
3713 _Join(work_queue, 'work_queue to flush') |
|
3714 |
|
3715 for unused_thread in thread_gate.Threads(): |
|
3716 work_queue.put(_THREAD_SHOULD_EXIT) |
|
3717 |
|
3718 for unused_thread in thread_gate.Threads(): |
|
3719 thread_gate.EnableThread() |
|
3720 |
|
3721 for thread in thread_gate.Threads(): |
|
3722 _Join(thread, 'thread [%s] to terminate' % thread.getName()) |
|
3723 |
|
3724 thread.CheckError() |
|
3725 |
|
3726 if self.progress_thread.isAlive(): |
|
3727 _Join(progress_queue, 'progress_queue to finish') |
2213 else: |
3728 else: |
2214 ob.join() |
3729 logger.warn('Progress thread exited prematurely') |
2215 logging.debug('... done.') |
3730 |
2216 |
3731 progress_queue.put(_THREAD_SHOULD_EXIT) |
2217 _Join(work_queue, 'work_queue to flush') |
3732 _Join(self.progress_thread, 'progress_thread to terminate') |
2218 |
3733 self.progress_thread.CheckError() |
2219 for unused_thread in thread_gate.Threads(): |
3734 if not thread_local.shut_down: |
2220 work_queue.put(_THREAD_SHOULD_EXIT) |
3735 self.progress_thread.WorkFinished() |
2221 |
3736 |
2222 for unused_thread in thread_gate.Threads(): |
3737 self.data_source_thread.CheckError() |
2223 thread_gate.EnableThread() |
3738 |
2224 |
3739 return self.ReportStatus() |
2225 for thread in thread_gate.Threads(): |
3740 |
2226 _Join(thread, 'thread [%s] to terminate' % thread.getName()) |
3741 def ReportStatus(self): |
2227 |
3742 """Display a message reporting the final status of the transfer.""" |
2228 thread.CheckError() |
3743 raise NotImplementedError() |
2229 |
3744 |
2230 if progress_thread.isAlive(): |
3745 |
2231 _Join(progress_queue, 'progress_queue to finish') |
3746 class BulkUploaderApp(BulkTransporterApp): |
2232 else: |
3747 """Class to encapsulate bulk uploader functionality.""" |
2233 logging.warn('Progress thread exited prematurely') |
3748 |
2234 |
3749 def __init__(self, *args, **kwargs): |
2235 progress_queue.put(_THREAD_SHOULD_EXIT) |
3750 BulkTransporterApp.__init__(self, *args, **kwargs) |
2236 _Join(progress_thread, 'progress_thread to terminate') |
3751 |
2237 progress_thread.CheckError() |
3752 def ReportStatus(self): |
2238 |
3753 """Display a message reporting the final status of the transfer.""" |
2239 data_source_thread.CheckError() |
3754 total_up, duration = self.throttle.TotalTransferred(BANDWIDTH_UP) |
2240 |
3755 s_total_up, unused_duration = self.throttle.TotalTransferred( |
2241 total_up, duration = throttle.TotalTransferred(BANDWIDTH_UP) |
3756 HTTPS_BANDWIDTH_UP) |
2242 s_total_up, unused_duration = throttle.TotalTransferred(HTTPS_BANDWIDTH_UP) |
3757 total_up += s_total_up |
2243 total_up += s_total_up |
3758 total = total_up |
2244 logging.info('%d entites read, %d previously transferred', |
3759 logger.info('%d entites total, %d previously transferred', |
2245 data_source_thread.read_count, |
3760 self.data_source_thread.read_count, |
2246 data_source_thread.sent_count) |
3761 self.data_source_thread.xfer_count) |
2247 logging.info('%d entities (%d bytes) transferred in %.1f seconds', |
3762 transfer_count = self.progress_thread.EntitiesTransferred() |
2248 progress_thread.entities_sent, total_up, duration) |
3763 logger.info('%d entities (%d bytes) transferred in %.1f seconds', |
2249 if (data_source_thread.read_all and |
3764 transfer_count, total, duration) |
2250 progress_thread.entities_sent + data_source_thread.sent_count >= |
3765 if (self.data_source_thread.read_all and |
2251 data_source_thread.read_count): |
3766 transfer_count + |
2252 logging.info('All entities successfully uploaded') |
3767 self.data_source_thread.xfer_count >= |
2253 else: |
3768 self.data_source_thread.read_count): |
2254 logging.info('Some entities not successfully uploaded') |
3769 logger.info('All entities successfully transferred') |
|
3770 return 0 |
|
3771 else: |
|
3772 logger.info('Some entities not successfully transferred') |
|
3773 return 1 |
|
3774 |
|
3775 |
|
3776 class BulkDownloaderApp(BulkTransporterApp): |
|
3777 """Class to encapsulate bulk downloader functionality.""" |
|
3778 |
|
3779 def __init__(self, *args, **kwargs): |
|
3780 BulkTransporterApp.__init__(self, *args, **kwargs) |
|
3781 |
|
3782 def ReportStatus(self): |
|
3783 """Display a message reporting the final status of the transfer.""" |
|
3784 total_down, duration = self.throttle.TotalTransferred(BANDWIDTH_DOWN) |
|
3785 s_total_down, unused_duration = self.throttle.TotalTransferred( |
|
3786 HTTPS_BANDWIDTH_DOWN) |
|
3787 total_down += s_total_down |
|
3788 total = total_down |
|
3789 existing_count = self.progress_thread.existing_count |
|
3790 xfer_count = self.progress_thread.EntitiesTransferred() |
|
3791 logger.info('Have %d entities, %d previously transferred', |
|
3792 xfer_count + existing_count, existing_count) |
|
3793 logger.info('%d entities (%d bytes) transferred in %.1f seconds', |
|
3794 xfer_count, total, duration) |
|
3795 return 0 |
2255 |
3796 |
2256 |
3797 |
2257 def PrintUsageExit(code): |
3798 def PrintUsageExit(code): |
2258 """Prints usage information and exits with a status code. |
3799 """Prints usage information and exits with a status code. |
2259 |
3800 |
2264 sys.stdout.flush() |
3805 sys.stdout.flush() |
2265 sys.stderr.flush() |
3806 sys.stderr.flush() |
2266 sys.exit(code) |
3807 sys.exit(code) |
2267 |
3808 |
2268 |
3809 |
|
3810 REQUIRED_OPTION = object() |
|
3811 |
|
3812 |
|
3813 FLAG_SPEC = ['debug', |
|
3814 'help', |
|
3815 'url=', |
|
3816 'filename=', |
|
3817 'batch_size=', |
|
3818 'kind=', |
|
3819 'num_threads=', |
|
3820 'bandwidth_limit=', |
|
3821 'rps_limit=', |
|
3822 'http_limit=', |
|
3823 'db_filename=', |
|
3824 'app_id=', |
|
3825 'config_file=', |
|
3826 'has_header', |
|
3827 'csv_has_header', |
|
3828 'auth_domain=', |
|
3829 'result_db_filename=', |
|
3830 'download', |
|
3831 'loader_opts=', |
|
3832 'exporter_opts=', |
|
3833 'log_file=', |
|
3834 'email=', |
|
3835 'passin', |
|
3836 ] |
|
3837 |
|
3838 |
2269 def ParseArguments(argv): |
3839 def ParseArguments(argv): |
2270 """Parses command-line arguments. |
3840 """Parses command-line arguments. |
2271 |
3841 |
2272 Prints out a help message if -h or --help is supplied. |
3842 Prints out a help message if -h or --help is supplied. |
2273 |
3843 |
2274 Args: |
3844 Args: |
2275 argv: List of command-line arguments. |
3845 argv: List of command-line arguments. |
2276 |
3846 |
2277 Returns: |
3847 Returns: |
2278 Tuple (url, filename, cookie, batch_size, kind) containing the values from |
3848 A dictionary containing the value of command-line options. |
2279 each corresponding command-line flag. |
|
2280 """ |
3849 """ |
2281 opts, unused_args = getopt.getopt( |
3850 opts, unused_args = getopt.getopt( |
2282 argv[1:], |
3851 argv[1:], |
2283 'h', |
3852 'h', |
2284 ['debug', |
3853 FLAG_SPEC) |
2285 'help', |
3854 |
2286 'url=', |
3855 arg_dict = {} |
2287 'filename=', |
3856 |
2288 'batch_size=', |
3857 arg_dict['url'] = REQUIRED_OPTION |
2289 'kind=', |
3858 arg_dict['filename'] = REQUIRED_OPTION |
2290 'num_threads=', |
3859 arg_dict['config_file'] = REQUIRED_OPTION |
2291 'bandwidth_limit=', |
3860 arg_dict['kind'] = REQUIRED_OPTION |
2292 'rps_limit=', |
3861 |
2293 'http_limit=', |
3862 arg_dict['batch_size'] = DEFAULT_BATCH_SIZE |
2294 'db_filename=', |
3863 arg_dict['num_threads'] = DEFAULT_THREAD_COUNT |
2295 'app_id=', |
3864 arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT |
2296 'config_file=', |
3865 arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT |
2297 'auth_domain=', |
3866 arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT |
2298 ]) |
3867 |
2299 |
3868 arg_dict['db_filename'] = None |
2300 url = None |
3869 arg_dict['app_id'] = '' |
2301 filename = None |
3870 arg_dict['auth_domain'] = 'gmail.com' |
2302 batch_size = DEFAULT_BATCH_SIZE |
3871 arg_dict['has_header'] = False |
2303 kind = None |
3872 arg_dict['result_db_filename'] = None |
2304 num_threads = DEFAULT_THREAD_COUNT |
3873 arg_dict['download'] = False |
2305 bandwidth_limit = DEFAULT_BANDWIDTH_LIMIT |
3874 arg_dict['loader_opts'] = None |
2306 rps_limit = DEFAULT_RPS_LIMIT |
3875 arg_dict['exporter_opts'] = None |
2307 http_limit = DEFAULT_REQUEST_LIMIT |
3876 arg_dict['debug'] = False |
2308 db_filename = None |
3877 arg_dict['log_file'] = None |
2309 app_id = None |
3878 arg_dict['email'] = None |
2310 config_file = None |
3879 arg_dict['passin'] = False |
2311 auth_domain = 'gmail.com' |
3880 |
|
3881 def ExpandFilename(filename): |
|
3882 """Expand shell variables and ~usernames in filename.""" |
|
3883 return os.path.expandvars(os.path.expanduser(filename)) |
2312 |
3884 |
2313 for option, value in opts: |
3885 for option, value in opts: |
2314 if option == '--debug': |
3886 if option == '--debug': |
2315 logging.getLogger().setLevel(logging.DEBUG) |
3887 arg_dict['debug'] = True |
2316 elif option in ('-h', '--help'): |
3888 elif option in ('-h', '--help'): |
2317 PrintUsageExit(0) |
3889 PrintUsageExit(0) |
2318 elif option == '--url': |
3890 elif option == '--url': |
2319 url = value |
3891 arg_dict['url'] = value |
2320 elif option == '--filename': |
3892 elif option == '--filename': |
2321 filename = value |
3893 arg_dict['filename'] = ExpandFilename(value) |
2322 elif option == '--batch_size': |
3894 elif option == '--batch_size': |
2323 batch_size = int(value) |
3895 arg_dict['batch_size'] = int(value) |
2324 elif option == '--kind': |
3896 elif option == '--kind': |
2325 kind = value |
3897 arg_dict['kind'] = value |
2326 elif option == '--num_threads': |
3898 elif option == '--num_threads': |
2327 num_threads = int(value) |
3899 arg_dict['num_threads'] = int(value) |
2328 elif option == '--bandwidth_limit': |
3900 elif option == '--bandwidth_limit': |
2329 bandwidth_limit = int(value) |
3901 arg_dict['bandwidth_limit'] = int(value) |
2330 elif option == '--rps_limit': |
3902 elif option == '--rps_limit': |
2331 rps_limit = int(value) |
3903 arg_dict['rps_limit'] = int(value) |
2332 elif option == '--http_limit': |
3904 elif option == '--http_limit': |
2333 http_limit = int(value) |
3905 arg_dict['http_limit'] = int(value) |
2334 elif option == '--db_filename': |
3906 elif option == '--db_filename': |
2335 db_filename = value |
3907 arg_dict['db_filename'] = ExpandFilename(value) |
2336 elif option == '--app_id': |
3908 elif option == '--app_id': |
2337 app_id = value |
3909 arg_dict['app_id'] = value |
2338 elif option == '--config_file': |
3910 elif option == '--config_file': |
2339 config_file = value |
3911 arg_dict['config_file'] = ExpandFilename(value) |
2340 elif option == '--auth_domain': |
3912 elif option == '--auth_domain': |
2341 auth_domain = value |
3913 arg_dict['auth_domain'] = value |
2342 |
3914 elif option == '--has_header': |
2343 return ProcessArguments(app_id=app_id, |
3915 arg_dict['has_header'] = True |
2344 url=url, |
3916 elif option == '--csv_has_header': |
2345 filename=filename, |
3917 print >>sys.stderr, ('--csv_has_header is deprecated, please use ' |
2346 batch_size=batch_size, |
3918 '--has_header.') |
2347 kind=kind, |
3919 arg_dict['has_header'] = True |
2348 num_threads=num_threads, |
3920 elif option == '--result_db_filename': |
2349 bandwidth_limit=bandwidth_limit, |
3921 arg_dict['result_db_filename'] = ExpandFilename(value) |
2350 rps_limit=rps_limit, |
3922 elif option == '--download': |
2351 http_limit=http_limit, |
3923 arg_dict['download'] = True |
2352 db_filename=db_filename, |
3924 elif option == '--loader_opts': |
2353 config_file=config_file, |
3925 arg_dict['loader_opts'] = value |
2354 auth_domain=auth_domain, |
3926 elif option == '--exporter_opts': |
2355 die_fn=lambda: PrintUsageExit(1)) |
3927 arg_dict['exporter_opts'] = value |
|
3928 elif option == '--log_file': |
|
3929 arg_dict['log_file'] = value |
|
3930 elif option == '--email': |
|
3931 arg_dict['email'] = value |
|
3932 elif option == '--passin': |
|
3933 arg_dict['passin'] = True |
|
3934 |
|
3935 return ProcessArguments(arg_dict, die_fn=lambda: PrintUsageExit(1)) |
2356 |
3936 |
2357 |
3937 |
2358 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit): |
3938 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit): |
|
3939 """Return a dictionary indicating the throttle options.""" |
2359 return { |
3940 return { |
2360 BANDWIDTH_UP: bandwidth_limit, |
3941 BANDWIDTH_UP: bandwidth_limit, |
2361 BANDWIDTH_DOWN: bandwidth_limit, |
3942 BANDWIDTH_DOWN: bandwidth_limit, |
2362 REQUESTS: http_limit, |
3943 REQUESTS: http_limit, |
2363 HTTPS_BANDWIDTH_UP: bandwidth_limit / 5, |
3944 HTTPS_BANDWIDTH_UP: bandwidth_limit / 5, |
2365 HTTPS_REQUESTS: http_limit / 5, |
3946 HTTPS_REQUESTS: http_limit / 5, |
2366 RECORDS: rps_limit, |
3947 RECORDS: rps_limit, |
2367 } |
3948 } |
2368 |
3949 |
2369 |
3950 |
2370 def LoadConfig(config_file): |
3951 def CheckOutputFile(filename): |
2371 """Loads a config file and registers any Loader classes present.""" |
3952 """Check that the given file does not exist and can be opened for writing. |
2372 if config_file: |
3953 |
2373 global_dict = dict(globals()) |
3954 Args: |
2374 execfile(config_file, global_dict) |
3955 filename: The name of the file. |
2375 for cls in Loader.__subclasses__(): |
3956 |
2376 Loader.RegisterLoader(cls()) |
3957 Raises: |
2377 |
3958 FileExistsError: if the given filename is not found |
2378 |
3959 FileNotWritableError: if the given filename is not readable. |
2379 def _MissingArgument(arg_name, die_fn): |
3960 """ |
2380 """Print error message about missing argument and die.""" |
3961 if os.path.exists(filename): |
2381 print >>sys.stderr, '%s argument required' % arg_name |
3962 raise FileExistsError('%s: output file exists' % filename) |
2382 die_fn() |
3963 elif not os.access(os.path.dirname(filename), os.W_OK): |
2383 |
3964 raise FileNotWritableError( |
2384 |
3965 '%s: not writable' % os.path.dirname(filename)) |
2385 def ProcessArguments(app_id=None, |
3966 |
2386 url=None, |
3967 |
2387 filename=None, |
3968 def LoadConfig(config_file_name, exit_fn=sys.exit): |
2388 batch_size=DEFAULT_BATCH_SIZE, |
3969 """Loads a config file and registers any Loader classes present. |
2389 kind=None, |
3970 |
2390 num_threads=DEFAULT_THREAD_COUNT, |
3971 Args: |
2391 bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT, |
3972 config_file_name: The name of the configuration file. |
2392 rps_limit=DEFAULT_RPS_LIMIT, |
3973 exit_fn: Used for dependency injection. |
2393 http_limit=DEFAULT_REQUEST_LIMIT, |
3974 """ |
2394 db_filename=None, |
3975 if config_file_name: |
2395 config_file=None, |
3976 config_file = open(config_file_name, 'r') |
2396 auth_domain='gmail.com', |
3977 try: |
|
3978 bulkloader_config = imp.load_module( |
|
3979 'bulkloader_config', config_file, config_file_name, |
|
3980 ('', 'r', imp.PY_SOURCE)) |
|
3981 sys.modules['bulkloader_config'] = bulkloader_config |
|
3982 |
|
3983 if hasattr(bulkloader_config, 'loaders'): |
|
3984 for cls in bulkloader_config.loaders: |
|
3985 Loader.RegisterLoader(cls()) |
|
3986 |
|
3987 if hasattr(bulkloader_config, 'exporters'): |
|
3988 for cls in bulkloader_config.exporters: |
|
3989 Exporter.RegisterExporter(cls()) |
|
3990 except NameError, e: |
|
3991 m = re.search(r"[^']*'([^']*)'.*", str(e)) |
|
3992 if m.groups() and m.group(1) == 'Loader': |
|
3993 print >>sys.stderr, """ |
|
3994 The config file format has changed and you appear to be using an old-style |
|
3995 config file. Please make the following changes: |
|
3996 |
|
3997 1. At the top of the file, add this: |
|
3998 |
|
3999 from google.appengine.tools import bulkloader.Loader |
|
4000 |
|
4001 2. For each of your Loader subclasses add the following at the end of the |
|
4002 __init__ definitioion: |
|
4003 |
|
4004 self.alias_old_names() |
|
4005 |
|
4006 3. At the bottom of the file, add this: |
|
4007 |
|
4008 loaders = [MyLoader1,...,MyLoaderN] |
|
4009 |
|
4010 Where MyLoader1,...,MyLoaderN are the Loader subclasses you want the bulkloader |
|
4011 to have access to. |
|
4012 """ |
|
4013 exit_fn(1) |
|
4014 else: |
|
4015 raise |
|
4016 except Exception, e: |
|
4017 if isinstance(e, NameClashError) or 'bulkloader_config' in vars() and ( |
|
4018 hasattr(bulkloader_config, 'bulkloader') and |
|
4019 isinstance(e, bulkloader_config.bulkloader.NameClashError)): |
|
4020 print >> sys.stderr, ( |
|
4021 'Found both %s and %s while aliasing old names on %s.'% |
|
4022 (e.old_name, e.new_name, e.klass)) |
|
4023 exit_fn(1) |
|
4024 else: |
|
4025 raise |
|
4026 |
|
4027 def GetArgument(kwargs, name, die_fn): |
|
4028 """Get the value of the key name in kwargs, or die with die_fn. |
|
4029 |
|
4030 Args: |
|
4031 kwargs: A dictionary containing the options for the bulkloader. |
|
4032 name: The name of a bulkloader option. |
|
4033 die_fn: The function to call to exit the program. |
|
4034 |
|
4035 Returns: |
|
4036 The value of kwargs[name] is name in kwargs |
|
4037 """ |
|
4038 if name in kwargs: |
|
4039 return kwargs[name] |
|
4040 else: |
|
4041 print >>sys.stderr, '%s argument required' % name |
|
4042 die_fn() |
|
4043 |
|
4044 |
|
4045 def _MakeSignature(app_id=None, |
|
4046 url=None, |
|
4047 kind=None, |
|
4048 db_filename=None, |
|
4049 download=None, |
|
4050 has_header=None, |
|
4051 result_db_filename=None): |
|
4052 """Returns a string that identifies the important options for the database.""" |
|
4053 if download: |
|
4054 result_db_line = 'result_db: %s' % result_db_filename |
|
4055 else: |
|
4056 result_db_line = '' |
|
4057 return u""" |
|
4058 app_id: %s |
|
4059 url: %s |
|
4060 kind: %s |
|
4061 download: %s |
|
4062 progress_db: %s |
|
4063 has_header: %s |
|
4064 %s |
|
4065 """ % (app_id, url, kind, download, db_filename, has_header, result_db_line) |
|
4066 |
|
4067 |
|
4068 def ProcessArguments(arg_dict, |
2397 die_fn=lambda: sys.exit(1)): |
4069 die_fn=lambda: sys.exit(1)): |
2398 """Processes non command-line input arguments.""" |
4070 """Processes non command-line input arguments. |
|
4071 |
|
4072 Args: |
|
4073 arg_dict: Dictionary containing the values of bulkloader options. |
|
4074 die_fn: Function to call in case of an error during argument processing. |
|
4075 |
|
4076 Returns: |
|
4077 A dictionary of bulkloader options. |
|
4078 """ |
|
4079 app_id = GetArgument(arg_dict, 'app_id', die_fn) |
|
4080 url = GetArgument(arg_dict, 'url', die_fn) |
|
4081 filename = GetArgument(arg_dict, 'filename', die_fn) |
|
4082 batch_size = GetArgument(arg_dict, 'batch_size', die_fn) |
|
4083 kind = GetArgument(arg_dict, 'kind', die_fn) |
|
4084 db_filename = GetArgument(arg_dict, 'db_filename', die_fn) |
|
4085 config_file = GetArgument(arg_dict, 'config_file', die_fn) |
|
4086 result_db_filename = GetArgument(arg_dict, 'result_db_filename', die_fn) |
|
4087 download = GetArgument(arg_dict, 'download', die_fn) |
|
4088 log_file = GetArgument(arg_dict, 'log_file', die_fn) |
|
4089 |
|
4090 unused_passin = GetArgument(arg_dict, 'passin', die_fn) |
|
4091 unused_email = GetArgument(arg_dict, 'email', die_fn) |
|
4092 unused_debug = GetArgument(arg_dict, 'debug', die_fn) |
|
4093 unused_num_threads = GetArgument(arg_dict, 'num_threads', die_fn) |
|
4094 unused_bandwidth_limit = GetArgument(arg_dict, 'bandwidth_limit', die_fn) |
|
4095 unused_rps_limit = GetArgument(arg_dict, 'rps_limit', die_fn) |
|
4096 unused_http_limit = GetArgument(arg_dict, 'http_limit', die_fn) |
|
4097 unused_auth_domain = GetArgument(arg_dict, 'auth_domain', die_fn) |
|
4098 unused_has_headers = GetArgument(arg_dict, 'has_header', die_fn) |
|
4099 unused_loader_opts = GetArgument(arg_dict, 'loader_opts', die_fn) |
|
4100 unused_exporter_opts = GetArgument(arg_dict, 'exporter_opts', die_fn) |
|
4101 |
|
4102 errors = [] |
|
4103 |
2399 if db_filename is None: |
4104 if db_filename is None: |
2400 db_filename = time.strftime('bulkloader-progress-%Y%m%d.%H%M%S.sql3') |
4105 arg_dict['db_filename'] = time.strftime( |
|
4106 'bulkloader-progress-%Y%m%d.%H%M%S.sql3') |
|
4107 |
|
4108 if result_db_filename is None: |
|
4109 arg_dict['result_db_filename'] = time.strftime( |
|
4110 'bulkloader-results-%Y%m%d.%H%M%S.sql3') |
|
4111 |
|
4112 if log_file is None: |
|
4113 arg_dict['log_file'] = time.strftime('bulkloader-log-%Y%m%d.%H%M%S') |
2401 |
4114 |
2402 if batch_size <= 0: |
4115 if batch_size <= 0: |
2403 print >>sys.stderr, 'batch_size must be 1 or larger' |
4116 errors.append('batch_size must be at least 1') |
2404 die_fn() |
4117 |
2405 |
4118 required = '%s argument required' |
2406 if url is None: |
4119 |
2407 _MissingArgument('url', die_fn) |
4120 if url is REQUIRED_OPTION: |
2408 |
4121 errors.append(required % 'url') |
2409 if filename is None: |
4122 |
2410 _MissingArgument('filename', die_fn) |
4123 if filename is REQUIRED_OPTION: |
2411 |
4124 errors.append(required % 'filename') |
2412 if kind is None: |
4125 |
2413 _MissingArgument('kind', die_fn) |
4126 if kind is REQUIRED_OPTION: |
2414 |
4127 errors.append(required % 'kind') |
2415 if config_file is None: |
4128 |
2416 _MissingArgument('config_file', die_fn) |
4129 if config_file is REQUIRED_OPTION: |
2417 |
4130 errors.append(required % 'config_file') |
2418 if app_id is None: |
4131 |
|
4132 if download: |
|
4133 if result_db_filename is REQUIRED_OPTION: |
|
4134 errors.append(required % 'result_db_filename') |
|
4135 |
|
4136 if not app_id: |
2419 (unused_scheme, host_port, unused_url_path, |
4137 (unused_scheme, host_port, unused_url_path, |
2420 unused_query, unused_fragment) = urlparse.urlsplit(url) |
4138 unused_query, unused_fragment) = urlparse.urlsplit(url) |
2421 suffix_idx = host_port.find('.appspot.com') |
4139 suffix_idx = host_port.find('.appspot.com') |
2422 if suffix_idx > -1: |
4140 if suffix_idx > -1: |
2423 app_id = host_port[:suffix_idx] |
4141 arg_dict['app_id'] = host_port[:suffix_idx] |
2424 elif host_port.split(':')[0].endswith('google.com'): |
4142 elif host_port.split(':')[0].endswith('google.com'): |
2425 app_id = host_port.split('.')[0] |
4143 arg_dict['app_id'] = host_port.split('.')[0] |
2426 else: |
4144 else: |
2427 print >>sys.stderr, 'app_id required for non appspot.com domains' |
4145 errors.append('app_id argument required for non appspot.com domains') |
2428 die_fn() |
4146 |
2429 |
4147 if errors: |
2430 return (app_id, url, filename, batch_size, kind, num_threads, |
4148 print >>sys.stderr, '\n'.join(errors) |
2431 bandwidth_limit, rps_limit, http_limit, db_filename, config_file, |
4149 die_fn() |
2432 auth_domain) |
4150 |
2433 |
4151 return arg_dict |
2434 |
4152 |
2435 def _PerformBulkload(app_id=None, |
4153 |
2436 url=None, |
4154 def ParseKind(kind): |
2437 filename=None, |
4155 if kind and kind[0] == '(' and kind[-1] == ')': |
2438 batch_size=DEFAULT_BATCH_SIZE, |
4156 return tuple(kind[1:-1].split(',')) |
2439 kind=None, |
4157 else: |
2440 num_threads=DEFAULT_THREAD_COUNT, |
4158 return kind |
2441 bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT, |
4159 |
2442 rps_limit=DEFAULT_RPS_LIMIT, |
4160 |
2443 http_limit=DEFAULT_REQUEST_LIMIT, |
4161 def _PerformBulkload(arg_dict, |
2444 db_filename=None, |
4162 check_file=CheckFile, |
2445 config_file=None, |
4163 check_output_file=CheckOutputFile): |
2446 auth_domain='gmail.com'): |
4164 """Runs the bulkloader, given the command line options. |
2447 """Runs the bulkloader, given the options as keyword arguments. |
|
2448 |
4165 |
2449 Args: |
4166 Args: |
2450 app_id: The application id. |
4167 arg_dict: Dictionary of bulkloader options. |
2451 url: The url of the remote_api endpoint. |
4168 check_file: Used for dependency injection. |
2452 filename: The name of the file containing the CSV data. |
4169 check_output_file: Used for dependency injection. |
2453 batch_size: The number of records to send per request. |
4170 |
2454 kind: The kind of entity to transfer. |
4171 Returns: |
2455 num_threads: The number of threads to use to transfer data. |
4172 An exit code. |
2456 bandwidth_limit: Maximum bytes/second to transfers. |
4173 |
2457 rps_limit: Maximum records/second to transfer. |
4174 Raises: |
2458 http_limit: Maximum requests/second for transfers. |
4175 ConfigurationError: if inconsistent options are passed. |
2459 db_filename: The name of the SQLite3 progress database file. |
4176 """ |
2460 config_file: The name of the configuration file. |
4177 app_id = arg_dict['app_id'] |
2461 auth_domain: The auth domain to use for logins and UserProperty. |
4178 url = arg_dict['url'] |
|
4179 filename = arg_dict['filename'] |
|
4180 batch_size = arg_dict['batch_size'] |
|
4181 kind = arg_dict['kind'] |
|
4182 num_threads = arg_dict['num_threads'] |
|
4183 bandwidth_limit = arg_dict['bandwidth_limit'] |
|
4184 rps_limit = arg_dict['rps_limit'] |
|
4185 http_limit = arg_dict['http_limit'] |
|
4186 db_filename = arg_dict['db_filename'] |
|
4187 config_file = arg_dict['config_file'] |
|
4188 auth_domain = arg_dict['auth_domain'] |
|
4189 has_header = arg_dict['has_header'] |
|
4190 download = arg_dict['download'] |
|
4191 result_db_filename = arg_dict['result_db_filename'] |
|
4192 loader_opts = arg_dict['loader_opts'] |
|
4193 exporter_opts = arg_dict['exporter_opts'] |
|
4194 email = arg_dict['email'] |
|
4195 passin = arg_dict['passin'] |
|
4196 |
|
4197 os.environ['AUTH_DOMAIN'] = auth_domain |
|
4198 |
|
4199 kind = ParseKind(kind) |
|
4200 |
|
4201 check_file(config_file) |
|
4202 if not download: |
|
4203 check_file(filename) |
|
4204 else: |
|
4205 check_output_file(filename) |
|
4206 |
|
4207 LoadConfig(config_file) |
|
4208 |
|
4209 os.environ['APPLICATION_ID'] = app_id |
|
4210 |
|
4211 throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit) |
|
4212 |
|
4213 throttle = Throttle(layout=throttle_layout) |
|
4214 signature = _MakeSignature(app_id=app_id, |
|
4215 url=url, |
|
4216 kind=kind, |
|
4217 db_filename=db_filename, |
|
4218 download=download, |
|
4219 has_header=has_header, |
|
4220 result_db_filename=result_db_filename) |
|
4221 |
|
4222 |
|
4223 max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5) |
|
4224 |
|
4225 if db_filename == 'skip': |
|
4226 progress_db = StubProgressDatabase() |
|
4227 elif not download: |
|
4228 progress_db = ProgressDatabase(db_filename, signature) |
|
4229 else: |
|
4230 progress_db = ExportProgressDatabase(db_filename, signature) |
|
4231 |
|
4232 if download: |
|
4233 result_db = ResultDatabase(result_db_filename, signature) |
|
4234 |
|
4235 return_code = 1 |
|
4236 |
|
4237 if not download: |
|
4238 loader = Loader.RegisteredLoader(kind) |
|
4239 try: |
|
4240 loader.initialize(filename, loader_opts) |
|
4241 workitem_generator_factory = GetCSVGeneratorFactory( |
|
4242 kind, filename, batch_size, has_header) |
|
4243 |
|
4244 app = BulkUploaderApp(arg_dict, |
|
4245 workitem_generator_factory, |
|
4246 throttle, |
|
4247 progress_db, |
|
4248 BulkLoaderThread, |
|
4249 ProgressTrackerThread, |
|
4250 max_queue_size, |
|
4251 RequestManager, |
|
4252 DataSourceThread, |
|
4253 ReQueue, |
|
4254 Queue.Queue) |
|
4255 try: |
|
4256 return_code = app.Run() |
|
4257 except AuthenticationError: |
|
4258 logger.info('Authentication Failed') |
|
4259 finally: |
|
4260 loader.finalize() |
|
4261 else: |
|
4262 exporter = Exporter.RegisteredExporter(kind) |
|
4263 try: |
|
4264 exporter.initialize(filename, exporter_opts) |
|
4265 |
|
4266 def KeyRangeGeneratorFactory(progress_queue, progress_gen): |
|
4267 return KeyRangeGenerator(kind, progress_queue, progress_gen) |
|
4268 |
|
4269 def ExportProgressThreadFactory(progress_queue, progress_db): |
|
4270 return ExportProgressThread(kind, |
|
4271 progress_queue, |
|
4272 progress_db, |
|
4273 result_db) |
|
4274 app = BulkDownloaderApp(arg_dict, |
|
4275 KeyRangeGeneratorFactory, |
|
4276 throttle, |
|
4277 progress_db, |
|
4278 BulkExporterThread, |
|
4279 ExportProgressThreadFactory, |
|
4280 0, |
|
4281 RequestManager, |
|
4282 DataSourceThread, |
|
4283 ReQueue, |
|
4284 Queue.Queue) |
|
4285 try: |
|
4286 return_code = app.Run() |
|
4287 except AuthenticationError: |
|
4288 logger.info('Authentication Failed') |
|
4289 finally: |
|
4290 exporter.finalize() |
|
4291 return return_code |
|
4292 |
|
4293 |
|
def SetupLogging(arg_dict):
  """Sets up logging for the bulkloader.

  Everything is captured in the log file at DEBUG level; the console
  shows INFO, or DEBUG when the 'debug' flag is set.

  Args:
    arg_dict: Dictionary mapping flag names to their arguments; must
      contain 'debug' and 'log_file' entries.
  """
  # Renamed from 'format' to avoid shadowing the builtin.
  log_format = '[%(levelname)-8s %(asctime)s %(filename)s] %(message)s'
  debug = arg_dict['debug']
  log_file = arg_dict['log_file']

  logger.setLevel(logging.DEBUG)

  # Keep bulkloader records out of any root-logger handlers.
  logger.propagate = False

  # File handler: capture everything, overwriting any previous log.
  file_handler = logging.FileHandler(log_file, 'w')
  file_handler.setLevel(logging.DEBUG)
  file_formatter = logging.Formatter(log_format)
  file_handler.setFormatter(file_formatter)
  logger.addHandler(file_handler)

  # Console handler: INFO by default, DEBUG when requested.
  console = logging.StreamHandler()
  level = logging.INFO
  if debug:
    level = logging.DEBUG
  console.setLevel(level)
  console_format = '[%(levelname)-8s] %(message)s'
  formatter = logging.Formatter(console_format)
  console.setFormatter(formatter)
  logger.addHandler(console)

  logger.info('Logging to %s', log_file)

  # Quiet the RPC library; its INFO-level output is noise here.
  appengine_rpc.logger.setLevel(logging.WARN)
|
4327 |
|
4328 |
|
def Run(arg_dict):
  """Sets up and runs the bulkloader, given the options as keyword arguments.

  Args:
    arg_dict: Dictionary of bulkloader options.

  Returns:
    An exit code.
  """
  # Normalize/validate the options before anything else touches them.
  arg_dict = ProcessArguments(arg_dict)

  SetupLogging(arg_dict)

  return _PerformBulkload(arg_dict)
2471 throttle = Throttle(layout=throttle_layout) |
|
2472 |
|
2473 |
|
2474 workitem_generator_factory = GetCSVGeneratorFactory(filename, batch_size) |
|
2475 |
|
2476 if db_filename == 'skip': |
|
2477 progress_db = StubProgressDatabase() |
|
2478 else: |
|
2479 progress_db = ProgressDatabase(db_filename) |
|
2480 |
|
2481 |
|
2482 max_queue_size = max(DEFAULT_QUEUE_SIZE, 2 * num_threads + 5) |
|
2483 |
|
2484 PerformBulkUpload(app_id, |
|
2485 url, |
|
2486 kind, |
|
2487 workitem_generator_factory, |
|
2488 num_threads, |
|
2489 throttle, |
|
2490 progress_db, |
|
2491 max_queue_size=max_queue_size) |
|
2492 |
|
2493 return 0 |
|
2494 |
|
2495 |
|
def Run(app_id=None,
        url=None,
        filename=None,
        batch_size=DEFAULT_BATCH_SIZE,
        kind=None,
        num_threads=DEFAULT_THREAD_COUNT,
        bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
        rps_limit=DEFAULT_RPS_LIMIT,
        http_limit=DEFAULT_REQUEST_LIMIT,
        db_filename=None,
        auth_domain='gmail.com',
        config_file=None):
  """Sets up and runs the bulkloader, given the options as keyword arguments.

  Args:
    app_id: The application id.
    url: The url of the remote_api endpoint.
    filename: The name of the file containing the CSV data.
    batch_size: The number of records to send per request.
    kind: The kind of entity to transfer.
    num_threads: The number of threads to use to transfer data.
    bandwidth_limit: Maximum bytes/second to transfers.
    rps_limit: Maximum records/second to transfer.
    http_limit: Maximum requests/second for transfers.
    db_filename: The name of the SQLite3 progress database file.
    config_file: The name of the configuration file.
    auth_domain: The auth domain to use for logins and UserProperty.

  Returns:
    An exit code.
  """
  logging.basicConfig(
      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
  args = ProcessArguments(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file)

  # NOTE(review): auth_domain is not passed to ProcessArguments, yet the
  # unpacking below rebinds auth_domain from its result, shadowing the
  # keyword argument above -- confirm this is intentional.
  (app_id, url, filename, batch_size, kind, num_threads, bandwidth_limit,
   rps_limit, http_limit, db_filename, config_file, auth_domain) = args

  return _PerformBulkload(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)
|
2556 |
4343 |
2557 |
4344 |
def main(argv):
  """Runs the importer from the command line.

  Args:
    argv: List of command-line argument strings.

  Returns:
    An exit code, suitable for passing to sys.exit().
  """
  arg_dict = ParseArguments(argv)

  # Report every required flag that was left unset, then exit.
  errors = ['%s argument required' % key
            for (key, value) in arg_dict.iteritems()
            if value is REQUIRED_OPTION]
  if errors:
    print >>sys.stderr, '\n'.join(errors)
    PrintUsageExit(1)

  SetupLogging(arg_dict)
  return _PerformBulkload(arg_dict)
|
2585 |
4359 |
2586 |
4360 |
# Script entry point: exit with the bulkloader's return code.
if __name__ == '__main__':
  sys.exit(main(sys.argv))