Ecco un modo per farlo senza la necessità di modificare la funzione worker
. L'idea è di avvolgere l'operatore in un'altra funzione, che chiamerà worker
in un thread in background, quindi attendere un risultato per timeout
secondi. Se il timeout scade, solleva un'eccezione, che bruscamente interrompere il filo worker
viene eseguito in:
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
def worker(x, y, z):
pass # Do whatever here
def collectMyResult(result):
print("Got result {}".format(result))
def abortable_worker(func, *args, **kwargs):
timeout = kwargs.get('timeout', None)
p = ThreadPool(1)
res = p.apply_async(func, args=args)
try:
out = res.get(timeout) # Wait timeout seconds for func to complete.
return out
except multiprocessing.TimeoutError:
print("Aborting due to timeout")
p.terminate()
raise
if __name__ == "__main__":
pool = multiprocessing.Pool()
featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments
for f in featureClass:
abortable_func = partial(abortable_worker, worker, timeout=3)
pool.apply_async(abortable_func, args=f,callback=collectMyResult)
pool.close()
pool.join()
Qualsiasi funzione che timeout aumenterà multiprocessing.TimeoutError
. Nota che ciò significa che la tua callback non verrà eseguita quando si verifica un timeout. Se questo non è accettabile, basta cambiare il blocco except
di abortable_worker
per restituire qualcosa invece di chiamare raise
.
Che aspetto ha "lavoratore"? Il modo più semplice per farlo con un 'multiprocessing.Pool' è quello di rendere' worker' interrompibile, ma ciò potrebbe non essere possibile, a seconda di cosa sta facendo. – dano
worker è una semplice funzione con una lista di input e una lista di output – farhawa
Cosa sta facendo in realtà, però? Presumo che stia iterando sulla lista, ma che tipo di operazioni sta facendo su ciascun articolo? Quanto dura ogni operazione? – dano