2013-02-27 7 views
16

Quando ho qualcosa di simile alla seguentesedano - concatenare gruppi e sotto-attività. -> esecuzione fuori ordine

group1 = group(task1.si(), task1.si(), task1.si()) 
group2 = group(task2.si(), task2.si(), task2.si()) 

workflow = chain(group1, group2, task3.si()) 

L'interpretazione intuitiva è che TASK3 dovrebbe eseguire solo dopo che tutte le attività del gruppo 2 hanno finito.

In realtà, l'attività 3 viene eseguita mentre il gruppo 1 è stato avviato ma non è ancora stato completato.

Cosa sto sbagliando?

+0

Qualche aggiornamento su questo con la nuova versione di sedano? – JohnnyM

risposta

11

In questo modo, nel sedano non è possibile concatenare due gruppi.
Ho il sospetto che questo è perché i gruppi incatenato con compiti diventano automaticamente un accordo
- docs> Sedano: http://docs.celeryproject.org/en/latest/userguide/canvas.html

concatenazione un gruppo insieme ad un altro compito aggiornerà automaticamente che sia un accordo:

I gruppi restituiscono un'attività principale. Quando concatenando due gruppi, sospetto che al termine del primo gruppo, l'accordo avvii il "task" di callback. Sospetto che questo "compito" sia in realtà il "compito genitore" del secondo gruppo. Sospetto inoltre che questa attività genitore venga completata non appena termina l'avvio di tutte le attività secondarie all'interno del gruppo e, di conseguenza, l'elemento successivo dopo l'esecuzione del secondo gruppo.

Per dimostrarlo, ecco alcuni esempi di codice. Avrai bisogno di avere già un'istanza di sedani in esecuzione.

# celery_experiment.py 

from celery import task, group, chain, chord 
from celery.signals import task_sent, task_postrun, task_prerun 

import time 
import logging 

import random 
random.seed() 

logging.basicConfig(level=logging.DEBUG) 

### HANDLERS ###  
@task_prerun.connect() 
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):  
    try: 
     logging.info('[%s] starting' % kwargs['id']) 
    except KeyError: 
     pass 

@task_postrun.connect() 
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds): 
    try:  
     logging.info('[%s] finished' % kwargs['id']) 
    except KeyError: 
     pass 


def random_sleep(id): 
    slp = random.randint(1, 3) 
    logging.info('[%s] sleep for %ssecs' % (id, slp)) 
    time.sleep(slp) 

@task() 
def thing(id): 
    logging.info('[%s] begin' % id) 
    random_sleep(id) 
    logging.info('[%s] end' % id) 


def exec_exp(): 
    st = thing.si(id='st') 
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),] 
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),] 
    st2 = thing.si(id='st2') 
    st3 = thing.si(id='st3') 
    st4 = thing.si(id='st4') 

    grp1 = group(st_arr) 
    grp2 = group(st_arr2) 

    # chn can chain two groups together because they are seperated by a single subtask 
    chn = (st | grp1 | st2 | grp2 | st3 | st4) 

    # in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes 
    #chn2 = (st | st2 | grp1 | grp2 | st3 | st4) 

    r = chn() 
    #r2 = chn2() 
+0

Grazie per questo. Sfortunatamente per me, il mio flusso di lavoro non mi consente di utilizzare un'attività "rilevante" tra i gruppi. Così ho finito per creare un falso compito 'def fake_celery_task(): pass' per correre tra i gruppi ... – lukik

13

Ho lo stesso problema con il sedano, cercando di avere un flusso di lavoro in cui il primo passo è "generare un milione di attività". Tentativi gruppi di gruppi, sottoattività, alla fine il mio step2 prende il via prima che il passaggio 1 sia finito.

Lunga storia breve mi avrebbe trovato una soluzione con l'uso di corde e di una stazione di finitura muto:

@celery.task 
def chordfinisher(*args, **kwargs): 
    return "OK" 

Senza fare niente, ma mi permette di fare questo:

tasks = [] 
for id in ids: 
    tasks.append(mytask.si(id)) 
step1 = chord(group(tasks), chordfinisher.si()) 

step2 = ... 

workflow = chain(step1, step2) 

Originariamente Volevo avere step1 in un sottoattività ma per la stessa ragione sospettata, l'azione di chiamare un gruppo termina, l'attività è considerata finita, e il mio flusso di lavoro si sposta su ...

Se som eone ha qualcosa di meglio, sono interessato!

+1

Ciao, questo è praticamente quello che ho finito per fare. Una cosa da tenere a mente, è necessario che il dumbfinisher restituisca il risultato dell'esecuzione di gruppo. altrimenti, se qualcosa nel gruppo fallisce, la catena non si fermerà al punto 1. (questo può o non può essere quello che vuoi) –