2012-04-09 7 views
5

Ho due processi separati di celeryd in esecuzione sul mio server, gestiti da supervisor. Essi sono impostati per l'ascolto su code separate in quanto tale:Instradare il task di sedano su una coda specifica

[program:celeryd1] 
command=/path/to/celeryd --pool=solo --queues=queue1 
... 

[program:celeryd2] 
command=/path/to/celeryd --pool=solo --queues=queue2 
... 

E il mio celeryconfig simile a questa:

from celery.schedules import crontab 

BROKER_URL = "amqp://guest:[email protected]:5672//" 

CELERY_DISABLE_RATE_LIMITS = True 
CELERYD_CONCURRENCY = 1 
CELERY_IGNORE_RESULT = True 

CELERY_DEFAULT_QUEUE = 'default' 
CELERY_QUEUES = { 
    'default': { 
     "exchange": "default", 
     "binding_key": "default", 
    }, 
    'queue1': { 
     'exchange': 'queue1', 
     'routing_key': 'queue1', 
    }, 
    'queue2': { 
     'exchange': 'queue2', 
     'routing_key': 'queue2', 
    }, 
} 

CELERY_IMPORTS = ('tasks',) 

CELERYBEAT_SCHEDULE = { 
    'first-queue': { 
     'task': 'tasks.sync', 
     'schedule': crontab(hour=02, minute=00), 
     'kwargs': {'client': 'client_1'}, 
     'options': {'queue': 'queue1'}, 
    }, 
    'second-queue': { 
     'task': 'tasks.sync', 
     'schedule': crontab(hour=02, minute=00), 
     'kwargs': {'client': 'client_2'}, 
     'options': {'queue': 'queue1'}, 
    }, 
} 

Tutti tasks.sync operazioni devono essere indirizzati a una coda specifica (e il progresso di conseguenza celeryd). Ma quando provo a eseguire manualmente l'attività con sync.apply_async(kwargs={'client': 'value'}, queue='queue1'), entrambi i lavoratori del sedano riprendono l'attività. Come posso rendere il percorso delle attività alla coda corretta e solo essere eseguito dal worker che è associato alla coda?

risposta

6

Si sta eseguendo solo un'istanza di celerybeat, giusto?

Forse hai dei vecchi binding di coda che si scontrano con questo? Provare a fare funzionare rabbitmqctl list_queues e rabbitmqctl list_bindings, forse resettare i dati nel broker per iniziare da zero.

L'esempio che hai qui dovrebbe funzionare e funziona per me quando l'ho appena provato.

Suggerimento: poiché si utilizza lo stesso valore di scambio e chiave_chiave come nome coda, non è necessario elencarli in modo esplicito in CELERY_QUEUES. Quando CELERY_CREATE_MISSING_QUEUES è attivo (che è per impostazione predefinita) le code verranno automaticamente create esattamente come si dispone di se si esegue solo celeryd -Q queue1 o si invia un'attività a una coda non definita.