--- a/thirdparty/google_appengine/google/appengine/tools/bulkloader.py Tue May 12 13:02:10 2009 +0200
+++ b/thirdparty/google_appengine/google/appengine/tools/bulkloader.py Tue May 12 15:39:52 2009 +0200
@@ -1702,8 +1702,11 @@
duration: The duration of the transfer in seconds.
"""
if duration > self.threshhold2:
- self.DecreaseWorkers()
+ logger.debug('Transfer took %s, decreasing workers.', duration)
+ self.DecreaseWorkers(backoff=False)
+ return
elif duration > self.threshhold1:
+ logger.debug('Transfer took %s, not increasing workers.', duration)
return
elif self.enabled:
if self.backoff_time > 0.0:
@@ -1722,13 +1725,17 @@
self.enabled_count)
self.thread_semaphore.release()
- def DecreaseWorkers(self):
+ def DecreaseWorkers(self, backoff=True):
"""Informs the thread_gate that an item failed to send.
If thread throttling is enabled, this method will cause the
throttler to allow one fewer thread in the critical section. If
there is only one thread remaining, failures will result in
exponential backoff until there is a success.
+
+ Args:
+ backoff: Whether to increase exponential backoff if there is only
+ one thread enabled.
"""
if self.enabled:
do_disable = False
@@ -1738,7 +1745,7 @@
if self.enabled_count > 1:
do_disable = True
self.enabled_count -= 1
- else:
+ elif backoff:
if self.backoff_time == 0.0:
self.backoff_time = INITIAL_BACKOFF
else:
@@ -2138,8 +2145,8 @@
status = 200
transferred = True
transfer_time = self.get_time() - t
- logger.debug('[%s] %s Transferred %d entities', self.getName(),
- item, item.count)
+ logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
+ self.getName(), item, item.count, transfer_time)
self.throttle.AddTransfer(RECORDS, item.count)
except (db.InternalError, db.NotSavedError, db.Timeout,
apiproxy_errors.OverQuotaError,
@@ -2169,8 +2176,8 @@
finally:
if transferred:
item.MarkAsTransferred()
+ self.work_queue.task_done()
self.thread_gate.TransferSuccess(transfer_time)
- self.work_queue.task_done()
else:
item.MarkAsError()
try:
@@ -2314,6 +2321,7 @@
if export_result:
item.Process(export_result, self.num_threads, self.batch_size,
self.work_queue)
+ item.state = STATE_GOT
class DataSourceThread(_ThreadBase):