|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """Imports CSV data over HTTP. |
|
19 |
|
20 Usage: |
|
21 %(arg0)s [flags] |
|
22 |
|
23 --debug Show debugging information. (Optional) |
|
24 --app_id=<string> Application ID of endpoint (Optional for |
|
25 *.appspot.com) |
|
26 --auth_domain=<domain> The auth domain to use for logging in and for |
|
27 UserProperties. (Default: gmail.com) |
|
  --bandwidth_limit=<int> The maximum number of bytes per second for the
                          aggregate transfer of data to the server. Bursts
                          may exceed this, but overall transfer rate is
                          restricted to this rate. (Default 250000)
  --batch_size=<int>      Number of Entity objects to include in each post to
                          the URL endpoint. The more data per row/Entity, the
                          smaller the batch size should be. (Default 10)
  --config_file=<path>    File containing Model and Loader definitions.
                          (Required)
  --db_filename=<path>    Specific progress database to write to, or to
                          resume from. If not supplied, then a new database
                          will be started, named:
                          bulkloader-progress-TIMESTAMP.
                          The special filename "skip" may be used to simply
                          skip reading/writing any progress information.
  --filename=<path>       Path to the CSV file to import. (Required)
  --http_limit=<int>      The maximum number of HTTP requests per second to
                          send to the server. (Default: 8)
  --kind=<string>         Name of the Entity object kind to put in the
                          datastore. (Required)
  --num_threads=<int>     Number of threads to use for uploading entities
                          (Default 10)
|
50 --rps_limit=<int> The maximum number of records per second to |
|
51 transfer to the server. (Default: 20) |
|
52 --url=<string> URL endpoint to post to for importing data. |
|
53 (Required) |
|
54 |
|
55 The exit status will be 0 on success, non-zero on import failure. |
|
56 |
|
57 Works with the remote_api mix-in library for google.appengine.ext.remote_api. |
|
Please look there for documentation about how to set up the server side.
|
59 |
|
60 Example: |
|
61 |
|
62 %(arg0)s --url=http://app.appspot.com/remote_api --kind=Model \ |
|
63 --filename=data.csv --config_file=loader_config.py |
|
64 |
|
65 """ |
|
66 |
|
67 |
|
68 |
|
69 import csv |
|
70 import getopt |
|
71 import getpass |
|
72 import logging |
|
73 import new |
|
74 import os |
|
75 import Queue |
|
76 import signal |
|
77 import sys |
|
78 import threading |
|
79 import time |
|
80 import traceback |
|
81 import urllib2 |
|
82 import urlparse |
|
83 |
|
84 from google.appengine.ext import db |
|
85 from google.appengine.ext.remote_api import remote_api_stub |
|
86 from google.appengine.tools import appengine_rpc |
|
87 |
|
88 try: |
|
89 import sqlite3 |
|
90 except ImportError: |
|
91 pass |
|
92 |
|
# Version string sent as the GAE-Uploader-Version header and embedded in the
# RequestManager source / user-agent string.
UPLOADER_VERSION = '1'

# Default number of uploader threads.
DEFAULT_THREAD_COUNT = 10

# Default maximum number of CSV rows per WorkItem.
DEFAULT_BATCH_SIZE = 10

# Default work-queue capacity: ten queued items per uploader thread.
DEFAULT_QUEUE_SIZE = 10 * DEFAULT_THREAD_COUNT

# Sentinel value; presumably placed on a queue to tell worker threads to stop
# (its consumer is not visible here).
_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'

# Numeric WorkItem states (0, 1, 2, 3), in the order a batch normally
# progresses through them.
STATE_READ, STATE_SENDING, STATE_SENT, STATE_NOT_SENT = range(4)

# Smallest sleep, in seconds; presumably used by the throttle (not visible).
MINIMUM_THROTTLE_SLEEP_DURATION = 0.001

# Progress-database state meaning "all rows up to key_end have been consumed".
DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'

# Backoff parameters; presumably used by ThreadGate's backoff logic.
INITIAL_BACKOFF = 1.0
BACKOFF_FACTOR = 2.0

# Defaults for the --bandwidth_limit, --rps_limit and --http_limit flags.
DEFAULT_BANDWIDTH_LIMIT = 250000
DEFAULT_RPS_LIMIT = 20
DEFAULT_REQUEST_LIMIT = 8

# Throttle names charged by ThrottleHandler (and, for RECORDS, elsewhere).
BANDWIDTH_UP = 'http-bandwidth-up'
BANDWIDTH_DOWN = 'http-bandwidth-down'
REQUESTS = 'http-requests'
HTTPS_BANDWIDTH_UP = 'https-bandwidth-up'
HTTPS_BANDWIDTH_DOWN = 'https-bandwidth-down'
HTTPS_REQUESTS = 'https-requests'
RECORDS = 'records'
|
130 |
|
131 |
|
def StateMessage(state):
  """Returns the human-readable status message for a numeric WorkItem state.

  Args:
    state: One of STATE_READ, STATE_SENDING, STATE_SENT or STATE_NOT_SENT.

  Returns:
    A short sentence describing the state.

  Raises:
    KeyError: If state is not one of the four known state identifiers.
  """
  messages = {}
  messages[STATE_READ] = 'Batch read from file.'
  messages[STATE_SENDING] = 'Sending batch to server.'
  messages[STATE_SENT] = 'Batch successfully sent.'
  messages[STATE_NOT_SENT] = 'Error while sending batch.'
  return messages[state]
|
140 |
|
141 |
|
class Error(Exception):
  """Root of the exception hierarchy raised by this module."""


class FatalServerError(Error):
  """Posting data to the server failed in a way that cannot be recovered."""


class ResumeError(Error):
  """A partial upload could not be resumed."""


class ConfigurationError(Error):
  """The supplied configuration options are invalid."""


class AuthenticationError(Error):
  """Authenticating with the server failed."""
|
160 |
|
161 |
|
def GetCSVGeneratorFactory(csv_filename, batch_size,
                           openfile=open, create_csv_reader=csv.reader):
  """Builds a callable that constructs a CSVGenerator for one CSV file.

  Args:
    csv_filename: File on disk containing CSV data.
    batch_size: Maximum number of CSV rows to stash into a WorkItem.
    openfile: Used for dependency injection of 'open'.
    create_csv_reader: Used for dependency injection of 'csv.reader'.

  Returns:
    A callable taking (progress_queue, progress_generator) that returns a new
    CSVGenerator bound to the arguments above.
  """
  def CreateGenerator(progress_queue, progress_generator):
    """Creates a CSVGenerator wired to the given progress queue and generator.

    Args:
      progress_queue: A ProgressQueue instance to send progress information.
      progress_generator: A generator of progress information or None.

    Returns:
      A CSVGenerator instance.
    """
    generator_args = (progress_queue, progress_generator, csv_filename,
                      batch_size, openfile, create_csv_reader)
    return CSVGenerator(*generator_args)

  return CreateGenerator
|
193 |
|
194 |
|
class CSVGenerator(object):
  """Reads a CSV file and generates WorkItems containing batches of records."""

  def __init__(self,
               progress_queue,
               progress_generator,
               csv_filename,
               batch_size,
               openfile,
               create_csv_reader):
    """Initializes a CSV generator.

    Args:
      progress_queue: A queue used for tracking progress information.
      progress_generator: A generator of prior progress information, or None
        if there is no prior status.
      csv_filename: File on disk containing CSV data.
      batch_size: Maximum number of CSV rows to stash into a WorkItem.
      openfile: Used for dependency injection of 'open'.
      create_csv_reader: Used for dependency injection of 'csv.reader'.
    """
    self.progress_queue = progress_queue
    self.progress_generator = progress_generator
    self.csv_filename = csv_filename
    self.batch_size = batch_size
    self.openfile = openfile
    self.create_csv_reader = create_csv_reader
    self.line_number = 1
    self.column_count = None
    self.read_rows = []
    self.reader = None
    self.row_count = 0
    self.sent_count = 0

  def _AdvanceTo(self, line):
    """Advances the reader, discarding rows, until line_number reaches line.

    Each discarded row is counted in both row_count and sent_count.

    Args:
      line: A line number to advance to.

    Raises:
      StopIteration: If the reader runs out of rows first.
    """
    while self.line_number < line:
      self.reader.next()
      self.line_number += 1
      self.row_count += 1
      self.sent_count += 1

  def _ReadRows(self, key_start, key_end):
    """Reads rows [key_start, key_end] into self.read_rows.

    Each entry of self.read_rows is a (line_number, row) pair.

    Args:
      key_start: The starting line number; must equal self.line_number.
      key_end: The ending line number, inclusive.

    Raises:
      StopIteration: If the reader runs out of rows.  Rows read before that
        point remain in self.read_rows.
      ResumeError: If a row's column count differs from the first row's.
    """
    assert self.line_number == key_start
    self.read_rows = []
    while self.line_number <= key_end:
      row = self.reader.next()
      self.row_count += 1
      if self.column_count is None:
        self.column_count = len(row)
      elif self.column_count != len(row):
        raise ResumeError('Column count mismatch, %d: %s' %
                          (self.column_count, str(row)))
      self.read_rows.append((self.line_number, row))
      self.line_number += 1

  def _MakeItem(self, key_start, key_end, rows, progress_key=None):
    """Makes a WorkItem containing the given rows, with the given keys.

    Args:
      key_start: The start key for the WorkItem.
      key_end: The end key for the WorkItem.
      rows: A non-empty list of the rows for the WorkItem.
      progress_key: The progress key for the WorkItem.

    Returns:
      A WorkItem instance for the given batch.
    """
    assert rows

    return WorkItem(self.progress_queue, rows,
                    key_start, key_end,
                    progress_key=progress_key)

  def _HasHeader(self, csv_content):
    """Guesses whether csv_content starts with a header row.

    csv.Sniffer raises csv.Error when it cannot infer a dialect (for example
    on single-column data).  Such content is treated as having no header
    instead of aborting the whole import.

    Args:
      csv_content: The full text of the CSV file.

    Returns:
      True if the sniffer judges the first row to be a header, else False.
    """
    if not csv_content:
      return False
    try:
      return csv.Sniffer().has_header(csv_content)
    except csv.Error:
      logging.warning('Could not determine whether the CSV file has a header '
                      'line; assuming it does not.')
      return False

  def Batches(self):
    """Reads the CSV data file and generates WorkItems.

    The data file is closed when the generator finishes, raises, or is
    closed by the consumer.

    Yields:
      Instances of class WorkItem

    Raises:
      ResumeError: If the progress database and data file indicate a different
        number of rows.
    """
    csv_file = self.openfile(self.csv_filename, 'r')
    try:
      for item in self._BatchesFromFile(csv_file):
        yield item
    finally:
      csv_file.close()

  def _BatchesFromFile(self, csv_file):
    """Generates WorkItems from an already-open CSV file object.

    Prior progress records (if any) are replayed first; new batches are only
    read afterwards when there was no progress generator or the last
    progress record marked the data as consumed up to that point.

    Args:
      csv_file: An open, seekable file object containing the CSV data.

    Yields:
      Instances of class WorkItem
    """
    csv_content = csv_file.read()
    has_headers = self._HasHeader(csv_content)
    csv_file.seek(0)
    self.reader = self.create_csv_reader(csv_file, skipinitialspace=True)
    if has_headers:
      logging.info('The CSV file appears to have a header line, skipping.')
      self.reader.next()

    exhausted = False

    self.line_number = 1
    self.column_count = None

    logging.info('Starting import; maximum %d entities per post',
                 self.batch_size)

    state = None
    if self.progress_generator is not None:
      for progress_key, state, key_start, key_end in self.progress_generator:
        if key_start:
          # Re-issue a batch recorded in the progress database.
          try:
            self._AdvanceTo(key_start)
            self._ReadRows(key_start, key_end)
            yield self._MakeItem(key_start,
                                 key_end,
                                 self.read_rows,
                                 progress_key=progress_key)
          except StopIteration:
            logging.error('Mismatch between data file and progress database')
            raise ResumeError(
                'Mismatch between data file and progress database')
        elif state == DATA_CONSUMED_TO_HERE:
          # Everything through key_end was already handled; skip past it.
          try:
            self._AdvanceTo(key_end + 1)
          except StopIteration:
            state = None

    if self.progress_generator is None or state == DATA_CONSUMED_TO_HERE:
      while not exhausted:
        key_start = self.line_number
        key_end = self.line_number + self.batch_size - 1
        try:
          self._ReadRows(key_start, key_end)
        except StopIteration:
          exhausted = True
          # Shrink the final batch to the rows actually read.
          key_end = self.line_number - 1
        if key_start <= key_end:
          yield self._MakeItem(key_start, key_end, self.read_rows)
|
350 |
|
351 |
|
class ReQueue(object):
  """A special thread-safe queue.

  A ReQueue allows unfinished work items to be returned with a call to
  reput().  When an item is reput, task_done() should *not* be called
  in addition; getting an item that has been reput does not increase
  the number of outstanding tasks.

  This class shares an interface with Queue.Queue and provides the
  additional reput method.
  """

  def __init__(self,
               queue_capacity,
               requeue_capacity=None,
               queue_factory=Queue.Queue,
               get_time=time.time):
    """Initialize a ReQueue instance.

    Args:
      queue_capacity: The number of items that can be put in the ReQueue.
      requeue_capacity: The number of items that can be reput in the ReQueue.
        Defaults to queue_capacity.
      queue_factory: Used for dependency injection.
      get_time: Used for dependency injection.
    """
    if requeue_capacity is None:
      requeue_capacity = queue_capacity

    self.get_time = get_time
    self.queue = queue_factory(queue_capacity)
    self.requeue = queue_factory(requeue_capacity)
    self.lock = threading.Lock()
    # Both conditions share one lock.  Producers (put/reput) wait on get_cond,
    # which is notified after a successful get; consumers (get) wait on
    # put_cond, which is notified after a successful put or reput.
    self.put_cond = threading.Condition(self.lock)
    self.get_cond = threading.Condition(self.lock)

  def _DoWithTimeout(self,
                     action,
                     exc,
                     wait_cond,
                     done_cond,
                     lock,
                     timeout=None,
                     block=True):
    """Performs the given action with a timeout.

    The action must be non-blocking, and raise an instance of exc on a
    recoverable failure.  If the action fails with an instance of exc,
    we wait on wait_cond before trying again.  Failure after the
    timeout is reached is propagated as an exception.  Success is
    signalled by notifying on done_cond and returning the result of
    the action.  If action raises any exception besides an instance of
    exc, it is immediately propagated.

    Args:
      action: A callable that performs a non-blocking action.
      exc: An exception type that is thrown by the action to indicate
        a recoverable error.
      wait_cond: A condition variable which should be waited on when
        action throws exc.
      done_cond: A condition variable to signal if the action returns.
      lock: The lock used by wait_cond and done_cond.
      timeout: A non-negative float indicating the maximum time to wait,
        measured from the start of the call.
      block: Whether to block if the action cannot complete immediately.

    Returns:
      The result of the action, if it is successful.

    Raises:
      ValueError: If the timeout argument is negative.
      exc: The last failure of action, once the timeout has expired.
    """
    if timeout is not None and timeout < 0.0:
      raise ValueError('\'timeout\' must not be a negative number')
    if not block:
      timeout = 0.0
    result = None
    success = False
    start_time = self.get_time()
    lock.acquire()
    try:
      while not success:
        try:
          result = action()
          success = True
        except exc:
          if timeout is None:
            remaining = None
          else:
            # Always measure against the fixed start time; the timeout budget
            # must not be decremented cumulatively on each wake-up.
            remaining = timeout - (self.get_time() - start_time)
            if remaining <= 0.0:
              raise
          wait_cond.wait(remaining)
    finally:
      if success:
        done_cond.notify()
      lock.release()
    return result

  def put(self, item, block=True, timeout=None):
    """Put an item into the requeue.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum on how long to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def PutAction():
      self.queue.put(item, block=False)
    self._DoWithTimeout(PutAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def reput(self, item, block=True, timeout=None):
    """Re-put an item back into the requeue.

    Re-putting an item does not increase the number of outstanding
    tasks, so the reput item should be uniquely associated with an
    item that was previously removed from the requeue and for which
    task_done has not been called.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum on how long to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def ReputAction():
      self.requeue.put(item, block=False)
    self._DoWithTimeout(ReputAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def get(self, block=True, timeout=None):
    """Get an item from the requeue, preferring reput items.

    Args:
      block: Whether to block if the requeue is empty.
      timeout: Maximum on how long to wait until the requeue is non-empty.

    Returns:
      An item from the requeue.

    Raises:
      Queue.Empty if the queue is empty and the timeout expires.
    """
    def GetAction():
      try:
        result = self.requeue.get(block=False)
        # Reput items are not tracked as outstanding tasks of the ReQueue,
        # so they are marked done on the internal requeue immediately.
        self.requeue.task_done()
      except Queue.Empty:
        result = self.queue.get(block=False)
      return result
    return self._DoWithTimeout(GetAction,
                               Queue.Empty,
                               self.put_cond,
                               self.get_cond,
                               self.lock,
                               timeout=timeout,
                               block=block)

  def join(self):
    """Blocks until all of the items in the requeue have been processed."""
    self.queue.join()

  def task_done(self):
    """Indicate that a previously enqueued item has been fully processed."""
    self.queue.task_done()

  def empty(self):
    """Returns true if the requeue is empty."""
    return self.queue.empty() and self.requeue.empty()

  def get_nowait(self):
    """Try to get an item from the queue without blocking."""
    return self.get(block=False)
|
540 |
|
541 |
|
class ThrottleHandler(urllib2.BaseHandler):
  """A urllib2 handler that charges HTTP and HTTPS traffic to a throttle."""

  def __init__(self, throttle):
    """Stores the throttle used for bandwidth and request accounting.

    Args:
      throttle: A Throttle instance to call for bandwidth and http/https
        request throttling.
    """
    self.throttle = throttle

  def AddRequest(self, throttle_name, req):
    """Charges the approximate wire size of an outgoing request to a throttle.

    The estimate is the sum of every header line (regular and unredirected),
    a request line built from the method and URL path, and the request body
    when there is one.

    Args:
      throttle_name: The name of the bandwidth throttle to add to.
      req: The request whose size will be added to the throttle.
    """
    header_bytes = sum([len('%s: %s\n' % pair)
                        for pair in req.headers.iteritems()])
    header_bytes += sum([len('%s: %s\n' % pair)
                         for pair in req.unredirected_hdrs.iteritems()])
    url_path = urlparse.urlsplit(req.get_full_url())[2]
    request_line = '%s %s HTTP/1.1\n' % (req.get_method(), url_path)
    body = req.get_data()
    if body:
      body_bytes = len(body)
    else:
      body_bytes = 0
    self.throttle.AddTransfer(throttle_name,
                              header_bytes + len(request_line) + body_bytes)

  def AddResponse(self, throttle_name, res):
    """Charges the size of a response (body plus headers) to a throttle.

    The body is read eagerly to measure it; res.read is then replaced with a
    function that returns that same body, so later readers still see it.

    Args:
      throttle_name: The name of the bandwidth throttle to add to.
      res: The response whose size will be added to the throttle.
    """
    body = res.read()
    res.read = lambda: body
    total = len(body)
    for header_line in ['%s: %s\n' % pair for pair in res.info().items()]:
      total += len(header_line)
    self.throttle.AddTransfer(throttle_name, total)

  def _ThrottleRequest(self, bandwidth_name, req):
    """Sleeps while the throttle is over quota, then charges req to it."""
    self.throttle.Sleep()
    self.AddRequest(bandwidth_name, req)
    return req

  def _RecordResponse(self, bandwidth_name, request_name, res):
    """Charges res to bandwidth_name and counts one request on request_name."""
    self.AddResponse(bandwidth_name, res)
    self.throttle.AddTransfer(request_name, 1)
    return res

  def http_request(self, req):
    """Process an HTTP request: throttle, then charge it to BANDWIDTH_UP.

    Args:
      req: A urllib2.Request object.

    Returns:
      The request passed in.
    """
    return self._ThrottleRequest(BANDWIDTH_UP, req)

  def https_request(self, req):
    """Process an HTTPS request: throttle, then charge it to HTTPS_BANDWIDTH_UP.

    Args:
      req: A urllib2.Request object.

    Returns:
      The request passed in.
    """
    return self._ThrottleRequest(HTTPS_BANDWIDTH_UP, req)

  def http_response(self, unused_req, res):
    """Process an HTTP response.

    The response size is charged to BANDWIDTH_DOWN and one request is counted
    against REQUESTS.

    Args:
      unused_req: The urllib2 request for this response.
      res: A urllib2 response object.

    Returns:
      The response passed in.
    """
    return self._RecordResponse(BANDWIDTH_DOWN, REQUESTS, res)

  def https_response(self, unused_req, res):
    """Process an HTTPS response.

    The response size is charged to HTTPS_BANDWIDTH_DOWN and one request is
    counted against HTTPS_REQUESTS.

    Args:
      unused_req: The urllib2 request for this response.
      res: A urllib2 response object.

    Returns:
      The response passed in.
    """
    return self._RecordResponse(HTTPS_BANDWIDTH_DOWN, HTTPS_REQUESTS, res)
|
657 |
|
658 |
|
class ThrottledHttpRpcServer(appengine_rpc.HttpRpcServer):
  """An appengine_rpc.HttpRpcServer whose HTTP traffic goes through a Throttle.

  A ThrottleHandler is added to the opener so that requests and responses are
  accounted against the throttle, preventing quotas from being exceeded.
  """

  def __init__(self, throttle, request_manager, *args, **kwargs):
    """Initialize a ThrottledHttpRpcServer.

    As a side effect, request_manager.rpc_server is set to the new instance.

    Args:
      throttle: A Throttles instance.
      request_manager: A RequestManager instance.
      args: Positional arguments to pass through to
        appengine_rpc.HttpRpcServer.__init__
      kwargs: Keyword arguments to pass through to
        appengine_rpc.HttpRpcServer.__init__
    """
    # Set before the base initializer runs, in case it builds the opener.
    self.throttle = throttle
    appengine_rpc.HttpRpcServer.__init__(self, *args, **kwargs)
    request_manager.rpc_server = self

  def _GetOpener(self):
    """Returns the base class's OpenerDirector with a ThrottleHandler added.

    Returns:
      A urllib2.OpenerDirector object.
    """
    base_opener = appengine_rpc.HttpRpcServer._GetOpener(self)
    throttle_handler = ThrottleHandler(self.throttle)
    base_opener.add_handler(throttle_handler)
    return base_opener
|
692 |
|
693 |
|
def ThrottledHttpRpcServerFactory(throttle, request_manager):
  """Returns a factory producing ThrottledHttpRpcServers bound to a throttle.

  Args:
    throttle: A Throttle instance to use for the ThrottledHttpRpcServer.
    request_manager: A RequestManager instance.

  Returns:
    A callable accepting HttpRpcServer constructor arguments that returns a
    ThrottledHttpRpcServer.
  """
  def MakeRpcServer(*args, **kwargs):
    """Builds the server, forcing HOSTED_OR_GOOGLE auth and cookie saving."""
    kwargs.update(account_type='HOSTED_OR_GOOGLE', save_cookies=True)
    return ThrottledHttpRpcServer(throttle, request_manager, *args, **kwargs)

  return MakeRpcServer
|
709 |
|
710 |
|
711 class RequestManager(object): |
|
712 """A class which wraps a connection to the server.""" |
|
713 |
|
714 source = 'google-bulkloader-%s' % UPLOADER_VERSION |
|
715 user_agent = source |
|
716 |
|
717 def __init__(self, |
|
718 app_id, |
|
719 host_port, |
|
720 url_path, |
|
721 kind, |
|
722 throttle): |
|
723 """Initialize a RequestManager object. |
|
724 |
|
725 Args: |
|
726 app_id: String containing the application id for requests. |
|
727 host_port: String containing the "host:port" pair; the port is optional. |
|
728 url_path: partial URL (path) to post entity data to. |
|
729 kind: Kind of the Entity records being posted. |
|
730 throttle: A Throttle instance. |
|
731 """ |
|
732 self.app_id = app_id |
|
733 self.host_port = host_port |
|
734 self.host = host_port.split(':')[0] |
|
735 if url_path and url_path[0] != '/': |
|
736 url_path = '/' + url_path |
|
737 self.url_path = url_path |
|
738 self.kind = kind |
|
739 self.throttle = throttle |
|
740 self.credentials = None |
|
741 throttled_rpc_server_factory = ThrottledHttpRpcServerFactory( |
|
742 self.throttle, self) |
|
743 logging.debug('Configuring remote_api. app_id = %s, url_path = %s, ' |
|
744 'servername = %s' % (app_id, url_path, host_port)) |
|
745 remote_api_stub.ConfigureRemoteDatastore( |
|
746 app_id, |
|
747 url_path, |
|
748 self.AuthFunction, |
|
749 servername=host_port, |
|
750 rpc_server_factory=throttled_rpc_server_factory) |
|
751 self.authenticated = False |
|
752 |
|
753 def Authenticate(self): |
|
754 """Invoke authentication if necessary.""" |
|
755 self.rpc_server.Send(self.url_path, payload=None) |
|
756 self.authenticated = True |
|
757 |
|
758 def AuthFunction(self, |
|
759 raw_input_fn=raw_input, |
|
760 password_input_fn=getpass.getpass): |
|
761 """Prompts the user for a username and password. |
|
762 |
|
763 Caches the results the first time it is called and returns the |
|
764 same result every subsequent time. |
|
765 |
|
766 Args: |
|
767 raw_input_fn: Used for dependency injection. |
|
768 password_input_fn: Used for dependency injection. |
|
769 |
|
770 Returns: |
|
771 A pair of the username and password. |
|
772 """ |
|
773 if self.credentials is not None: |
|
774 return self.credentials |
|
775 print 'Please enter login credentials for %s (%s)' % ( |
|
776 self.host, self.app_id) |
|
777 email = raw_input_fn('Email: ') |
|
778 if email: |
|
779 password_prompt = 'Password for %s: ' % email |
|
780 password = password_input_fn(password_prompt) |
|
781 else: |
|
782 password = None |
|
783 self.credentials = (email, password) |
|
784 return self.credentials |
|
785 |
|
786 def _GetHeaders(self): |
|
787 """Constructs a dictionary of extra headers to send with a request.""" |
|
788 headers = { |
|
789 'GAE-Uploader-Version': UPLOADER_VERSION, |
|
790 'GAE-Uploader-Kind': self.kind |
|
791 } |
|
792 return headers |
|
793 |
|
794 def EncodeContent(self, rows): |
|
795 """Encodes row data to the wire format. |
|
796 |
|
797 Args: |
|
798 rows: A list of pairs of a line number and a list of column values. |
|
799 |
|
800 Returns: |
|
801 A list of db.Model instances. |
|
802 """ |
|
803 try: |
|
804 loader = Loader.RegisteredLoaders()[self.kind] |
|
805 except KeyError: |
|
806 logging.error('No Loader defined for kind %s.' % self.kind) |
|
807 raise ConfigurationError('No Loader defined for kind %s.' % self.kind) |
|
808 entities = [] |
|
809 for line_number, values in rows: |
|
810 key = loader.GenerateKey(line_number, values) |
|
811 entity = loader.CreateEntity(values, key_name=key) |
|
812 entities.extend(entity) |
|
813 |
|
814 return entities |
|
815 |
|
816 def PostEntities(self, item): |
|
817 """Posts Entity records to a remote endpoint over HTTP. |
|
818 |
|
819 Args: |
|
820 item: A workitem containing the entities to post. |
|
821 |
|
822 Returns: |
|
823 A pair of the estimated size of the request in bytes and the response |
|
824 from the server as a str. |
|
825 """ |
|
826 entities = item.content |
|
827 db.put(entities) |
|
828 |
|
829 |
|
class WorkItem(object):
  """Holds a unit of uploading work.

  A WorkItem represents a number of rows that need to be uploaded to
  Google App Engine.  The rows are kept in the "rows" field; the entities
  to upload are kept in the "content" field, which starts out as None.

  The entities are identified by a range of numeric keys, inclusively.  In
  the case of a resumption of an upload, or a replay to correct errors,
  these keys must be able to identify the same set of entities.

  Note that keys specify a range.  The entities do not have to sequentially
  fill the entire range, they must simply bound a range of valid keys.
  """

  def __init__(self, progress_queue, rows, key_start, key_end,
               progress_key=None):
    """Initialize the WorkItem instance.

    Args:
      progress_queue: A queue used for tracking progress information.
      rows: A list of pairs of a line number and a list of column values
      key_start: The (numeric) starting key, inclusive.
      key_end: The (numeric) ending key, inclusive.
      progress_key: If this WorkItem represents state from a prior run,
        then this will be the key within the progress database.
    """
    self.state = STATE_READ

    self.progress_queue = progress_queue

    assert isinstance(key_start, (int, long))
    assert isinstance(key_end, (int, long))
    assert key_start <= key_end

    self.key_start = key_start
    self.key_end = key_end
    self.progress_key = progress_key

    # Waited on by blocking state transitions until it is set elsewhere.
    self.progress_event = threading.Event()

    self.rows = rows
    self.content = None
    self.count = len(rows)

  def MarkAsRead(self):
    """Mark this WorkItem as read/consumed from the data source."""

    assert self.state == STATE_READ

    self._StateTransition(STATE_READ, blocking=True)

    # NOTE(review): presumably whoever acknowledges the transition assigns
    # progress_key; this checks that it happened.
    assert self.progress_key is not None

  def MarkAsSending(self):
    """Mark this WorkItem as in-process on being uploaded to the server."""

    assert self.state == STATE_READ or self.state == STATE_NOT_SENT
    assert self.progress_key is not None

    self._StateTransition(STATE_SENDING, blocking=True)

  def MarkAsSent(self):
    """Mark this WorkItem as successfully sent to the server."""

    assert self.state == STATE_SENDING
    assert self.progress_key is not None

    self._StateTransition(STATE_SENT, blocking=False)

  def MarkAsError(self):
    """Mark this WorkItem as requiring manual error recovery."""

    assert self.state == STATE_SENDING
    assert self.progress_key is not None

    self._StateTransition(STATE_NOT_SENT, blocking=True)

  def _StateTransition(self, new_state, blocking=False):
    """Transition the work item to a new state, storing progress information.

    The item is put on the progress queue after its state is updated.

    Args:
      new_state: The state to transition to.
      blocking: Whether to wait until progress_event is set (presumably by
        the progress thread acknowledging the transition), then reset it.
    """
    assert not self.progress_event.isSet()

    self.state = new_state
    # Log the state being entered, not the one being left.
    logging.debug('[%s-%s] %s',
                  self.key_start, self.key_end, StateMessage(self.state))

    self.progress_queue.put(self)

    if blocking:
      self.progress_event.wait()
      self.progress_event.clear()
|
928 |
|
929 |
|
930 |
|
def InterruptibleSleep(sleep_time):
  """Sleeps for sleep_time seconds, stopping early if asked to exit.

  Sleeps in slices of at most half a second; after each slice the current
  thread's exit_flag attribute is checked and, if true, the sleep ends.
  Threads without an exit_flag attribute (for example the main thread) are
  treated as never being asked to exit instead of raising AttributeError.

  Args:
    sleep_time: Time to sleep, in seconds.  Values <= 0 return immediately.
  """
  slept = 0.0
  epsilon = .0001
  thread = threading.currentThread()
  while slept < sleep_time - epsilon:
    remaining = sleep_time - slept
    this_sleep_time = min(remaining, 0.5)
    time.sleep(this_sleep_time)
    slept += this_sleep_time
    if getattr(thread, 'exit_flag', False):
      return
|
947 |
|
948 |
|
class ThreadGate(object):
  """Manage the number of active worker threads.

  The ThreadGate limits the number of threads that are simultaneously
  uploading batches of records in order to implement adaptive rate
  control. The number of simultaneous upload threads that it takes to
  start causing timeouts varies widely over the course of the day, so
  adaptive rate control allows the uploader to do many uploads while
  reducing the error rate and thus increasing the throughput.

  Initially the ThreadGate allows only one uploader thread to be active.
  For each successful upload, another thread is activated and for each
  failed upload, the number of active threads is reduced by one. When only
  one thread is active, further failures grow backoff_time instead.
  """

  def __init__(self, enabled, sleep=InterruptibleSleep):
    """Initialize the ThreadGate.

    Args:
      enabled: Whether throttling of concurrent workers is active at all.
      sleep: Callable used to sleep for backoff_time seconds.
    """
    self.enabled = enabled
    self.enabled_count = 1
    # Guards enabled_count and backoff_time.
    self.lock = threading.Lock()
    # One permit per thread currently allowed inside StartWork/FinishWork.
    self.thread_semaphore = threading.Semaphore(self.enabled_count)
    self._threads = []
    self.backoff_time = 0
    self.sleep = sleep

  def Register(self, thread):
    """Register a thread with the thread gate."""
    self._threads.append(thread)

  def Threads(self):
    """Yields the registered threads."""
    for thread in self._threads:
      yield thread

  def EnableThread(self):
    """Enable one more worker thread."""
    self.lock.acquire()
    try:
      self.enabled_count += 1
    finally:
      self.lock.release()
    self.thread_semaphore.release()

  def EnableAllThreads(self):
    """Enable all worker threads."""
    for unused_idx in range(len(self._threads) - self.enabled_count):
      self.EnableThread()

  def StartWork(self):
    """Starts a critical section in which the number of workers is limited.

    If thread throttling is enabled then this method starts a critical
    section which allows self.enabled_count simultaneously operating
    threads. The critical section is ended by calling self.FinishWork().
    If a backoff is in effect, the caller sleeps for backoff_time seconds
    unless its thread has been asked to exit.
    """
    if self.enabled:
      self.thread_semaphore.acquire()
      if self.backoff_time > 0.0:
        # getattr: threads without an exit_flag are never asked to exit.
        if not getattr(threading.currentThread(), 'exit_flag', False):
          logging.info('Backing off: %.1f seconds',
                       self.backoff_time)
          self.sleep(self.backoff_time)

  def FinishWork(self):
    """Ends a critical section started with self.StartWork()."""
    if self.enabled:
      self.thread_semaphore.release()

  def IncreaseWorkers(self):
    """Informs the throttler that an item was successfully sent.

    If thread throttling is enabled, this resets any backoff and lets one
    additional thread run in the critical section (up to the number of
    registered threads).
    """
    if self.enabled:
      do_enable = False
      self.lock.acquire()
      try:
        # Reset under the lock: DecreaseWorkers updates backoff_time while
        # holding it, and an unlocked write here could be lost.
        if self.backoff_time > 0.0:
          logging.info('Resetting backoff to 0.0')
          self.backoff_time = 0.0
        if self.enabled and len(self._threads) > self.enabled_count:
          do_enable = True
          self.enabled_count += 1
      finally:
        self.lock.release()
      if do_enable:
        self.thread_semaphore.release()

  def DecreaseWorkers(self):
    """Informs the thread_gate that an item failed to send.

    If thread throttling is enabled, this method will cause the
    throttler to allow one fewer thread in the critical section. If
    there is only one thread remaining, failures will result in
    exponential backoff until there is a success.
    """
    if self.enabled:
      do_disable = False
      self.lock.acquire()
      try:
        if self.enabled:
          if self.enabled_count > 1:
            do_disable = True
            self.enabled_count -= 1
          else:
            if self.backoff_time == 0.0:
              self.backoff_time = INITIAL_BACKOFF
            else:
              self.backoff_time *= BACKOFF_FACTOR
      finally:
        self.lock.release()
      if do_disable:
        # Permanently consume one permit; may block until another worker
        # leaves the critical section.
        self.thread_semaphore.acquire()
|
1062 |
|
1063 |
|
class Throttle(object):
  """A base class for upload rate throttling.

  Transferring a large number of records too quickly to an application
  could trigger quota limits and cause the transfer process to halt.
  In order to stay within the application's quota, we throttle the
  data transfer to a specified limit (across all transfer threads).
  This limit defaults to about half of the Google App Engine default
  for an application, but can be manually adjusted faster/slower as
  appropriate.

  This class tracks a moving average of some aspect of the transfer
  rate (bandwidth, records per second, http connections per
  second). It keeps two windows of counts, on a per-thread basis. One
  block is the "current" block, and the other is the "prior" block. It
  rotates the counts from current to prior when ROTATE_PERIOD has
  passed. Thus, the current block represents from 0 seconds to
  ROTATE_PERIOD seconds of activity (determined by
  get_time() - self.last_rotate). The prior block always represents a
  full ROTATE_PERIOD.

  Sleeping is performed just before a transfer of another block, and is
  based on the counts transferred *before* the next transfer. It does not
  matter how much will be transferred next, only that for all the data
  transferred SO FAR we have interspersed enough pauses to keep the
  aggregate transfer rate within the specified limit.

  These counts are maintained on a per-thread basis, so we do not require
  any interlocks around incrementing the counts. There IS an interlock on
  the rotation of the counts because we do not want multiple threads to
  multiply-rotate the counts.

  There are various race conditions in the computation and collection
  of these counts. We do not require precise values, but simply to
  keep the overall transfer within the bandwidth limits. If a given
  pause is a little short, or a little long, then the aggregate delays
  will be correct.
  """

  ROTATE_PERIOD = 600

  def __init__(self,
               get_time=time.time,
               thread_sleep=InterruptibleSleep,
               layout=None):
    """Initialize the Throttle.

    Args:
      get_time: Callable returning the current time in seconds.
      thread_sleep: Callable used to sleep for a given number of seconds.
      layout: Optional dict mapping throttle names to limits, passed to
        AddThrottles.
    """
    self.get_time = get_time
    self.thread_sleep = thread_sleep

    self.start_time = get_time()
    # Each of these maps throttle name -> {thread name: count}.
    self.transferred = {}
    self.prior_block = {}
    self.totals = {}
    # Throttle name -> limit (units per second of get_time()).
    self.throttles = {}

    self.last_rotate = {}
    self.rotate_mutex = {}
    if layout:
      self.AddThrottles(layout)

  def AddThrottle(self, name, limit):
    """Create a throttle called name allowing limit units per second."""
    self.throttles[name] = limit
    self.transferred[name] = {}
    self.prior_block[name] = {}
    self.totals[name] = {}
    self.last_rotate[name] = self.get_time()
    self.rotate_mutex[name] = threading.Lock()

  def AddThrottles(self, layout):
    """Create one throttle per (name, limit) entry of the layout dict."""
    for key, value in layout.items():
      self.AddThrottle(key, value)

  def Register(self, thread):
    """Register this thread with the throttler.

    Counters are created only for throttles that exist at call time.
    """
    thread_name = thread.getName()
    for throttle_name in self.throttles:
      self.transferred[throttle_name][thread_name] = 0
      self.prior_block[throttle_name][thread_name] = 0
      self.totals[throttle_name][thread_name] = 0

  def VerifyName(self, throttle_name):
    """Raise AssertionError unless throttle_name has been added."""
    if throttle_name not in self.throttles:
      raise AssertionError('%s is not a registered throttle' %
                           (throttle_name,))

  def AddTransfer(self, throttle_name, token_count):
    """Add a count to the amount this thread has transferred.

    Each time a thread transfers some data, it should call this method to
    note the amount sent. The counts may be rotated if sufficient time
    has passed since the last rotation.

    Note: this method should only be called by the BulkLoaderThread
    instances. The token count is allocated towards the
    "current thread", which must already be registered.

    Args:
      throttle_name: The name of the throttle to add to.
      token_count: The number to add to the throttle counter.
    """
    self.VerifyName(throttle_name)
    transferred = self.transferred[throttle_name]
    transferred[threading.currentThread().getName()] += token_count

    if self.last_rotate[throttle_name] + self.ROTATE_PERIOD < self.get_time():
      self._RotateCounts(throttle_name)

  def Sleep(self, throttle_name=None):
    """Possibly sleep in order to limit the transfer rate.

    Note that we sleep based on *prior* transfers rather than what we
    may be about to transfer. The next transfer could put us under/over
    and that will be rectified *after* that transfer. Net result is that
    the average transfer rate will remain within bounds. Spiky behavior
    or uneven rates among the threads could possibly bring the transfer
    rate above the requested limit for short durations.

    Args:
      throttle_name: The name of the throttle to sleep on. If None or
        omitted, then sleep on all throttles.
    """
    if throttle_name is None:
      for name in self.throttles:
        self.Sleep(throttle_name=name)
      return

    self.VerifyName(throttle_name)

    thread = threading.currentThread()

    while True:
      duration = self.get_time() - self.last_rotate[throttle_name]

      total = sum(self.prior_block[throttle_name].values())
      if total:
        # The prior block always spans a full ROTATE_PERIOD.
        duration += self.ROTATE_PERIOD

      total += sum(self.transferred[throttle_name].values())

      # Seconds the transfer so far should have taken, minus elapsed time.
      sleep_time = (float(total) / self.throttles[throttle_name]) - duration

      if sleep_time < MINIMUM_THROTTLE_SLEEP_DURATION:
        break

      logging.debug('[%s] Throttling on %s. Sleeping for %.1f ms '
                    '(duration=%.1f ms, total=%d)',
                    thread.getName(), throttle_name,
                    sleep_time * 1000, duration * 1000, total)
      self.thread_sleep(sleep_time)
      # Threads without an exit_flag (e.g. the main thread) never exit early.
      if getattr(thread, 'exit_flag', False):
        break
      self._RotateCounts(throttle_name)

  def _RotateCounts(self, throttle_name):
    """Rotate the transfer counters.

    If sufficient time has passed, then rotate the counters from active to
    the prior-block of counts, accumulating them into the totals.

    This rotation is interlocked to ensure that multiple threads do not
    over-rotate the counts.

    Args:
      throttle_name: The name of the throttle to rotate.
    """
    self.VerifyName(throttle_name)
    self.rotate_mutex[throttle_name].acquire()
    try:
      next_rotate_time = self.last_rotate[throttle_name] + self.ROTATE_PERIOD
      if next_rotate_time >= self.get_time():
        return

      for name, count in list(self.transferred[throttle_name].items()):
        self.prior_block[throttle_name][name] = count
        self.transferred[throttle_name][name] = 0
        self.totals[throttle_name][name] += count

      self.last_rotate[throttle_name] = self.get_time()

    finally:
      self.rotate_mutex[throttle_name].release()

  def TotalTransferred(self, throttle_name):
    """Return the total transferred, and over what period.

    Args:
      throttle_name: The name of the throttle to total.

    Returns:
      A tuple of the total count and running time for the given throttle name.

    Raises:
      AssertionError: If throttle_name is not a registered throttle.
    """
    self.VerifyName(throttle_name)
    total = (sum(self.totals[throttle_name].values()) +
             sum(self.transferred[throttle_name].values()))
    return total, self.get_time() - self.start_time
|
1266 |
|
1267 |
|
1268 class _ThreadBase(threading.Thread): |
|
1269 """Provide some basic features for the threads used in the uploader. |
|
1270 |
|
1271 This abstract base class is used to provide some common features: |
|
1272 |
|
1273 * Flag to ask thread to exit as soon as possible. |
|
1274 * Record exit/error status for the primary thread to pick up. |
|
1275 * Capture exceptions and record them for pickup. |
|
1276 * Some basic logging of thread start/stop. |
|
1277 * All threads are "daemon" threads. |
|
1278 * Friendly names for presenting to users. |
|
1279 |
|
1280 Concrete sub-classes must implement PerformWork(). |
|
1281 |
|
1282 Either self.NAME should be set or GetFriendlyName() be overridden to |
|
1283 return a human-friendly name for this thread. |
|
1284 |
|
1285 The run() method starts the thread and prints start/exit messages. |
|
1286 |
|
1287 self.exit_flag is intended to signal that this thread should exit |
|
1288 when it gets the chance. PerformWork() should check self.exit_flag |
|
1289 whenever it has the opportunity to exit gracefully. |
|
1290 """ |
|
1291 |
|
1292 def __init__(self): |
|
1293 threading.Thread.__init__(self) |
|
1294 |
|
1295 self.setDaemon(True) |
|
1296 |
|
1297 self.exit_flag = False |
|
1298 self.error = None |
|
1299 |
|
1300 def run(self): |
|
1301 """Perform the work of the thread.""" |
|
1302 logging.info('[%s] %s: started', self.getName(), self.__class__.__name__) |
|
1303 |
|
1304 try: |
|
1305 self.PerformWork() |
|
1306 except: |
|
1307 self.error = sys.exc_info()[1] |
|
1308 logging.exception('[%s] %s:', self.getName(), self.__class__.__name__) |
|
1309 |
|
1310 logging.info('[%s] %s: exiting', self.getName(), self.__class__.__name__) |
|
1311 |
|
1312 def PerformWork(self): |
|
1313 """Perform the thread-specific work.""" |
|
1314 raise NotImplementedError() |
|
1315 |
|
1316 def CheckError(self): |
|
1317 """If an error is present, then log it.""" |
|
1318 if self.error: |
|
1319 logging.error('Error in %s: %s', self.GetFriendlyName(), self.error) |
|
1320 |
|
1321 def GetFriendlyName(self): |
|
1322 """Returns a human-friendly description of the thread.""" |
|
1323 if hasattr(self, 'NAME'): |
|
1324 return self.NAME |
|
1325 return 'unknown thread' |
|
1326 |
|
1327 |
|
1328 class BulkLoaderThread(_ThreadBase): |
|
1329 """A thread which transmits entities to the server application. |
|
1330 |
|
1331 This thread will read WorkItem instances from the work_queue and upload |
|
1332 the entities to the server application. Progress information will be |
|
1333 pushed into the progress_queue as the work is being performed. |
|
1334 |
|
1335 If a BulkLoaderThread encounters a transient error, the entities will be |
|
1336 resent, if a fatal error is encoutered the BulkLoaderThread exits. |
|
1337 """ |
|
1338 |
|
1339 def __init__(self, |
|
1340 work_queue, |
|
1341 throttle, |
|
1342 thread_gate, |
|
1343 request_manager): |
|
1344 """Initialize the BulkLoaderThread instance. |
|
1345 |
|
1346 Args: |
|
1347 work_queue: A queue containing WorkItems for processing. |
|
1348 throttle: A Throttles to control upload bandwidth. |
|
1349 thread_gate: A ThreadGate to control number of simultaneous uploads. |
|
1350 request_manager: A RequestManager instance. |
|
1351 """ |
|
1352 _ThreadBase.__init__(self) |
|
1353 |
|
1354 self.work_queue = work_queue |
|
1355 self.throttle = throttle |
|
1356 self.thread_gate = thread_gate |
|
1357 |
|
1358 self.request_manager = request_manager |
|
1359 |
|
1360 def PerformWork(self): |
|
1361 """Perform the work of a BulkLoaderThread.""" |
|
1362 while not self.exit_flag: |
|
1363 success = False |
|
1364 self.thread_gate.StartWork() |
|
1365 try: |
|
1366 try: |
|
1367 item = self.work_queue.get(block=True, timeout=1.0) |
|
1368 except Queue.Empty: |
|
1369 continue |
|
1370 if item == _THREAD_SHOULD_EXIT: |
|
1371 break |
|
1372 |
|
1373 logging.debug('[%s] Got work item [%d-%d]', |
|
1374 self.getName(), item.key_start, item.key_end) |
|
1375 |
|
1376 try: |
|
1377 |
|
1378 item.MarkAsSending() |
|
1379 try: |
|
1380 if item.content is None: |
|
1381 item.content = self.request_manager.EncodeContent(item.rows) |
|
1382 try: |
|
1383 self.request_manager.PostEntities(item) |
|
1384 success = True |
|
1385 logging.debug( |
|
1386 '[%d-%d] Sent %d entities', |
|
1387 item.key_start, item.key_end, item.count) |
|
1388 self.throttle.AddTransfer(RECORDS, item.count) |
|
1389 except (db.InternalError, db.NotSavedError, db.Timeout), e: |
|
1390 logging.debug('Caught non-fatal error: %s', e) |
|
1391 except urllib2.HTTPError, e: |
|
1392 if e.code == 403 or (e.code >= 500 and e.code < 600): |
|
1393 logging.debug('Caught HTTP error %d', e.code) |
|
1394 logging.debug('%s', e.read()) |
|
1395 else: |
|
1396 raise e |
|
1397 |
|
1398 except: |
|
1399 self.error = sys.exc_info()[1] |
|
1400 logging.exception('[%s] %s: caught exception %s', self.getName(), |
|
1401 self.__class__.__name__, str(sys.exc_info())) |
|
1402 raise |
|
1403 |
|
1404 finally: |
|
1405 if success: |
|
1406 item.MarkAsSent() |
|
1407 self.thread_gate.IncreaseWorkers() |
|
1408 self.work_queue.task_done() |
|
1409 else: |
|
1410 item.MarkAsError() |
|
1411 self.thread_gate.DecreaseWorkers() |
|
1412 try: |
|
1413 self.work_queue.reput(item, block=False) |
|
1414 except Queue.Full: |
|
1415 logging.error('[%s] Failed to reput work item.', self.getName()) |
|
1416 raise Error('Failed to reput work item') |
|
1417 logging.info('[%d-%d] %s', |
|
1418 item.key_start, item.key_end, StateMessage(item.state)) |
|
1419 |
|
1420 finally: |
|
1421 self.thread_gate.FinishWork() |
|
1422 |
|
1423 |
|
1424 def GetFriendlyName(self): |
|
1425 """Returns a human-friendly name for this thread.""" |
|
1426 return 'worker [%s]' % self.getName() |
|
1427 |
|
1428 |
|
class DataSourceThread(_ThreadBase):
  """A thread which reads WorkItems and pushes them into queue.

  This thread will read/consume WorkItems from a generator (produced by
  the generator factory). These WorkItems will then be pushed into the
  work_queue. Note that reading will block if/when the work_queue becomes
  full. Information on content consumed from the generator will be pushed
  into the progress_queue.
  """

  NAME = 'data source thread'

  def __init__(self,
               work_queue,
               progress_queue,
               workitem_generator_factory,
               progress_generator_factory):
    """Initialize the DataSourceThread instance.

    Args:
      work_queue: A queue containing WorkItems for processing.
      progress_queue: A queue used for tracking progress information.
      workitem_generator_factory: A factory that creates a WorkItem generator
      progress_generator_factory: A factory that creates a generator which
        produces prior progress status, or None if there is no prior status
        to use.
    """
    _ThreadBase.__init__(self)

    self.work_queue = work_queue
    self.progress_queue = progress_queue
    self.workitem_generator_factory = workitem_generator_factory
    self.progress_generator_factory = progress_generator_factory
    self.entity_count = 0
    # Initialized here too so these exist even if PerformWork never runs or
    # fails before reaching them.
    self.sent_count = 0
    self.read_count = 0
    self.read_all = False

  def PerformWork(self):
    """Performs the work of a DataSourceThread."""
    if self.progress_generator_factory:
      progress_gen = self.progress_generator_factory()
    else:
      progress_gen = None

    content_gen = self.workitem_generator_factory(self.progress_queue,
                                                  progress_gen)

    self.sent_count = 0
    self.read_count = 0
    self.read_all = False

    for item in content_gen.Batches():
      item.MarkAsRead()

      # Retry the put with a timeout so a full queue does not prevent this
      # thread from noticing exit_flag.
      while not self.exit_flag:
        try:
          self.work_queue.put(item, block=True, timeout=1.0)
        except Queue.Full:
          continue
        self.entity_count += item.count
        break

      if self.exit_flag:
        break

    if not self.exit_flag:
      self.read_all = True
    self.read_count = content_gen.row_count
    self.sent_count = content_gen.sent_count
|
1496 |
|
1497 |
|
1498 |
|
1499 def _RunningInThread(thread): |
|
1500 """Return True if we are running within the specified thread.""" |
|
1501 return threading.currentThread().getName() == thread.getName() |
|
1502 |
|
1503 |
|
class ProgressDatabase(object):
  """Persistently record all progress information during an upload.

  This class wraps a very simple SQLite database which records each of
  the relevant details from the WorkItem instances. If the uploader is
  resumed, then data is replayed out of the database.
  """

  def __init__(self, db_filename, commit_periodicity=100):
    """Initialize the ProgressDatabase instance.

    Args:
      db_filename: The name of the SQLite database to use.
      commit_periodicity: How many operations to perform between commits.
    """
    self.db_filename = db_filename

    logging.info('Using progress database: %s', db_filename)
    # isolation_level=None puts this connection in autocommit mode.
    self.primary_conn = sqlite3.connect(db_filename, isolation_level=None)
    self.primary_thread = threading.currentThread()

    # Opened lazily by the progress tracker thread (see
    # _OpenProgressConnection).
    self.progress_conn = None
    self.progress_thread = None

    self.operation_count = 0
    self.commit_periodicity = commit_periodicity

    self.prior_key_end = None

    # IF NOT EXISTS lets a resumed run reuse an existing database without
    # matching on error text (the old code relied on the deprecated
    # BaseException.message attribute).
    self.primary_conn.execute(
        """create table if not exists progress (
          id integer primary key autoincrement,
          state integer not null,
          key_start integer not null,
          key_end integer not null
        )
        """)
    self.primary_conn.execute(
        'create index if not exists i_state on progress (state)')

  def ThreadComplete(self):
    """Finalize any operations the progress thread has performed.

    The database aggregates lots of operations into a single commit, and
    this method is used to commit any pending operations as the thread
    is about to shut down.
    """
    if self.progress_conn:
      self._MaybeCommit(force_commit=True)

  def _MaybeCommit(self, force_commit=False):
    """Periodically commit changes into the SQLite database.

    Committing every operation is quite expensive, and slows down the
    operation of the script. Thus, we only commit after every N operations,
    as determined by the self.commit_periodicity value. Optionally, the
    caller can force a commit.

    Args:
      force_commit: Pass True in order for a commit to occur regardless
        of the current operation count.
    """
    self.operation_count += 1
    if force_commit or (self.operation_count % self.commit_periodicity) == 0:
      self.progress_conn.commit()

  def _OpenProgressConnection(self):
    """Possibly open a database connection for the progress tracker thread.

    If the connection is not open (for the calling thread, which is assumed
    to be the progress tracker thread), then open it. We also open a couple
    cursors for later use (and reuse).
    """
    if self.progress_conn:
      return

    assert not _RunningInThread(self.primary_thread)

    self.progress_thread = threading.currentThread()

    self.progress_conn = sqlite3.connect(self.db_filename)

    self.insert_cursor = self.progress_conn.cursor()
    self.update_cursor = self.progress_conn.cursor()

  def HasUnfinishedWork(self):
    """Returns True if the database has progress information.

    Note there are two basic cases for progress information:
    1) All saved records indicate a successful upload. In this case, we
       need to skip everything transmitted so far and then send the rest.
    2) Some records for incomplete transfer are present. These need to be
       sent again, and then we resume sending after all the successful
       data.

    Returns:
      True if the database has progress information, False otherwise.

    Raises:
      ResumeError: If there is an error reading the progress database.
    """
    assert _RunningInThread(self.primary_thread)

    cursor = self.primary_conn.cursor()
    cursor.execute('select count(*) from progress')
    row = cursor.fetchone()
    if row is None:
      raise ResumeError('Error reading progress information.')

    return row[0] != 0

  def StoreKeys(self, key_start, key_end):
    """Record a new progress record, returning a key for later updates.

    The specified progress information will be persisted into the database.
    A unique key will be returned that identifies this progress state. The
    key is later used to (quickly) update this record.

    For the progress resumption to proceed properly, calls to StoreKeys
    MUST specify monotonically increasing key ranges. This will result in
    a database whereby the ID, KEY_START, and KEY_END rows are all
    increasing (rather than having ranges out of order).

    NOTE: apart from an assert against the previous key_end, the above
    precondition is NOT checked against the table (that would imply an
    additional table read or two on each invocation).

    Args:
      key_start: The starting key of the WorkItem (inclusive)
      key_end: The end key of the WorkItem (inclusive)

    Returns:
      The row id of the new record, to be used later to update this state.
    """
    self._OpenProgressConnection()

    assert _RunningInThread(self.progress_thread)
    assert isinstance(key_start, int)
    assert isinstance(key_end, int)
    assert key_start <= key_end

    if self.prior_key_end is not None:
      assert key_start > self.prior_key_end
    self.prior_key_end = key_end

    self.insert_cursor.execute(
        'insert into progress (state, key_start, key_end) values (?, ?, ?)',
        (STATE_READ, key_start, key_end))

    progress_key = self.insert_cursor.lastrowid

    self._MaybeCommit()

    return progress_key

  def UpdateState(self, key, new_state):
    """Update a specified progress record with new information.

    Args:
      key: The key for this progress record, returned from StoreKeys
      new_state: The new state to associate with this progress record.
    """
    self._OpenProgressConnection()

    assert _RunningInThread(self.progress_thread)
    assert isinstance(new_state, int)

    self.update_cursor.execute('update progress set state=? where id=?',
                               (new_state, key))

    self._MaybeCommit()

  def GetProgressStatusGenerator(self):
    """Get a generator which returns progress information.

    The returned generator will yield a series of 4-tuples that specify
    progress information about a prior run of the uploader. The 4-tuples
    have the following values:

      progress_key: The unique key to later update this record with new
                    progress information.
      state: The last state saved for this progress record.
      key_start: The starting key of the items for uploading (inclusive).
      key_end: The ending key of the items for uploading (inclusive).

    After all incompletely-transferred records are provided, then one
    more 4-tuple will be generated:

      None
      DATA_CONSUMED_TO_HERE: A unique string value indicating this record
                             is being provided.
      None
      key_end: An integer value specifying the last data source key that
               was handled by the previous run of the uploader.

    The caller should begin uploading records which occur after key_end.

    Yields:
      Progress information as tuples (progress_key, state, key_start, key_end).

    Raises:
      ResumeError: If the progress table has no records.
    """
    # A separate connection: the generator may be consumed from a thread
    # other than the one owning primary_conn.
    conn = sqlite3.connect(self.db_filename, isolation_level=None)
    try:
      cursor = conn.cursor()

      cursor.execute('select max(id) from progress')
      batch_id = cursor.fetchone()[0]

      cursor.execute('select key_end from progress where id = ?', (batch_id,))
      last_row = cursor.fetchone()
      if last_row is None:
        raise ResumeError('Error reading progress information.')
      key_end = last_row[0]

      self.prior_key_end = key_end

      cursor.execute(
          'select id, state, key_start, key_end from progress'
          ' where state != ?'
          ' order by id',
          (STATE_SENT,))

      rows = cursor.fetchall()
    finally:
      # All rows are materialized above, so the connection can be released
      # before yielding instead of being leaked.
      conn.close()

    for row in rows:
      yield row

    yield None, DATA_CONSUMED_TO_HERE, None, key_end
|
1737 |
|
1738 |
|
class StubProgressDatabase(object):
  """Do-nothing stand-in implementing the ProgressDatabase write interface.

  Nothing is persisted: there is never unfinished work, every stored range
  gets the same fixed key, and updates/commits are ignored.
  """

  def HasUnfinishedWork(self):
    """Report prior progress (the stub never has any)."""
    has_work = False
    return has_work

  def StoreKeys(self, unused_key_start, unused_key_end):
    """Accept a key range without storing it; always returns 'fake-key'."""
    return 'fake-key'

  def UpdateState(self, unused_key, unused_new_state):
    """Accept a state update without recording it."""
    return None

  def ThreadComplete(self):
    """Nothing is pending, so there is nothing to finalize."""
    return None
|
1757 |
|
1758 |
|
class ProgressTrackerThread(_ThreadBase):
  """A thread which records progress information for the upload process.

  The progress information is stored into the provided progress database.
  This class is not responsible for replaying a prior run's progress
  information out of the database. Separate mechanisms must be used to
  resume a prior upload attempt.
  """

  NAME = 'progress tracking thread'

  def __init__(self, progress_queue, progress_db):
    """Initialize the ProgressTrackerThread instance.

    Args:
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
    """
    _ThreadBase.__init__(self)

    self.progress_queue = progress_queue
    self.db = progress_db
    # Sum of item.count over items reported in STATE_SENT.
    self.entities_sent = 0

  def PerformWork(self):
    """Performs the work of a ProgressTrackerThread.

    For each WorkItem taken from progress_queue: a newly read item (no
    progress_key yet) gets a new database record; otherwise its state is
    updated. The item's progress_event is then set to acknowledge the
    transition to any thread waiting on it.
    """
    while not self.exit_flag:
      try:
        # Poll with a timeout so exit_flag is rechecked periodically.
        item = self.progress_queue.get(block=True, timeout=1.0)
      except Queue.Empty:
        continue
      if item == _THREAD_SHOULD_EXIT:
        break

      try:
        if item.state == STATE_READ and item.progress_key is None:
          item.progress_key = self.db.StoreKeys(item.key_start, item.key_end)
        else:
          assert item.progress_key is not None

          self.db.UpdateState(item.progress_key, item.state)
          if item.state == STATE_SENT:
            self.entities_sent += item.count
      finally:
        # Always acknowledge: a worker blocked in progress_event.wait() would
        # otherwise hang forever if the database call above raised.
        item.progress_event.set()
        self.progress_queue.task_done()

    self.db.ThreadComplete()
|
1808 |
|
1809 |
|
1810 |
|
def Validate(value, typ):
  """Checks that value is non-empty (truthy) and of the right type.

  Args:
    value: any value
    typ: a type or tuple of types, as accepted by isinstance()

  Raises:
    ValueError: if value is falsy (None, empty, zero, ...).
    TypeError: if value is not an instance of typ.
  """
  if not value:
    # Wrap value in a 1-tuple: if value were itself a tuple (e.g. ()), the
    # % operator would treat it as the argument tuple and fail to format,
    # raising the wrong exception.
    raise ValueError('Value should not be empty; received %s.' % (value,))
  elif not isinstance(value, typ):
    raise TypeError('Expected a %s, but received %s (a %s).' %
                    (typ, value, value.__class__))
|
1828 |
|
1829 |
|
class Loader(object):
  """A base class for creating datastore entities from input data.

  To add a handler for bulk loading a new entity kind into your datastore,
  write a subclass of this class that calls Loader.__init__ from your
  class's __init__.

  If you need to run extra code to convert entities from the input
  data, create new properties, or otherwise modify the entities before
  they're inserted, override HandleEntity.

  See the CreateEntity method for the creation of entities from the
  (parsed) input data.
  """

  # Class-level registry mapping entity kind -> registered Loader instance.
  # It is always accessed as Loader.__loaders, so all subclasses share it.
  __loaders = {}
  __kind = None
  __properties = None

  def __init__(self, kind, properties):
    """Constructor.

    Validates and stores this Loader's kind and properties map.  The
    constructor itself does not register the instance.  Registration is
    done by Loader.RegisterLoader; LoadConfig calls it with a no-argument
    instance of each direct Loader subclass defined in the config file.

    Args:
      kind: a string containing the entity kind that this loader handles

      properties: list of (name, converter) tuples.

        This is used to automatically convert the CSV columns into
        properties. The converter should be a function that takes one
        argument, a string value from the CSV file, and returns a
        correctly typed property value that should be inserted. The
        tuples in this list should match the columns in your CSV file,
        in order.

        For example:
          [('name', str),
           ('id_number', int),
           ('email', datastore_types.Email),
           ('user', users.User),
           ('birthdate', lambda x: datetime.datetime.fromtimestamp(float(x))),
           ('description', datastore_types.Text),
           ]

    Raises:
      ValueError: if kind, properties, or a property name is empty.
      TypeError: if kind or a property name is not a string, or properties
        is not a list.
      AssertionError: if a converter is not callable.
    """
    Validate(kind, basestring)
    self.__kind = kind

    # Called only for its side effect: presumably raises early when no
    # db.Model class is defined for this kind -- TODO confirm.
    db.class_for_kind(kind)

    Validate(properties, list)
    for name, fn in properties:
      Validate(name, basestring)
      assert callable(fn), (
          'Conversion function %s for property %s is not callable.' % (fn, name))

    self.__properties = properties

  @staticmethod
  def RegisterLoader(loader):
    """Register loader under its kind, replacing any earlier loader for it."""
    Loader.__loaders[loader.__kind] = loader

  def kind(self):
    """Return the entity kind that this Loader handles."""
    return self.__kind

  def CreateEntity(self, values, key_name=None):
    """Creates entities from a list of property values.

    Args:
      values: list/tuple of str
      key_name: if provided, the name for the (single) resulting entity

    Returns:
      list of db.Model, or the falsy value HandleEntity returned (e.g. None
      or []) when no entities should be inserted.

      The returned entities are populated with the property values from the
      argument, converted to native types using the properties map given in
      the constructor, and passed through HandleEntity. They're ready to be
      inserted.

    Raises:
      AssertionError: if the number of values doesn't match the number
        of properties in the properties map.
      ValueError: if values itself is empty.
      TypeError: if values is not a list or tuple, or if HandleEntity
        returns something other than db.Model instances.
      Any exception raised by a property's converter is propagated.
    """
    Validate(values, (list, tuple))
    assert len(values) == len(self.__properties), (
        'Expected %d CSV columns, found %d.' %
        (len(self.__properties), len(values)))

    model_class = db.class_for_kind(self.__kind)

    properties = {'key_name': key_name}
    for (name, converter), val in zip(self.__properties, values):
      # bool('false') is True, so common "false" spellings are mapped to
      # False before the converter runs.
      if converter is bool and val.lower() in ('0', 'false', 'no'):
        val = False
      properties[name] = converter(val)

    entity = model_class(**properties)
    entities = self.HandleEntity(entity)

    if entities:
      if not isinstance(entities, (list, tuple)):
        entities = [entities]

      for entity in entities:
        if not isinstance(entity, db.Model):
          raise TypeError('Expected a db.Model, received %s (a %s).' %
                          (entity, entity.__class__))

    return entities

  def GenerateKey(self, i, values):
    """Generates a key_name to be used in creating the underlying object.

    The default implementation returns None.

    This method can be overridden to control the key generation for
    uploaded entities. The value returned should be None (to use a
    server generated numeric key), or a string which neither starts
    with a digit nor has the form __*__. (See
    http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html)

    If you generate your own string keys, keep in mind:

    1. The key name for each entity must be unique.
    2. If an entity of the same kind and key already exists in the
       datastore, it will be overwritten.

    Args:
      i: Number corresponding to this object (assume it's run in a loop;
        this is your current count).
      values: list/tuple of str.

    Returns:
      A string to be used as the key_name for an entity, or None.
    """
    return None

  def HandleEntity(self, entity):
    """Subclasses can override this to add custom entity conversion code.

    This is called for each entity, after its properties are populated from
    CSV but before it is stored. Subclasses can override this to add custom
    entity handling code.

    The entity to be inserted should be returned. If multiple entities should
    be inserted, return a list of entities. If no entities should be inserted,
    return None or [].

    Args:
      entity: db.Model

    Returns:
      db.Model or list of db.Model
    """
    return entity

  @staticmethod
  def RegisteredLoaders():
    """Returns a copy of the loader registry.

    Returns:
      A dict mapping entity kind to the Loader instance registered for it.
    """
    return dict(Loader.__loaders)
|
2000 |
|
2001 |
|
class QueueJoinThread(threading.Thread):
  """Helper thread whose only job is to call join() on a queue.

  A queue's join() accepts no timeout.  Running the join in this thread and
  then joining the thread with a timeout gives callers a bounded wait.
  """

  def __init__(self, queue):
    """Remember the queue that run() will join.

    Args:
      queue: A Queue.Queue or ReQueue instance.
    """
    threading.Thread.__init__(self)
    assert isinstance(queue, (Queue.Queue, ReQueue))
    self.queue = queue

  def run(self):
    """Block in the queue's join() until it returns."""
    target = self.queue
    target.join()
|
2022 |
|
2023 |
|
def InterruptibleQueueJoin(queue,
                           thread_local,
                           thread_gate,
                           queue_join_thread_factory=QueueJoinThread):
  """Wait for a queue join while watching for interrupts and worker death.

  The join runs in a helper thread created by queue_join_thread_factory.
  This function polls the helper every half second and, between polls,
  checks thread_local.shut_down and the liveness of every worker thread.

  Args:
    queue: A Queue.Queue or ReQueue instance.
    thread_local: A threading.local instance whose shut_down attribute
      signals an interrupt.
    thread_gate: A ThreadGate instance whose Threads() are the workers.
    queue_join_thread_factory: Used for dependency injection.

  Returns:
    True if the queue join finished; False if it was abandoned because
    shut_down was set (e.g. by SIGINT) or some worker thread is not alive.
  """
  joiner = queue_join_thread_factory(queue)
  joiner.start()
  while True:
    joiner.join(timeout=.5)
    if not joiner.isAlive():
      return True
    if thread_local.shut_down:
      logging.debug('Queue join interrupted')
      return False
    # Stop waiting as soon as any worker is found dead.
    if any(not worker.isAlive() for worker in thread_gate.Threads()):
      return False
|
2053 |
|
2054 |
|
def ShutdownThreads(data_source_thread, work_queue, thread_gate):
  """Tell the data source and worker threads to stop, then drain work_queue.

  Args:
    data_source_thread: A running DataSourceThread instance.
    work_queue: The work queue.
    thread_gate: A ThreadGate instance with workers registered.
  """
  logging.info('An error occurred. Shutting down...')

  data_source_thread.exit_flag = True

  for worker in thread_gate.Threads():
    worker.exit_flag = True

  # Open the gate once per worker, presumably so that no worker stays
  # blocked waiting on the gate and can observe its exit_flag.
  for _ in thread_gate.Threads():
    thread_gate.EnableThread()

  data_source_thread.join(timeout=3.0)
  if data_source_thread.isAlive():
    logging.warning('%s hung while trying to exit',
                    data_source_thread.GetFriendlyName())

  # Throw away whatever is still queued, marking each item done so that a
  # pending join() on the queue can complete.
  while not work_queue.empty():
    try:
      work_queue.get_nowait()
    except Queue.Empty:
      pass
    else:
      work_queue.task_done()
|
2084 |
|
2085 |
|
def PerformBulkUpload(app_id,
                      post_url,
                      kind,
                      workitem_generator_factory,
                      num_threads,
                      throttle,
                      progress_db,
                      max_queue_size=DEFAULT_QUEUE_SIZE,
                      request_manager_factory=RequestManager,
                      bulkloaderthread_factory=BulkLoaderThread,
                      progresstrackerthread_factory=ProgressTrackerThread,
                      datasourcethread_factory=DataSourceThread,
                      work_queue_factory=ReQueue,
                      progress_queue_factory=Queue.Queue):
  """Uploads data into an application using a series of HTTP POSTs.

  This function will spin up a number of threads to read entities from
  the data source, pass those to a number of worker ("uploader") threads
  for sending to the application, and track all of the progress in a
  small database in case an error or pause/termination requires a
  restart/resumption of the upload process.

  While the upload runs, SIGINT (Ctrl-C) requests a graceful shutdown
  instead of raising KeyboardInterrupt.  The previously installed SIGINT
  handler is restored before this function returns or raises.  Because it
  installs a signal handler, this must be called from the main thread.

  Args:
    app_id: String containing application id.
    post_url: URL to post the Entity data to.
    kind: Kind of the Entity records being posted.
    workitem_generator_factory: A factory that creates a WorkItem generator.
    num_threads: How many uploader threads should be created.
    throttle: A Throttle instance.
    progress_db: The database to use for replaying/recording progress.
    max_queue_size: Maximum size of the queues before they should block.
    request_manager_factory: Used for dependency injection.
    bulkloaderthread_factory: Used for dependency injection.
    progresstrackerthread_factory: Used for dependency injection.
    datasourcethread_factory: Used for dependency injection.
    work_queue_factory: Used for dependency injection.
    progress_queue_factory: Used for dependency injection.

  Raises:
    AuthenticationError: If authentication is required and fails.
    The CheckError() calls on the worker, progress and data source threads
    may also raise (presumably re-raising an error recorded by that thread).
  """
  thread_gate = ThreadGate(True)

  (unused_scheme,
   host_port, url_path,
   unused_query, unused_fragment) = urlparse.urlsplit(post_url)

  work_queue = work_queue_factory(max_queue_size)
  progress_queue = progress_queue_factory(max_queue_size)
  request_manager = request_manager_factory(app_id,
                                            host_port,
                                            url_path,
                                            kind,
                                            throttle)

  throttle.Register(threading.currentThread())
  try:
    request_manager.Authenticate()
  except Exception as e:
    logging.exception(e)
    raise AuthenticationError('Authentication failed')
  if (request_manager.credentials is not None and
      not request_manager.authenticated):
    raise AuthenticationError('Authentication failed')

  for unused_idx in range(num_threads):
    thread = bulkloaderthread_factory(work_queue,
                                      throttle,
                                      thread_gate,
                                      request_manager)
    throttle.Register(thread)
    thread_gate.Register(thread)

  progress_thread = progresstrackerthread_factory(progress_queue, progress_db)

  if progress_db.HasUnfinishedWork():
    logging.debug('Restarting upload using progress database')
    progress_generator_factory = progress_db.GetProgressStatusGenerator
  else:
    progress_generator_factory = None

  data_source_thread = datasourcethread_factory(work_queue,
                                                progress_queue,
                                                workitem_generator_factory,
                                                progress_generator_factory)

  thread_local = threading.local()
  thread_local.shut_down = False

  def Interrupt(unused_signum, unused_frame):
    """Shutdown gracefully in response to a signal."""
    thread_local.shut_down = True

  # signal.signal returns the handler being replaced (None if it was not
  # installed from Python), so it can be put back once the upload is done.
  previous_sigint_handler = signal.signal(signal.SIGINT, Interrupt)
  try:
    progress_thread.start()
    data_source_thread.start()
    for thread in thread_gate.Threads():
      thread.start()

    # Wait for the data source to finish.  While it is still running, any
    # dead worker or progress thread triggers a shutdown.
    while not thread_local.shut_down:
      data_source_thread.join(timeout=0.25)

      if data_source_thread.isAlive():
        for thread in list(thread_gate.Threads()) + [progress_thread]:
          if not thread.isAlive():
            logging.info('Unexpected thread death: %s', thread.getName())
            thread_local.shut_down = True
            break
      else:
        break

    if thread_local.shut_down:
      ShutdownThreads(data_source_thread, work_queue, thread_gate)

    def _Join(ob, msg):
      """Wait for a thread or queue, logging progress at debug level."""
      logging.debug('Waiting for %s...', msg)
      if isinstance(ob, threading.Thread):
        ob.join(timeout=3.0)
        if ob.isAlive():
          logging.debug('Joining %s failed', ob.GetFriendlyName())
        else:
          logging.debug('... done.')
      elif isinstance(ob, (Queue.Queue, ReQueue)):
        # An interrupted or abandoned queue join triggers a shutdown.
        if not InterruptibleQueueJoin(ob, thread_local, thread_gate):
          ShutdownThreads(data_source_thread, work_queue, thread_gate)
      else:
        ob.join()
        logging.debug('... done.')

    _Join(work_queue, 'work_queue to flush')

    # One exit sentinel per worker, then open the gate once per worker.
    for unused_thread in thread_gate.Threads():
      work_queue.put(_THREAD_SHOULD_EXIT)

    for unused_thread in thread_gate.Threads():
      thread_gate.EnableThread()

    for thread in thread_gate.Threads():
      _Join(thread, 'thread [%s] to terminate' % thread.getName())

      thread.CheckError()

    if progress_thread.isAlive():
      _Join(progress_queue, 'progress_queue to finish')
    else:
      logging.warn('Progress thread exited prematurely')

    progress_queue.put(_THREAD_SHOULD_EXIT)
    _Join(progress_thread, 'progress_thread to terminate')
    progress_thread.CheckError()

    data_source_thread.CheckError()

    total_up, duration = throttle.TotalTransferred(BANDWIDTH_UP)
    s_total_up, unused_duration = throttle.TotalTransferred(HTTPS_BANDWIDTH_UP)
    total_up += s_total_up
    logging.info('%d entites read, %d previously transferred',
                 data_source_thread.read_count,
                 data_source_thread.sent_count)
    logging.info('%d entities (%d bytes) transferred in %.1f seconds',
                 progress_thread.entities_sent, total_up, duration)
    if (data_source_thread.read_all and
        progress_thread.entities_sent + data_source_thread.sent_count >=
        data_source_thread.read_count):
      logging.info('All entities successfully uploaded')
    else:
      logging.info('Some entities not successfully uploaded')
  finally:
    # Restore the caller's SIGINT behaviour (e.g. KeyboardInterrupt).
    if previous_sigint_handler is not None:
      signal.signal(signal.SIGINT, previous_sigint_handler)
|
2255 |
|
2256 |
|
def PrintUsageExit(code):
  """Show the module's usage text on stdout, then terminate the process.

  Args:
    code: Exit status handed to sys.exit() once the usage text is shown.
  """
  usage = __doc__ % {'arg0': sys.argv[0]}
  print(usage)
  for stream in (sys.stdout, sys.stderr):
    stream.flush()
  sys.exit(code)
|
2267 |
|
2268 |
|
def ParseArguments(argv):
  """Parses command-line arguments.

  Prints out a help message and exits if -h or --help is supplied, or if
  argv contains an unrecognized option or an option missing its value.

  Args:
    argv: List of command-line arguments; argv[0] (the program name) is
      skipped.

  Returns:
    The tuple produced by ProcessArguments: (app_id, url, filename,
    batch_size, kind, num_threads, bandwidth_limit, rps_limit, http_limit,
    db_filename, config_file, auth_domain).

  Raises:
    ValueError: if a numeric option's value is not an integer.
  """
  try:
    opts, unused_args = getopt.getopt(
        argv[1:],
        'h',
        ['debug',
         'help',
         'url=',
         'filename=',
         'batch_size=',
         'kind=',
         'num_threads=',
         'bandwidth_limit=',
         'rps_limit=',
         'http_limit=',
         'db_filename=',
         'app_id=',
         'config_file=',
         'auth_domain=',
        ])
  except getopt.GetoptError as err:
    # Report the bad flag and show usage instead of a raw traceback.
    sys.stderr.write('%s\n' % err)
    PrintUsageExit(1)

  url = None
  filename = None
  batch_size = DEFAULT_BATCH_SIZE
  kind = None
  num_threads = DEFAULT_THREAD_COUNT
  bandwidth_limit = DEFAULT_BANDWIDTH_LIMIT
  rps_limit = DEFAULT_RPS_LIMIT
  http_limit = DEFAULT_REQUEST_LIMIT
  db_filename = None
  app_id = None
  config_file = None
  auth_domain = 'gmail.com'

  for option, value in opts:
    if option == '--debug':
      logging.getLogger().setLevel(logging.DEBUG)
    elif option in ('-h', '--help'):
      PrintUsageExit(0)
    elif option == '--url':
      url = value
    elif option == '--filename':
      filename = value
    elif option == '--batch_size':
      batch_size = int(value)
    elif option == '--kind':
      kind = value
    elif option == '--num_threads':
      num_threads = int(value)
    elif option == '--bandwidth_limit':
      bandwidth_limit = int(value)
    elif option == '--rps_limit':
      rps_limit = int(value)
    elif option == '--http_limit':
      http_limit = int(value)
    elif option == '--db_filename':
      db_filename = value
    elif option == '--app_id':
      app_id = value
    elif option == '--config_file':
      config_file = value
    elif option == '--auth_domain':
      auth_domain = value

  return ProcessArguments(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain,
                          die_fn=lambda: PrintUsageExit(1))
|
2356 |
|
2357 |
|
def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
  """Map every throttle name used by the bulkloader to its limit.

  The HTTPS entries receive one fifth of the corresponding plain limits,
  computed with '/', so integer limits truncate under Python 2.

  Args:
    bandwidth_limit: Bytes-per-second limit applied to uploads and downloads.
    http_limit: Requests-per-second limit.
    rps_limit: Records-per-second limit.

  Returns:
    A dict keyed by throttle name, as passed to Throttle(layout=...).
  """
  https_divisor = 5
  https_bandwidth = bandwidth_limit / https_divisor
  https_requests = http_limit / https_divisor
  return dict([
      (BANDWIDTH_UP, bandwidth_limit),
      (BANDWIDTH_DOWN, bandwidth_limit),
      (REQUESTS, http_limit),
      (HTTPS_BANDWIDTH_UP, https_bandwidth),
      (HTTPS_BANDWIDTH_DOWN, https_bandwidth),
      (HTTPS_REQUESTS, https_requests),
      (RECORDS, rps_limit),
  ])
|
2368 |
|
2369 |
|
def LoadConfig(config_file):
  """Execute a config file and register one instance per Loader subclass.

  Args:
    config_file: Path to a Python file defining Loader subclasses; a falsy
      value means there is nothing to load.
  """
  if not config_file:
    return
  # NOTE: the config file is executed as arbitrary Python code, so it must
  # come from a trusted source.  It runs against a copy of this module's
  # globals, so it can use Loader etc. without adding names to this module.
  execfile(config_file, dict(globals()))
  # Only direct subclasses are registered, each built with no arguments.
  for loader_class in Loader.__subclasses__():
    Loader.RegisterLoader(loader_class())
|
2377 |
|
2378 |
|
2379 def _MissingArgument(arg_name, die_fn): |
|
2380 """Print error message about missing argument and die.""" |
|
2381 print >>sys.stderr, '%s argument required' % arg_name |
|
2382 die_fn() |
|
2383 |
|
2384 |
|
def ProcessArguments(app_id=None,
                     url=None,
                     filename=None,
                     batch_size=DEFAULT_BATCH_SIZE,
                     kind=None,
                     num_threads=DEFAULT_THREAD_COUNT,
                     bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
                     rps_limit=DEFAULT_RPS_LIMIT,
                     http_limit=DEFAULT_REQUEST_LIMIT,
                     db_filename=None,
                     config_file=None,
                     auth_domain='gmail.com',
                     die_fn=lambda: sys.exit(1)):
  """Processes non command-line input arguments.

  Validates the options, writing a message to stderr and calling die_fn on
  any error, and fills in values that can be derived.

  Args:
    app_id: Application id; derived from url for *.appspot.com and
      *google.com hosts when None.
    url: The url of the remote_api endpoint (required).
    filename: The CSV file to load (required).
    batch_size: Records per request; must be at least 1.
    kind: The entity kind to load (required).
    num_threads: Number of uploader threads; must be at least 1.
    bandwidth_limit: Maximum bytes/second to transfer.
    rps_limit: Maximum records/second to transfer.
    http_limit: Maximum requests/second.
    db_filename: Progress database file; defaults to a timestamped name.
    config_file: The configuration file (required).
    auth_domain: The auth domain to use for logins and UserProperty.
    die_fn: Called with no arguments after an error has been reported; the
      default exits with status 1.

  Returns:
    Tuple (app_id, url, filename, batch_size, kind, num_threads,
    bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
    auth_domain).
  """
  if db_filename is None:
    db_filename = time.strftime('bulkloader-progress-%Y%m%d.%H%M%S.sql3')

  if batch_size <= 0:
    sys.stderr.write('batch_size must be 1 or larger\n')
    die_fn()

  if num_threads <= 0:
    # With no uploader threads nothing consumes the work queue, and the
    # queue join in PerformBulkUpload could only end on an interrupt.
    sys.stderr.write('num_threads must be 1 or larger\n')
    die_fn()

  if url is None:
    _MissingArgument('url', die_fn)

  if filename is None:
    _MissingArgument('filename', die_fn)

  if kind is None:
    _MissingArgument('kind', die_fn)

  if config_file is None:
    _MissingArgument('config_file', die_fn)

  if app_id is None:
    (unused_scheme, host_port, unused_url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(url)
    suffix_idx = host_port.find('.appspot.com')
    if suffix_idx > -1:
      # e.g. "myapp.appspot.com[:port]" -> "myapp"
      app_id = host_port[:suffix_idx]
    elif host_port.split(':')[0].endswith('google.com'):
      # Use the first dotted label of the host as the app id.
      app_id = host_port.split('.')[0]
    else:
      sys.stderr.write('app_id required for non appspot.com domains\n')
      die_fn()

  return (app_id, url, filename, batch_size, kind, num_threads,
          bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
          auth_domain)
|
2433 |
|
2434 |
|
def _PerformBulkload(app_id=None,
                     url=None,
                     filename=None,
                     batch_size=DEFAULT_BATCH_SIZE,
                     kind=None,
                     num_threads=DEFAULT_THREAD_COUNT,
                     bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
                     rps_limit=DEFAULT_RPS_LIMIT,
                     http_limit=DEFAULT_REQUEST_LIMIT,
                     db_filename=None,
                     config_file=None,
                     auth_domain='gmail.com'):
  """Execute a bulk load from already-validated options.

  Args:
    app_id: The application id.
    url: The url of the remote_api endpoint.
    filename: The name of the file containing the CSV data.
    batch_size: The number of records to send per request.
    kind: The kind of entity to transfer.
    num_threads: The number of threads to use to transfer data.
    bandwidth_limit: Maximum bytes/second to transfers.
    rps_limit: Maximum records/second to transfer.
    http_limit: Maximum requests/second for transfers.
    db_filename: The name of the SQLite3 progress database file, or 'skip'
      to disable progress tracking.
    config_file: The name of the configuration file.
    auth_domain: The auth domain to use for logins and UserProperty.

  Returns:
    An exit code: always 0 once the upload returns; failures are not
    reported through the return value.
  """
  os.environ['AUTH_DOMAIN'] = auth_domain
  LoadConfig(config_file)

  layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
  throttle = Throttle(layout=layout)

  workitem_generator_factory = GetCSVGeneratorFactory(filename, batch_size)

  # The special name 'skip' turns progress tracking off.
  if db_filename != 'skip':
    progress_db = ProgressDatabase(db_filename)
  else:
    progress_db = StubProgressDatabase()

  # Scale the queue bound with the thread count: two slots per thread plus
  # slack, but never below the default size.
  max_queue_size = max(DEFAULT_QUEUE_SIZE, 2 * num_threads + 5)

  PerformBulkUpload(app_id, url, kind, workitem_generator_factory,
                    num_threads, throttle, progress_db,
                    max_queue_size=max_queue_size)

  return 0
|
2494 |
|
2495 |
|
def Run(app_id=None,
        url=None,
        filename=None,
        batch_size=DEFAULT_BATCH_SIZE,
        kind=None,
        num_threads=DEFAULT_THREAD_COUNT,
        bandwidth_limit=DEFAULT_BANDWIDTH_LIMIT,
        rps_limit=DEFAULT_RPS_LIMIT,
        http_limit=DEFAULT_REQUEST_LIMIT,
        db_filename=None,
        auth_domain='gmail.com',
        config_file=None):
  """Sets up and runs the bulkloader, given the options as keyword arguments.

  Args:
    app_id: The application id.
    url: The url of the remote_api endpoint.
    filename: The name of the file containing the CSV data.
    batch_size: The number of records to send per request.
    kind: The kind of entity to transfer.
    num_threads: The number of threads to use to transfer data.
    bandwidth_limit: Maximum bytes/second to transfers.
    rps_limit: Maximum records/second to transfer.
    http_limit: Maximum requests/second for transfers.
    db_filename: The name of the SQLite3 progress database file.
    auth_domain: The auth domain to use for logins and UserProperty.
    config_file: The name of the configuration file.

  Returns:
    An exit code.
  """
  logging.basicConfig(
      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')
  # auth_domain must be forwarded: its value is read back from the returned
  # tuple, so omitting it would replace the caller's domain with the default.
  args = ProcessArguments(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)

  (app_id, url, filename, batch_size, kind, num_threads, bandwidth_limit,
   rps_limit, http_limit, db_filename, config_file, auth_domain) = args

  return _PerformBulkload(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)
|
2556 |
|
2557 |
|
def main(argv):
  """Command-line entry point: parse argv and run the bulk load.

  Args:
    argv: The full command line, including the program name at argv[0].

  Returns:
    The exit code produced by _PerformBulkload.
  """
  logging.basicConfig(
      level=logging.INFO,
      format='%(levelname)-8s %(asctime)s %(filename)s] %(message)s')

  parsed = ParseArguments(argv)
  if None in parsed:
    sys.stderr.write('Invalid arguments' + '\n')
    PrintUsageExit(1)

  (app_id, url, filename, batch_size, kind, num_threads,
   bandwidth_limit, rps_limit, http_limit, db_filename, config_file,
   auth_domain) = parsed

  return _PerformBulkload(app_id=app_id,
                          url=url,
                          filename=filename,
                          batch_size=batch_size,
                          kind=kind,
                          num_threads=num_threads,
                          bandwidth_limit=bandwidth_limit,
                          rps_limit=rps_limit,
                          http_limit=http_limit,
                          db_filename=db_filename,
                          config_file=config_file,
                          auth_domain=auth_domain)
|
2585 |
|
2586 |
|
if __name__ == '__main__':
  # Run as a script: the exit code returned by main() becomes the status.
  exit_status = main(sys.argv)
  sys.exit(exit_status)