2014-09-26 21 views
6

Voglio interrompere tutti i thread da un singolo operatore.Come uccidere tutti i pool worker in multiprocesso?

Ho un pool di thread con 10 lavoratori:

def myfunction(i): 
    print(i) 
    if (i == 20): 
     sys.exit() 

p = multiprocessing.Pool(10, init_worker) 

for i in range(100): 
    p.apply_async(myfunction, (i,)) 

Il mio programma non si ferma e gli altri processi continuare a lavorare fino a quando tutti i 100 iterazioni sono completi. Voglio interrompere completamente il pool dall'interno del thread che chiama sys.exit(). Il modo in cui è attualmente scritto fermerà solo il lavoratore che chiama sys.exit().

risposta

11

Questo non funziona come previsto perché chiamare in un processo di lavoro interromperà solo il lavoratore. Non ha alcun effetto sul processo padre o sugli altri lavoratori, perché sono processi separati e l'aumento di SystemExit riguarda solo il processo corrente. È necessario inviare un segnale indietro al processo padre per dirgli che dovrebbe spegnersi. Un modo per fare questo per il vostro caso d'uso potrebbe essere quella di utilizzare un Event creata in un server multiprocessing.Manager:

import multiprocessing 

def myfunction(i, event): 
    if not event.is_set(): 
     print i 
    if i == 20: 
     event.set() 

if __name__ == "__main__": 
    p= multiprocessing.Pool(10) 
    m = multiprocessing.Manager() 
    event = m.Event() 
    for i in range(100): 
     p.apply_async(myfunction , (i, event)) 
    p.close() 

    event.wait() # We'll block here until a worker calls `event.set()` 
    p.terminate() # Terminate all processes in the Pool 

uscita:

0 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 

Come sottolineato nella risposta di Luca, c'è una gara qui : Non è garantito che tutti i lavoratori funzionino in ordine, quindi è possibile che myfunction(20, ..) venga eseguito prima di myfuntion(19, ..), ad esempio. È anche possibile che altri lavoratori dopo 20 vengano eseguiti prima che il processo principale possa agire sull'evento impostato. Ho ridotto la dimensione della finestra della gara aggiungendo la chiamata if not event.is_set(): prima di stampare i, ma esiste ancora.

+1

grazie mille +1 – N3TC4t

+0

cos'è il pool di variabili nel codice precedente? –

+0

@RajanChaudan Un errore di battitura! L'ho riparato ora. – dano

1

Non è possibile farlo.

Anche se si è in grado di terminare tutti i processi quando i == 20, non si è certi che siano stati stampati solo 20 numeri poiché i processi verranno eseguiti in un ordine non deterministico.

Se si desidera eseguire solo 20 processi, è necessario gestirlo dal processo principale (ad esempio il ciclo di controllo).