1700 |
1700 |
1701 Args: |
1701 Args: |
1702 duration: The duration of the transfer in seconds. |
1702 duration: The duration of the transfer in seconds. |
1703 """ |
1703 """ |
1704 if duration > self.threshhold2: |
1704 if duration > self.threshhold2: |
1705 self.DecreaseWorkers() |
1705 logger.debug('Transfer took %s, decreasing workers.', duration) |
|
1706 self.DecreaseWorkers(backoff=False) |
|
1707 return |
1706 elif duration > self.threshhold1: |
1708 elif duration > self.threshhold1: |
|
1709 logger.debug('Transfer took %s, not increasing workers.', duration) |
1707 return |
1710 return |
1708 elif self.enabled: |
1711 elif self.enabled: |
1709 if self.backoff_time > 0.0: |
1712 if self.backoff_time > 0.0: |
1710 logger.info('Resetting backoff to 0.0') |
1713 logger.info('Resetting backoff to 0.0') |
1711 self.backoff_time = 0.0 |
1714 self.backoff_time = 0.0 |
1720 if do_enable: |
1723 if do_enable: |
1721 logger.debug('Increasing active thread count to %d', |
1724 logger.debug('Increasing active thread count to %d', |
1722 self.enabled_count) |
1725 self.enabled_count) |
1723 self.thread_semaphore.release() |
1726 self.thread_semaphore.release() |
1724 |
1727 |
1725 def DecreaseWorkers(self): |
1728 def DecreaseWorkers(self, backoff=True): |
1726 """Informs the thread_gate that an item failed to send. |
1729 """Informs the thread_gate that an item failed to send. |
1727 |
1730 |
1728 If thread throttling is enabled, this method will cause the |
1731 If thread throttling is enabled, this method will cause the |
1729 throttler to allow one fewer thread in the critical section. If |
1732 throttler to allow one fewer thread in the critical section. If |
1730 there is only one thread remaining, failures will result in |
1733 there is only one thread remaining, failures will result in |
1731 exponential backoff until there is a success. |
1734 exponential backoff until there is a success. |
|
1735 |
|
1736 Args: |
|
1737 backoff: Whether to increase exponential backoff if there is only |
|
1738 one thread enabled. |
1732 """ |
1739 """ |
1733 if self.enabled: |
1740 if self.enabled: |
1734 do_disable = False |
1741 do_disable = False |
1735 self.lock.acquire() |
1742 self.lock.acquire() |
1736 try: |
1743 try: |
1737 if self.enabled: |
1744 if self.enabled: |
1738 if self.enabled_count > 1: |
1745 if self.enabled_count > 1: |
1739 do_disable = True |
1746 do_disable = True |
1740 self.enabled_count -= 1 |
1747 self.enabled_count -= 1 |
1741 else: |
1748 elif backoff: |
1742 if self.backoff_time == 0.0: |
1749 if self.backoff_time == 0.0: |
1743 self.backoff_time = INITIAL_BACKOFF |
1750 self.backoff_time = INITIAL_BACKOFF |
1744 else: |
1751 else: |
1745 self.backoff_time *= BACKOFF_FACTOR |
1752 self.backoff_time *= BACKOFF_FACTOR |
1746 finally: |
1753 finally: |
2136 t = self.get_time() |
2143 t = self.get_time() |
2137 response = self.TransferItem(item) |
2144 response = self.TransferItem(item) |
2138 status = 200 |
2145 status = 200 |
2139 transferred = True |
2146 transferred = True |
2140 transfer_time = self.get_time() - t |
2147 transfer_time = self.get_time() - t |
2141 logger.debug('[%s] %s Transferred %d entities', self.getName(), |
2148 logger.debug('[%s] %s Transferred %d entities in %0.1f seconds', |
2142 item, item.count) |
2149 self.getName(), item, item.count, transfer_time) |
2143 self.throttle.AddTransfer(RECORDS, item.count) |
2150 self.throttle.AddTransfer(RECORDS, item.count) |
2144 except (db.InternalError, db.NotSavedError, db.Timeout, |
2151 except (db.InternalError, db.NotSavedError, db.Timeout, |
2145 apiproxy_errors.OverQuotaError, |
2152 apiproxy_errors.OverQuotaError, |
2146 apiproxy_errors.DeadlineExceededError), e: |
2153 apiproxy_errors.DeadlineExceededError), e: |
2147 logger.exception('Caught non-fatal datastore error: %s', e) |
2154 logger.exception('Caught non-fatal datastore error: %s', e) |
2312 def ProcessResponse(self, item, export_result): |
2319 def ProcessResponse(self, item, export_result): |
2313 """Processes the response from the server application.""" |
2320 """Processes the response from the server application.""" |
2314 if export_result: |
2321 if export_result: |
2315 item.Process(export_result, self.num_threads, self.batch_size, |
2322 item.Process(export_result, self.num_threads, self.batch_size, |
2316 self.work_queue) |
2323 self.work_queue) |
|
2324 item.state = STATE_GOT |
2317 |
2325 |
2318 |
2326 |
2319 class DataSourceThread(_ThreadBase): |
2327 class DataSourceThread(_ThreadBase): |
2320 """A thread which reads WorkItems and pushes them into queue. |
2328 """A thread which reads WorkItems and pushes them into queue. |
2321 |
2329 |