2014-07-23 7 views
7

Ho 2 attività personalizzate (TaskA e TaskB), entrambe ereditate da celery.Task. Scheduler avvia TaskA di tanto in tanto, e TaskA avvia volte TaskB con argomenti diversi ogni volta. Ma per qualche ragione, a volte lo stesso TaskB, con gli stessi argomenti, viene eseguito due volte allo stesso tempo e ciò causa problemi diversi con il database.Celery/Redis stesso task eseguito più volte in parallelo

class TaskA(celery.Task): 

    def run(self, *args, **kwargs): 
     objects = MyModel.objects.filter(processed=False)\ 
           .values_list('id', flat=True) 
     task_b = TaskB() 
     for o in objects: 
      o.apply_async(args=[o, ]) 

class TaskB(celery.Task): 

    def run(self, obj_id, *args, **kwargs): 
     obj = MyModel.objects.get(id=obj_id) 
     # do some stuff with obj 

cose che ho provato

Ho provato ad utilizzare celery.group nella speranza che possa risolvere questi problemi, ma tutto quello che ho sono stati errori, dicendo che run prende 2 argomenti e nessuno sono stati forniti.

Ecco come ho cercato di lanciare TaskB utilizzando celery.group:

# somewhere in TaskA 
task_b = TaskB() 
g = celery.group([task_b.s(id) for id in objects]) 
g.apply_async() 

ho provato anche in questo modo:

# somewhere in TaskA 
task_b = TaskB() 
g = celery.group([task_b.run(id) for id in objects]) 
g.apply_async() 

che esegue i compiti proprio lì, prima di g.apply_async().

Domanda

fa il problema viene da come mi lancio compiti o è qualcos'altro? È un comportamento normale?

Altre Informazioni

sulla mia macchina locale corro celery 3.1.13 con RabbitMQ 3.3.4, e sul server celery 3.1.13 piste con Redis 2.8.9. Sul computer locale non vedo alcun comportamento del genere, ogni operazione viene eseguita una volta. Sul server vedo da qualche parte tra 1 e 10 di questi compiti che vengono eseguiti due volte di seguito.

Questo è come io corro sedano su macchina locale e sul server:

celery_beat: celery -A proj beat -l info 

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50 

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200 

soluzione che funziona

ho introdotto un blocco su TaskB sulla base di quali argomenti essa ricevuto. Dopo circa 10 ore di test, vedo cosa viene eseguito esattamente due volte, ma il blocco impedisce la collisione sul database. Questo risolve il mio problema, ma vorrei ancora capire perché sta accadendo.

+0

Il tuo codice dovrebbe funzionare correttamente. Ho copiato [il tuo codice in un file come questo] (http://pastebin.com/f1gAf4R4) e tutte le attività sono state eseguite dopo aver chiamato 'TaskA(). Apply_async()'. Potresti postare il tuo traceback per vedere dove è un problema? – daniula

+0

il traceback proviene dal database. 'MyModel' ha un vincolo univoco su 2 campi. Quindi, quando l'attività viene eseguita per la prima volta e crea un nuovo oggetto, va tutto bene, ma poi la stessa attività viene eseguita nuovamente e tenta di creare nuovamente lo stesso oggetto e lancia 'IntegrityError'. – Neara

+0

Con il codice che hai postato è impossibile replicare il tuo problema. Penso che potresti provare a creare istanze TaskB separate per ogni attività in quanto potrebbe essere un problema. Prova: 'g = celery.group ([TaskB(). S (id) per id negli oggetti])' – daniula

risposta

2

Avete impostato il fanout_prefix e fanout_patterns come descritto nella documentazione Using Redis per Celery? Sto usando Celery con Redis e non sto vivendo questo problema.

+0

Ho riscontrato questo stesso problema, con un'attività che faceva la coda ad altre attività e quelle altre attività che venivano rilevate ed eseguite più volte. L'impostazione di 'fanout_prefix' e' fanout_patterns' come descritto sembra aver risolto il problema. Utilizzo di Celery 3.1.18 e Kombu 3.0.30 –