2015-04-07 11 views
8

Sto cercando di utilizzare il pacchetto di multiprocessing Python in questo modo:Come posso interrompere un'attività in un multiprocessing.Pool dopo un timeout?

featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments 
for f in featureClass: 
    pool .apply_async(worker, args=f,callback=collectMyResult) 
pool.close() 
pool.join 

dai processi del pool voglio evitare di attesa quelle che prendono più di 60 anni per restituire il suo risultato. È possibile?

+0

Che aspetto ha "lavoratore"? Il modo più semplice per farlo con un 'multiprocessing.Pool' è quello di rendere' worker' interrompibile, ma ciò potrebbe non essere possibile, a seconda di cosa sta facendo. – dano

+0

worker è una semplice funzione con una lista di input e una lista di output – farhawa

+0

Cosa sta facendo in realtà, però? Presumo che stia iterando sulla lista, ma che tipo di operazioni sta facendo su ciascun articolo? Quanto dura ogni operazione? – dano

risposta

10

Ecco un modo per farlo senza la necessità di modificare la funzione worker. L'idea è di avvolgere l'operatore in un'altra funzione, che chiamerà worker in un thread in background, quindi attendere un risultato per timeout secondi. Se il timeout scade, solleva un'eccezione, che bruscamente interrompere il filo worker viene eseguito in:

import multiprocessing 
from multiprocessing.dummy import Pool as ThreadPool 
from functools import partial 

def worker(x, y, z): 
    pass # Do whatever here 

def collectMyResult(result): 
    print("Got result {}".format(result)) 

def abortable_worker(func, *args, **kwargs): 
    timeout = kwargs.get('timeout', None) 
    p = ThreadPool(1) 
    res = p.apply_async(func, args=args) 
    try: 
     out = res.get(timeout) # Wait timeout seconds for func to complete. 
     return out 
    except multiprocessing.TimeoutError: 
     print("Aborting due to timeout") 
     p.terminate() 
     raise 

if __name__ == "__main__": 
    pool = multiprocessing.Pool() 
    featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments 
    for f in featureClass: 
     abortable_func = partial(abortable_worker, worker, timeout=3) 
     pool.apply_async(abortable_func, args=f,callback=collectMyResult) 
    pool.close() 
    pool.join() 

Qualsiasi funzione che timeout aumenterà multiprocessing.TimeoutError. Nota che ciò significa che la tua callback non verrà eseguita quando si verifica un timeout. Se questo non è accettabile, basta cambiare il blocco except di abortable_worker per restituire qualcosa invece di chiamare raise.

+0

Questo è esattamente ciò di cui ho bisogno! grazie – farhawa

+0

Solo un'altra domanda, qual è il numero di processi paralleli nel tuo codice? – farhawa

+0

@wajdi Utilizza il numero predefinito di processi, che è sempre uguale al numero di CPU sulla macchina che esegue lo script. Se si desidera specificare il numero, passarlo al costruttore 'multiprocessing.Pool':' pool = multiprocessing.Pool (4) '. – dano

0

possiamo usare gevent.Timeout per impostare il tempo di esecuzione del worker. gevent tutorial

from multiprocessing.dummy import Pool 
#you should install gevent. 
from gevent import Timeout 
from gevent import monkey 
monkey.patch_all() 
import time 

def worker(sleep_time): 
    try: 

     seconds = 5 # max time the worker may run 
     timeout = Timeout(seconds) 
     timeout.start() 
     time.sleep(sleep_time) 
     print "%s is a early bird"%sleep_time 
    except: 
     print "%s is late(time out)"%sleep_time 

pool = Pool(4) 

pool.map(worker, range(10)) 


output: 
0 is a early bird 
1 is a early bird 
2 is a early bird 
3 is a early bird 
4 is a early bird 
8 is late(time out) 
5 is late(time out) 
6 is late(time out) 
7 is late(time out) 
9 is late(time out) 
+1

Si prega di fornire alcuni commenti testuali. –

+1

perché è necessario il patch per le scimmie? questo è solo qualcosa che gevent deve fare per rendere le cose non bloccanti? – grisaitis

+0

Questo non ha funzionato per me. La patch delle scimmie ha generato degli errori – Kat