2015-10-13 20 views
10

Dire che ho due compiti: sedanocome gestire gli errori in Task.map di sedano

@celery.task 
def run_flakey_things(*args, **kwargs): 
    return run_flakey_and_synchronous_thing.map(
     xrange(10) 
    ).apply_async() 


@celery.task 
def run_flakey_and_synchronous_thing(a): 
    if a % 5: 
     return a 
    raise RuntimeError(a) 

Quindi, quando si va a correre run_flakey_things essa cadrà sopra subito perché il primo elemento della sequenza solleva un'eccezione . Quello che vorrei è eseguire l'attività per tutti gli elementi nella sequenza nell'ordine come mappa, ma continuare a funzionare su eccezione, generando una nuova eccezione una volta completate tutte.

L'ideale sarebbe se potessi aggiungere un on_failure all'oggetto xmap prima di applicarlo, ma xmap non sembra essere un oggetto task completo.

risposta

0

È possibile modificare il valore restituito per indicare e propagare gli errori. In questo modo:

import traceback 

@celery.task 
def run_flakey_things(*args, **kwargs): 
    return run_flakey_and_synchronous_thing.map(
     xrange(10) 
    ).apply_async() 


@celery.task 
def run_flakey_and_synchronous_thing(a): 
    d = {'value': None, 'error': None} 
    try: 
     if a % 5: 
      d['value'] = a 
    except: 
     d['error'] = traceback.format_exc() 
    return d 

È quindi possibile fare un paio di cose:

1) Cambia i tuoi run_flakey_things alle cose di gruppo con e senza errori, tornare quelli senza errori e segnalare quelle con errori.

2) Gestire questo comportamento in qualsiasi cosa sta chiamando run_flakey_things

0

Non ho usato il sedano. Tuttavia potresti aggiungere un metodo __call__ alla sottoattività? Qualcosa di simile:

class MyTask: 
    def __call__(self, *args, **kwargs): 
     try: 
      super(self, MyTask).__call__(*args, **kwargs) 
     except Exception, exc: 
      # store exc to be raised later in the main task 

@celery.task(base=MyTask): 
def run_flakey_and_synchronous_thing(a): 
    # ... 

Poi per il principale run_flakey_things compito, forse ignorare apply_async() per recuperare l'eccezione memorizzati, come in How to override __call__ in celery on main?.