thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 2309 be1b94099f2d
parent 2273 e4cb9c53db3e
child 2413 d0b7dac5325c
--- a/thirdparty/google_appengine/google/appengine/tools/bulkloader.py	Tue May 12 13:02:10 2009 +0200
+++ b/thirdparty/google_appengine/google/appengine/tools/bulkloader.py	Tue May 12 15:39:52 2009 +0200
@@ -1702,8 +1702,11 @@
       duration: The duration of the transfer in seconds.
     """
     if duration > self.threshhold2:
-      self.DecreaseWorkers()
+      logger.debug('Transfer took %s, decreasing workers.', duration)
+      self.DecreaseWorkers(backoff=False)
+      return
     elif duration > self.threshhold1:
+      logger.debug('Transfer took %s, not increasing workers.', duration)
       return
     elif self.enabled:
       if self.backoff_time > 0.0:
@@ -1722,13 +1725,17 @@
                      self.enabled_count)
         self.thread_semaphore.release()
 
-  def DecreaseWorkers(self):
+  def DecreaseWorkers(self, backoff=True):
     """Informs the thread_gate that an item failed to send.
 
     If thread throttling is enabled, this method will cause the
     throttler to allow one fewer thread in the critical section. If
     there is only one thread remaining, failures will result in
     exponential backoff until there is a success.
+
+    Args:
+      backoff: Whether to increase exponential backoff if there is only
+        one thread enabled.
     """
     if self.enabled:
       do_disable = False
@@ -1738,7 +1745,7 @@
           if self.enabled_count > 1:
             do_disable = True
             self.enabled_count -= 1
-          else:
+          elif backoff:
             if self.backoff_time == 0.0:
               self.backoff_time = INITIAL_BACKOFF
             else:
@@ -2138,8 +2145,8 @@
               status = 200
               transferred = True
               transfer_time = self.get_time() - t
-              logger.debug('[%s] %s Transferred %d entities', self.getName(),
-                           item, item.count)
+              logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
+                           self.getName(), item, item.count, transfer_time)
               self.throttle.AddTransfer(RECORDS, item.count)
             except (db.InternalError, db.NotSavedError, db.Timeout,
                     apiproxy_errors.OverQuotaError,
@@ -2169,8 +2176,8 @@
         finally:
           if transferred:
             item.MarkAsTransferred()
+            self.work_queue.task_done()
             self.thread_gate.TransferSuccess(transfer_time)
-            self.work_queue.task_done()
           else:
             item.MarkAsError()
             try:
@@ -2314,6 +2321,7 @@
     if export_result:
       item.Process(export_result, self.num_threads, self.batch_size,
                    self.work_queue)
+    item.state = STATE_GOT
 
 
 class DataSourceThread(_ThreadBase):