2014-07-16 13 views
23

Sto cercando di capire un po 'cosa sta succedendo dietro le quinte quando si utilizza il metodo apply_sync di un pool di multiprocessing.Chi esegue la richiamata quando si utilizza il metodo apply_async di un lotto di multiprocessing?

Chi esegue il metodo di richiamata? È il processo principale che ha chiamato apply_async?

Diciamo che invio un intero gruppo di comandi apply_async con i callback e quindi proseguo con il mio programma. Il mio programma sta ancora facendo le cose quando termina l'inizio di apply_async. Come viene eseguito il callback come "processo principale" mentre il processo principale è ancora occupato con lo script?

Ecco un esempio.

import multiprocessing 
import time 

def callback(x): 
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x) 

def func(x): 
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x) 
    return x 

pool = multiprocessing.Pool() 

args = range(20) 

for a in args: 
    pool.apply_async(func, (a,), callback=callback) 

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name) 

t0 = time.time() 
while time.time() - t0 < 60: 
    pass 

print 'Finished with the script' 

L'output è simile

PoolWorker-1 esegue func con arg 0

PoolWorker-2 esegue func con arg 1

PoolWorker-3 esecuzione func con arg 2

MainProcess andare a dormire per un minuto < - processo principale è occupato

PoolWorker-4 func esecuzione con arg 3

PoolWorker-1 esegue func con arg 4

PoolWorker-2 esegue func con arg 5

PoolWorker-3 esecuzione func con arg 6

PoolWorker-4 esecuzione func con arg 7

MainProcess esecuzione callback w con arg 0 < - processo principale che esegue la richiamata mentre è ancora nel ciclo while !!

MainProcess callback esecuzione con arg 1

MainProcess esecuzione richiamata con arg 2

MainProcess callback esecuzione con arg 3

MainProcess callback esecuzione con arg 4

PoolWorker-1 func esecuzione con arg 8

...

finito con lo script

Come viene MainProcess in esecuzione la richiamata mentre è nel bel mezzo di quel ciclo while ??

Esiste questa dichiarazione sul callback nella documentazione per multiprocessing.Pool che sembra un suggerimento ma non lo capisco.

apply_async (func [, args [, kwds [, callback]]])

Una variante del metodo apply() che restituisce un oggetto risultato.

Se la callback è specificata, dovrebbe essere un callable che accetta un singolo argomento. Quando il risultato diventa pronto, viene applicato il callback (a meno che la chiamata non abbia avuto esito positivo). la callback dovrebbe essere completata immediatamente poiché altrimenti il ​​thread che gestisce i risultati verrà bloccato.

risposta

25

V'è infatti un accenno nella documentazione:

callback dovrebbe completare immediatamente poiché altrimenti il ​​thread che gestisce i risultati otterrà bloccati.

I callback sono gestite nel processo principale, ma stanno corrono nel proprio thread separato. Quando si crea un Pool in realtà crea un paio di Thread oggetti internamente:

class Pool(object): 
    Process = Process 

    def __init__(self, processes=None, initializer=None, initargs=(), 
       maxtasksperchild=None): 
     self._setup_queues() 
     self._taskqueue = Queue.Queue() 
     self._cache = {} 
     ... # stuff we don't care about 
     self._worker_handler = threading.Thread(
      target=Pool._handle_workers, 
      args=(self,) 
      ) 
     self._worker_handler.daemon = True 
     self._worker_handler._state = RUN 
     self._worker_handler.start() 

     self._task_handler = threading.Thread(
      target=Pool._handle_tasks, 
      args=(self._taskqueue, self._quick_put, self._outqueue, 
        self._pool, self._cache) 
      ) 
     self._task_handler.daemon = True 
     self._task_handler._state = RUN 
     self._task_handler.start() 

     self._result_handler = threading.Thread(
      target=Pool._handle_results, 
      args=(self._outqueue, self._quick_get, self._cache) 
      ) 
     self._result_handler.daemon = True 
     self._result_handler._state = RUN 
     self._result_handler.start() 

Il filo interessante per noi è _result_handler; arriveremo a perché a breve.

commutazione ingranaggi per un secondo, quando si esegue apply_async, si crea un oggetto ApplyResult internamente per gestire ottenere il risultato da parte del bambino:

def apply_async(self, func, args=(), kwds={}, callback=None): 
    assert self._state == RUN 
    result = ApplyResult(self._cache, callback) 
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) 
    return result 

class ApplyResult(object): 

    def __init__(self, cache, callback): 
     self._cond = threading.Condition(threading.Lock()) 
     self._job = job_counter.next() 
     self._cache = cache 
     self._ready = False 
     self._callback = callback 
     cache[self._job] = self 


    def _set(self, i, obj): 
     self._success, self._value = obj 
     if self._callback and self._success: 
      self._callback(self._value) 
     self._cond.acquire() 
     try: 
      self._ready = True 
      self._cond.notify() 
     finally: 
      self._cond.release() 
     del self._cache[self._job] 

Come si può vedere, il metodo _set è quello che le estremità passato effettivamente l'esecuzione dello callback passato, supponendo che l'attività fosse andata a buon fine. Si noti inoltre che si aggiunge a un valore globale cache dict alla fine di __init__.

Ora, torna all'oggetto filetto _result_handler. Quell'oggetto chiama la funzione _handle_results, che assomiglia a questo:

while 1: 
     try: 
      task = get() 
     except (IOError, EOFError): 
      debug('result handler got EOFError/IOError -- exiting') 
      return 

     if thread._state: 
      assert thread._state == TERMINATE 
      debug('result handler found thread._state=TERMINATE') 
      break 

     if task is None: 
      debug('result handler got sentinel') 
      break 

     job, i, obj = task 
     try: 
      cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called! 
     except KeyError: 
      pass 

     # More stuff 

E 'un ciclo che solo tira i risultati di figli di coda, rileva che la voce per esso in cache e chiede _set, che esegue la richiamata. È in grado di funzionare anche se si è in un ciclo perché non è in esecuzione nel thread principale.

+0

Grazie Dano per aver trovato il tempo di scrivere una risposta così dettagliata! Se ho capito bene, il pool crea un * singolo * nuovo thread (il result_handler) il cui compito è quello di attendere che apply_async sia completato e quindi chiama il callback nel thread del result_handler (che fa parte del MainProcess). I callback (per un oggetto pool singolo) verranno chiamati in sequenza? Cioè Un gruppo di apply_async può finire insieme ma i callbacks verranno eseguiti uno ad uno in serie dal result_handler? – Alex

+1

Un'altra domanda. Cosa succede se la funzione di callback e lo script principale funzionano entrambi con gli stessi oggetti (nel MainProcess)? Potrebbe esserci un comportamento imprevedibile? Cioè se il callback e qualcosa più avanti nello script principale tentano entrambi di scrivere sullo stesso file o modificare lo stesso array. Quando viene effettivamente eseguito il callback, chissà cosa farà lo script principale in quel momento. – Alex

+4

@Alex Sì, i callback saranno eseguiti in modo squenziale.Il thread '_result_handler' estrae una task completata dalla coda, chiama' _set' (che esegue la callback), quindi passa a quella successiva. Questo è il motivo per cui la documentazione dice di assicurarsi che il callback sia completato immediatamente; l'esecuzione della richiamata blocca altri risultati dall'elaborazione. – dano