2014-09-24 84 views
8

In ThreadPoolExecutor (TPE), la richiamata è sempre garantita per essere eseguita nello stesso thread della funzione inoltrata?Python ThreadPoolExecutor: la richiamata è garantita per essere eseguita nello stesso thread della funzione inviata?

Ad esempio, ho provato questo con il seguente codice. L'ho eseguito più volte e mi è sembrato come func e callback eseguito sempre nella stessa discussione.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
     print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

Tuttavia, sembrava fallire quando ho rimosso i time.sleep(random.random()) dichiarazioni, cioè almeno alcune func funzioni e callbacksno fuga stesso thread.

Per un progetto su cui sto lavorando, il callback deve sempre essere eseguito sullo stesso thread della funzione inoltrata, quindi volevo essere sicuro che ciò sia garantito da TPE. (E anche i risultati del test senza il sonno casuale sembravano sconcertanti).

Ho guardato il source code for executors e non sembra che passiamo il thread al thread principale prima di eseguire il callback. Ma volevo solo essere sicuro.

risposta

6

La richiamata per un inoltrata a ThreadPoolExecutor verrà eseguita nello stesso thread dell'attività in esecuzione, ma solo se la richiamata viene aggiunta a Future prima che l'attività venga completata. Se si aggiunge la richiamata dopo le Future Completa, il callback eseguirà in qualunque discussione hai chiamato add_done_callback in Potete vedere questo osservando il add_done_callback fonte:.

def add_done_callback(self, fn): 
    """Attaches a callable that will be called when the future finishes. 

    Args: 
     fn: A callable that will be called with this future as its only 
      argument when the future completes or is cancelled. The callable 
      will always be called by a thread in the same process in which 
      it was added. If the future has already completed or been 
      cancelled then the callable will be called immediately. These 
      callables are called in the order that they were added. 
    """ 
    with self._condition: 
     if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 
      self._done_callbacks.append(fn) 
      return 
    fn(self) 

Se lo stato della Future indica che è annullato o finito, fn viene chiamato immediatamente nel thread corrente di esecuzione. In caso contrario, viene aggiunto a un elenco interno di callback da eseguire quando lo Future è completo.

Ad esempio:

>>> def func(*args): 
... time.sleep(5) 
... print("func {}".format(threading.current_thread())) 
>>> def cb(a): print("cb {}".format(threading.current_thread())) 
... 
>>> fut = ex.submit(func) 
>>> func <Thread(Thread-1, started daemon 140084551563008)> 
>>> fut = e.add_done_callback(cb) 
cb <_MainThread(MainThread, started 140084622018368)> 
+0

E che dire 'ProcessPoolExecutor'? Anche il 'get_ident' di callback è diverso – Winand