2014-06-23 9 views
13

Voglio applicare una funzione in parallelo usando multiprocessing.Pool. Il problema è che se una chiamata a una funzione attiva un errore di segmentazione, il Pool si blocca per sempre. Qualcuno ha idea di come posso creare un Pool che rileva quando succede qualcosa di simile e genera un errore?multiprocessing.Pool si blocca se il figlio causa un errore di segmentazione

L'esempio seguente mostra come riprodurre esso (richiede scikit-learn> 0,14)

import numpy as np 
from sklearn.ensemble import gradient_boosting 
import time 

from multiprocessing import Pool 

class Bad(object): 
    tree_ = None 


def fit_one(i): 
    if i == 3: 
     # this will segfault              
     bad = np.array([[Bad()] * 2], dtype=np.object) 
     gradient_boosting.predict_stages(bad, 
             np.random.rand(20, 2).astype(np.float32), 
             1.0, np.random.rand(20, 2)) 
    else: 
     time.sleep(1) 
    return i 


pool = Pool(2) 
out = pool.imap_unordered(fit_one, range(10)) 
# we will never see 3 
for o in out: 
    print o 
+2

Risolto il problema di segmentazione? Di solito i segoult sono causati da un accesso di memoria non valido, che è un comportamento _undefined_ e non garantito per causare un segfault. –

+0

Nessuna risposta, ma posso dire che joblib.Parallel sembra rimanere per sempre. Da quello che posso dire, non c'è modo di restituire il segfault o aggiungere un timeout "watchdog" in multiprocessing. –

+1

In realtà, forse puoi aggiungere un decoratore di timeout? Come mostrato qui: http://code.activestate.com/recipes/577028/ –

risposta

1

Invece di utilizzare Pool().imap() forse si preferisce creare manualmente processi figli da soli con Process(). Scommetto che l'oggetto restituito ti consentirebbe di ottenere lo stato di vividezza di qualsiasi bambino. Saprai se riagganciano.

0

Non ho eseguito il tuo esempio per vedere se può gestire l'errore, ma provare i futuri concorrenti. Sostituisci semplicemente my_function (i) con fit_one (i). Mantenere la struttura __name__=='__main__':. i futuri concorrenti sembrano aver bisogno di questo. Il codice qui sotto è testato sulla mia macchina, quindi spero che funzionerà direttamente sul tuo.

import concurrent.futures 

def my_function(i): 
    print('function running') 
    return i 

def run(): 
    number_processes=4 
    executor = concurrent.futures.ProcessPoolExecutor(number_processes) 
    futures = [executor.submit(my_function,i) for i in range(10)] 
    concurrent.futures.wait(futures) 

    for f in futures: 
     print(f.result()) 

if __name__ == '__main__': 
    run() 
+0

Richiede Python 3.2 – Ninga

+0

Sto solo pensando che potrebbe funzionare perché puoi chiamare tutti i tipi di metodi sui "futuri" itterabili che vengono restituiti dopo il completamento di tutti i processi. Quindi potrebbe essere in grado di prendere l'errore nel suo passo. – Ninga

0

Come descritto nei commenti, questo solo funziona in Python 3 se si utilizza concurrent.Futures.ProcessPoolExecutor invece di multiprocessing.Pool.

Se sei bloccato su Python 2, l'opzione migliore che ho trovato è quello di utilizzare l'argomento timeout sugli oggetti risultato restituiti da Pool.apply_async e Pool.map_async. Per esempio:

pool = Pool(2) 
out = pool.map_async(fit_one, range(10)) 
for o in out: 
    print o.get(timeout=1000) # allow 1000 seconds max 

Questo funziona fino a quando si dispone di un limite superiore per quanto tempo un processo figlio dovrebbe prendere per completare un compito.

1

Questo è un known bug, issue #22393, in Python. Non c'è soluzione significativa se si utilizza multiprocessing.pool finché non viene corretto. Una patch è disponibile a quel link, ma non è ancora stata integrata nella versione principale, quindi nessuna versione stabile di Python risolve il problema.