thirdparty/google_appengine/google/appengine/tools/bulkloader.py
changeset 2309 be1b94099f2d
parent 2273 e4cb9c53db3e
child 2413 d0b7dac5325c
equal deleted inserted replaced
2307:81c128f487e6 2309:be1b94099f2d
  1700 
  1700 
  1701     Args:
  1701     Args:
  1702       duration: The duration of the transfer in seconds.
  1702       duration: The duration of the transfer in seconds.
  1703     """
  1703     """
  1704     if duration > self.threshhold2:
  1704     if duration > self.threshhold2:
  1705       self.DecreaseWorkers()
  1705       logger.debug('Transfer took %s, decreasing workers.', duration)
       
  1706       self.DecreaseWorkers(backoff=False)
       
  1707       return
  1706     elif duration > self.threshhold1:
  1708     elif duration > self.threshhold1:
       
  1709       logger.debug('Transfer took %s, not increasing workers.', duration)
  1707       return
  1710       return
  1708     elif self.enabled:
  1711     elif self.enabled:
  1709       if self.backoff_time > 0.0:
  1712       if self.backoff_time > 0.0:
  1710         logger.info('Resetting backoff to 0.0')
  1713         logger.info('Resetting backoff to 0.0')
  1711         self.backoff_time = 0.0
  1714         self.backoff_time = 0.0
  1720       if do_enable:
  1723       if do_enable:
  1721         logger.debug('Increasing active thread count to %d',
  1724         logger.debug('Increasing active thread count to %d',
  1722                      self.enabled_count)
  1725                      self.enabled_count)
  1723         self.thread_semaphore.release()
  1726         self.thread_semaphore.release()
  1724 
  1727 
  1725   def DecreaseWorkers(self):
  1728   def DecreaseWorkers(self, backoff=True):
  1726     """Informs the thread_gate that an item failed to send.
  1729     """Informs the thread_gate that an item failed to send.
  1727 
  1730 
  1728     If thread throttling is enabled, this method will cause the
  1731     If thread throttling is enabled, this method will cause the
  1729     throttler to allow one fewer thread in the critical section. If
  1732     throttler to allow one fewer thread in the critical section. If
  1730     there is only one thread remaining, failures will result in
  1733     there is only one thread remaining, failures will result in
  1731     exponential backoff until there is a success.
  1734     exponential backoff until there is a success.
       
  1735 
       
  1736     Args:
       
  1737       backoff: Whether to increase exponential backoff if there is only
       
  1738         one thread enabled.
  1732     """
  1739     """
  1733     if self.enabled:
  1740     if self.enabled:
  1734       do_disable = False
  1741       do_disable = False
  1735       self.lock.acquire()
  1742       self.lock.acquire()
  1736       try:
  1743       try:
  1737         if self.enabled:
  1744         if self.enabled:
  1738           if self.enabled_count > 1:
  1745           if self.enabled_count > 1:
  1739             do_disable = True
  1746             do_disable = True
  1740             self.enabled_count -= 1
  1747             self.enabled_count -= 1
  1741           else:
  1748           elif backoff:
  1742             if self.backoff_time == 0.0:
  1749             if self.backoff_time == 0.0:
  1743               self.backoff_time = INITIAL_BACKOFF
  1750               self.backoff_time = INITIAL_BACKOFF
  1744             else:
  1751             else:
  1745               self.backoff_time *= BACKOFF_FACTOR
  1752               self.backoff_time *= BACKOFF_FACTOR
  1746       finally:
  1753       finally:
  2136               t = self.get_time()
  2143               t = self.get_time()
  2137               response = self.TransferItem(item)
  2144               response = self.TransferItem(item)
  2138               status = 200
  2145               status = 200
  2139               transferred = True
  2146               transferred = True
  2140               transfer_time = self.get_time() - t
  2147               transfer_time = self.get_time() - t
  2141               logger.debug('[%s] %s Transferred %d entities', self.getName(),
  2148               logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
  2142                            item, item.count)
  2149                            self.getName(), item, item.count, transfer_time)
  2143               self.throttle.AddTransfer(RECORDS, item.count)
  2150               self.throttle.AddTransfer(RECORDS, item.count)
  2144             except (db.InternalError, db.NotSavedError, db.Timeout,
  2151             except (db.InternalError, db.NotSavedError, db.Timeout,
  2145                     apiproxy_errors.OverQuotaError,
  2152                     apiproxy_errors.OverQuotaError,
  2146                     apiproxy_errors.DeadlineExceededError), e:
  2153                     apiproxy_errors.DeadlineExceededError), e:
  2147               logger.exception('Caught non-fatal datastore error: %s', e)
  2154               logger.exception('Caught non-fatal datastore error: %s', e)
  2167             raise
  2174             raise
  2168 
  2175 
  2169         finally:
  2176         finally:
  2170           if transferred:
  2177           if transferred:
  2171             item.MarkAsTransferred()
  2178             item.MarkAsTransferred()
       
  2179             self.work_queue.task_done()
  2172             self.thread_gate.TransferSuccess(transfer_time)
  2180             self.thread_gate.TransferSuccess(transfer_time)
  2173             self.work_queue.task_done()
       
  2174           else:
  2181           else:
  2175             item.MarkAsError()
  2182             item.MarkAsError()
  2176             try:
  2183             try:
  2177               self.work_queue.reput(item, block=False)
  2184               self.work_queue.reput(item, block=False)
  2178             except Queue.Full:
  2185             except Queue.Full:
  2312   def ProcessResponse(self, item, export_result):
  2319   def ProcessResponse(self, item, export_result):
  2313     """Processes the response from the server application."""
  2320     """Processes the response from the server application."""
  2314     if export_result:
  2321     if export_result:
  2315       item.Process(export_result, self.num_threads, self.batch_size,
  2322       item.Process(export_result, self.num_threads, self.batch_size,
  2316                    self.work_queue)
  2323                    self.work_queue)
       
  2324     item.state = STATE_GOT
  2317 
  2325 
  2318 
  2326 
  2319 class DataSourceThread(_ThreadBase):
  2327 class DataSourceThread(_ThreadBase):
  2320   """A thread which reads WorkItems and pushes them into queue.
  2328   """A thread which reads WorkItems and pushes them into queue.
  2321 
  2329