2013-07-04 24 views
18

Ho un compito check_orders che viene eseguito periodicamente. Crea un gruppo di attività in modo da poter calcolare il tempo necessario per l'esecuzione delle attività ed eseguire qualcosa quando sono terminate (questo è lo scopo di res.join [1] e grouped_subs). Le attività raggruppate sono coppie di compiti concatenati.Il sedano interrompe l'esecuzione di una catena

Quello che voglio è quando la prima attività non soddisfa una condizione (non riuscita) non esegue la seconda attività nella catena. Non riesco a capirlo per la vita di me e ritengo che questa sia una funzionalità di base per un gestore di code di lavoro. Quando provo le cose che ho commentato dopo [2] (sollevando eccezioni, rimuovendo le callback) ... rimaniamo bloccati su join() in check_orders per qualche ragione (rompe il gruppo). Ho provato a impostare ignore_result su False per tutte queste attività, ma ancora non funziona.

@task(ignore_result=True) 
def check_orders(): 
    # check all the orders and send out appropriate notifications 
    grouped_subs = [] 

    for thingy in things: 
     ... 

     grouped_subs.append(chain(is_room_open.subtask((args_sub_1,)), 
         notify.subtask((args_sub_2,), immutable=True))) 

    res = group(grouped_subs).apply_async() 

    res.join()   #[1] 
    logger.info('Done checking orders at %s' % current_task.request.id)) 

@task(ignore_result=True) 
def is_room_open(args_sub_1): 
    #something time consuming 
    if http_req_and_parse(args_sub_1): 
     # go on and do the notify task 
     return True 
    else: 
     # [2] 
     # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how? 
     # None of the following things work: 
     # is_room_open.update_state(state='FAILURE') 
     # raise celery.exceptions.Ignore() 
     # raise Exception('spam', 'eggs') 
     # current_task.request.callbacks[:] = [] 

@task(ignore_result=True) 
def notify(args_sub_2): 
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful' 
    notify_user(args_sub_2) 

risposta

1

In primo luogo, sembra che se nella funzione esiste l'eccezione ignore_result non ti aiuta.

In secondo luogo, si utilizza immutabile = True Significa che il prossimo funzione (nel nostro caso è notificare) non prende argomenti aggiuntivi. Dovresti usare notify.subtask((args_sub_2,), immutable=False) ovviamente se adatto alla tua decisione.

In terzo luogo, è possibile utilizzare le scorciatoie:

notify.si(args_sub_2) invece notify.subtask((args_sub_2,), immutable=True)

e

is_room_open.s(args_sub_1) invece is_room_open.subtask((args_sub_1,))

Prova utilizzare il codice è:

@task 
def check_orders(): 
    # check all the orders and send out appropriate notifications 
    grouped_subs = [] 

    for thingy in things: 
     ... 

     grouped_subs.append(chain(is_room_open.s(args_sub_1), 
            notify.s(args_sub_2))) 

    res = group(grouped_subs).apply_async() 

    res.join()   #[1] 
    logger.info('Done checking orders at %s' % current_task.request.id)) 

@task 
def is_room_open(args_sub_1): 
    #something time consuming 
    if http_req_and_parse(args_sub_1): 
     # go on and do the notify task 
     return True 
    else: 
     # [2] 
     # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how? 
     # None of the following things work: 
     # is_room_open.update_state(state='FAILURE') 
     # raise celery.exceptions.Ignore() 
     # raise Exception('spam', 'eggs') 
     # current_task.request.callbacks[:] = [] 
     return False 

@task 
def notify(result, args_sub_2): 
    if result: 
     # something else time consuming, only do this if the first part of the chain 
     # passed a test (the chained tasks before this were 'successful' 
     notify_user(args_sub_2) 
     return True 
    return False 

Se si vuole intercettare le eccezioni è necessario utilizzare callback come modo

is_room_open.s(args_sub_1, link_error=log_error.s())

from proj.celery import celery 

@celery.task 
def log_error(task_id): 
    result = celery.AsyncResult(task_id) 
    result.get(propagate=False) # make sure result written. 
    with open(os.path.join('/var/errors', task_id), 'a') as fh: 
     fh.write('--\n\n%s %s %s' % (
      task_id, result.result, result.traceback)) 
+0

Grazie per i suggerimenti sulle scorciatoie. Sebbene funzioni, non risolve il mio problema. Voglio che il secondo compito non venga mai eseguito se il primo fallisce. Questa soluzione ha ancora il sovraccarico di avviare la seconda attività ogni volta indipendentemente dai risultati della prima attività. Voglio interrompere l'esecuzione della catena. – Salami

+0

ti ho capito. Se l'attività ha generato un'eccezione, l'esecuzione della catena si interrompe. Il suo comportamento di default. Non è necessario cercare una decisione speciale per questo. –

+0

@Alexander, sollevando l'eccezione NON funziona correttamente. "Quando provo le cose che ho commentato dopo [2] (sollevando eccezioni, rimuovendo le callback) ... rimaniamo bloccati su join() in check_orders per qualche ragione (rompe il gruppo)." – Salami

12

A mio parere si tratta di un caso d'uso comune che non riceve abbastanza amore nella documentazione.

Supponendo che si desideri interrompere una catena a metà mentre si sta ancora segnalando SUCCESSO come stato delle attività completate e non si invia alcun registro errori o quant'altro (altrimenti si può solo sollevare un'eccezione), un modo per ottenere ciò è:

@app.task(bind=True) # Note that we need bind=True for self to work 
def task1(self, other_args): 
    #do_stuff 
    if end_chain: 
     self.request.callbacks = None 
     return 
    #Other stuff to do if end_chain is False 

Quindi nel tuo esempio:

@app.task(ignore_result=True, bind=True) 
def is_room_open(self, args_sub_1): 
    #something time consuming 
    if http_req_and_parse(args_sub_1): 
     # go on and do the notify task 
     return True 
    else: 
     self.request.callbacks = None 

funzionerà. Si noti che invece di ignore_result=True e subtask() è possibile utilizzare la scorciatoia .si() come affermato da @ Abbasov-alexander

A cura di lavorare con la modalità EAGER, come suggerito da @PhilipGarnero nei commenti.

+1

Se si stanno eseguendo attività in modalità EAGER, quanto sopra interromperà l'attività. Ho sostituito 'self.request.callbacks [:] = []' da 'self.request.callbacks = None' e ora funziona in entrambi i casi. – PhilipGarnero

+0

Se funziona in entrambi i casi, suggeriamo di farlo. Grazie per il commento per migliorare la risposta :) – AntonioMO

+4

Apparentemente non funziona più per Celery 4.0, ma 'self.request.chain = None' fa. http://stackoverflow.com/questions/23793928/celery-clean-way-of-revoking-the-entire-chain-from-within-a-task/40579984#40579984 –

6

È incredibile come un caso così comune non è trattato in alcuna documentazione ufficiale.Ho dovuto affrontare lo stesso problema (ma usando shared_tasks con bind opzione, quindi abbiamo la visibilità di self oggetto), così ho scritto un decoratore personalizzato che gestisce automaticamente la revoca:

def revoke_chain_authority(a_shared_task): 
    """ 
    @see: https://gist.github.com/bloudermilk/2173940 
    @param a_shared_task: a @shared_task(bind=True) celery function. 
    @return: 
    """ 
    @wraps(a_shared_task) 
    def inner(self, *args, **kwargs): 
     try: 
      return a_shared_task(self, *args, **kwargs) 
     except RevokeChainRequested, e: 
      # Drop subsequent tasks in chain (if not EAGER mode) 
      if self.request.callbacks: 
       self.request.callbacks[:] = [] 
      return e.return_value 

    return inner 

è possibile utilizzarlo come segue :

@shared_task(bind=True) 
@revoke_chain_authority 
def apply_fetching_decision(self, latitude, longitude): 
    #... 

    if condition: 
     raise RevokeChainRequested(False) 

Vedere la spiegazione completa here. Spero che aiuti!

+0

Grazie. Ottima soluzione. – kstratis

+0

Sembra che ora la variabile 'callbacks' sia una tupla, quindi restituisce un errore quando si tenta di eseguire quell'operazione: ' self.request.callbacks [:] = [] '' 'line break' '' TypeError: l'oggetto 'tupla' non supporta l'assegnazione dell'oggetto ' – mccc