Ho 2 attività personalizzate (TaskA
e TaskB
), entrambe ereditate da celery.Task
. Scheduler avvia TaskA
di tanto in tanto, e TaskA
avvia volte TaskB
con argomenti diversi ogni volta. Ma per qualche ragione, a volte lo stesso TaskB
, con gli stessi argomenti, viene eseguito due volte allo stesso tempo e ciò causa problemi diversi con il database.Celery/Redis stesso task eseguito più volte in parallelo
class TaskA(celery.Task):
def run(self, *args, **kwargs):
objects = MyModel.objects.filter(processed=False)\
.values_list('id', flat=True)
task_b = TaskB()
for o in objects:
o.apply_async(args=[o, ])
class TaskB(celery.Task):
def run(self, obj_id, *args, **kwargs):
obj = MyModel.objects.get(id=obj_id)
# do some stuff with obj
cose che ho provato
Ho provato ad utilizzare celery.group
nella speranza che possa risolvere questi problemi, ma tutto quello che ho sono stati errori, dicendo che run
prende 2 argomenti e nessuno sono stati forniti.
Ecco come ho cercato di lanciare TaskB
utilizzando celery.group
:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()
ho provato anche in questo modo:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()
che esegue i compiti proprio lì, prima di g.apply_async()
.
Domanda
fa il problema viene da come mi lancio compiti o è qualcos'altro? È un comportamento normale?
Altre Informazioni
sulla mia macchina locale corro celery 3.1.13
con RabbitMQ 3.3.4
, e sul server celery 3.1.13
piste con Redis 2.8.9
. Sul computer locale non vedo alcun comportamento del genere, ogni operazione viene eseguita una volta. Sul server vedo da qualche parte tra 1 e 10 di questi compiti che vengono eseguiti due volte di seguito.
Questo è come io corro sedano su macchina locale e sul server:
celery_beat: celery -A proj beat -l info
celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50
celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
soluzione che funziona
ho introdotto un blocco su TaskB
sulla base di quali argomenti essa ricevuto. Dopo circa 10 ore di test, vedo cosa viene eseguito esattamente due volte, ma il blocco impedisce la collisione sul database. Questo risolve il mio problema, ma vorrei ancora capire perché sta accadendo.
Il tuo codice dovrebbe funzionare correttamente. Ho copiato [il tuo codice in un file come questo] (http://pastebin.com/f1gAf4R4) e tutte le attività sono state eseguite dopo aver chiamato 'TaskA(). Apply_async()'. Potresti postare il tuo traceback per vedere dove è un problema? – daniula
il traceback proviene dal database. 'MyModel' ha un vincolo univoco su 2 campi. Quindi, quando l'attività viene eseguita per la prima volta e crea un nuovo oggetto, va tutto bene, ma poi la stessa attività viene eseguita nuovamente e tenta di creare nuovamente lo stesso oggetto e lancia 'IntegrityError'. – Neara
Con il codice che hai postato è impossibile replicare il tuo problema. Penso che potresti provare a creare istanze TaskB separate per ogni attività in quanto potrebbe essere un problema. Prova: 'g = celery.group ([TaskB(). S (id) per id negli oggetti])' – daniula