2014-12-18 11 views
6

consideri il file sample.py contiene il codice seguente:imap_unordered() riaggancia se iterable genera un errore

from multiprocessing import Pool 

def sample_worker(x): 
    print "sample_worker processes item", x 
    return x 

def get_sample_sequence(): 
    for i in xrange(2,30): 
     if i % 10 == 0: 
      raise Exception('That sequence is corrupted!') 
     yield i 

if __name__ == "__main__": 
    pool = Pool(24) 
    try: 
     for x in pool.imap_unordered(sample_worker, get_sample_sequence()): 
      print "sample_worker returned value", x 
    except: 
     print "Outer exception caught!" 
    pool.close() 
    pool.join() 
    print "done" 

Quando eseguire esso, ottengo il seguente output:

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner 
    self.run() 
    File "C:\Python27\lib\threading.py", line 763, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "C:\Python27\lib\multiprocessing\pool.py", line 338, in _handle_tasks 
    for i, task in enumerate(taskseq): 
    File "C:\Python27\lib\multiprocessing\pool.py", line 278, in <genexpr> 
    self._taskqueue.put((((result._job, i, func, (x,), {}) 
    File "C:\Users\renat-nasyrov\Desktop\sample.py", line 10, in get_sample_sequence 
    raise Exception('That sequence is corrupted!') 
Exception: That sequence is corrupted! 

Dopo di che, l'applicazione riaggancia. Come posso gestire la situazione senza rotture?

+0

Il tuo rientro è sbagliato ... – tamasgal

+0

@septi: corretto, grazie. – Pehat

+0

Ho passato il codice con il debugger e ha funzionato come previsto. Tuttavia, quando lo eseguo, ottengo il comportamento che descrivi. Quindi forse c'è un problema di concorrenza in imap_unordered? – phobic

risposta

2

Come detto in precedenza il tuo rientro è (ancora) sbagliato. Rientra la dichiarazione di rendimento in modo che io rientri nel suo ambito. Io non sono del tutto sicuro di ciò che accade realmente nel codice, ma ottenendo una variabile che è fuori del campo di applicazione non sembra una buona idea:

from multiprocessing import Pool 

def sample_worker(x): 
    print "sample_worker processes item", x 
    return x 

def get_sample_sequence(): 
    for i in xrange(2,30): 
     if i % 10 == 0: 
      raise Exception('That sequence is corrupted!') 
     yield i # fixed 

if __name__ == "__main__": 
    pool = Pool(24) 
    try: 
     for x in pool.imap_unordered(sample_worker, get_sample_sequence()): 
      print "sample_worker returned value", x 
    except: 
     print "Outer exception caught!" 
    pool.close() 
    pool.join() 
    print "done" 

per gestire le eccezioni nel generatore, è possibile utilizzare un wrapper come questo:

import logging 
def robust_generator(): 
    try: 
     for i in get_sample_sequence(): 
      logging.debug("yield "+str(i)) 
      yield i 
    except Exception, e: 
     logging.exception(e) 
     raise StopIteration() 
+0

Grazie per la tua risposta, mi è davvero mancato quel trattino, ma non è questo il problema. Il problema è che se il generatore solleva un'eccezione, il tutto viene riattaccato. Un generatore robusto semplicemente inghiotte l'eccezione, dando come risultato una gestione parziale della sequenza senza alcun avviso (eccetto per la voce di registro). In generale, questo non è un comportamento desiderabile. Idealmente, dovrebbe generare un'eccezione catrabile. – Pehat

+0

Il rob_generator converte l'eccezione in un'eccezione StopIteration, che viene gestita in modo appropriato. Invece di aumentare StopIteration(), è possibile inserire il codice di gestione degli errori. Puoi persino ignorare l'eccezione e riprovare, anche se non sono sicuro che tu possa semplicemente continuare a ricevere gli elementi da un generatore una volta che ha generato un'eccezione. – phobic

0

Non so che ti aspetti che accada. Perché una funzione di tipo map dovrebbe sapere cosa fare in caso di un'eccezione. L'unica cosa sensata che può fare non è gestire l'eccezione. Se è necessario gestire le eccezioni, è necessario comunicare al generatore come gestire l'eccezione. Il problema non è che il imap_unordered abbia perso il suo posto, ma il fatto che il generatore abbia perso il suo posto. Il generatore non saprà dove riprendere quando verrà richiesto un altro oggetto.

+0

Seguendo la tua logica, perché dovrebbe funzionare di qualsiasi tipo cosa fare in caso di eccezione? In effetti, lanciare eccezioni è il modo di generare errori in un posto e gestirli nell'altro posto. Nel caso generale (e anche nel mio caso), non ho il permesso di modificare il codice del generatore, in modo che non possa dirgli come gestire le eccezioni. Nessuno chiede al generatore di riprendere dopo aver sollevato un'eccezione - nella maggior parte dei casi è tecnicamente impossibile - ma se si è verificato un errore, il codice del chiamante dovrebbe esserne a conoscenza, né riagganciare né sopprimere l'errore. – Pehat