2015-07-11 23 views
11

A prima vista mi è piaciuta molto la funzione "Batch" in Celery perché devo raggruppare una quantità di ID prima di chiamare un'API (altrimenti potrei essere cacciato fuori).La catena di sedici non funziona con i lotti

Sfortunatamente, quando si esegue un test un po ', le attività in batch non sembrano giocare bene con il resto delle primitive Canvas, in questo caso, le catene. Per esempio:

@a.task(base=Batches, flush_every=10, flush_interval=5) 
def get_price(requests): 
    for request in requests: 
     a.backend.mark_as_done(request.id, 42, request=request) 
     print "filter_by_price " + str([r.args[0] for r in requests]) 

@a.task 
def completed(): 
    print("complete") 

Così, con questo semplice flusso di lavoro:

chain(get_price.s("ID_1"), completed.si()).delay() 

vedo questa uscita:

[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0 
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors 
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone 
[2015-07-11 16:16:21,449: WARNING/MainProcess] [email protected] ready. 
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1'] 

dopo 5 secondi, filter_by_price() viene innescato proprio come previsto. Il problema è che completed() non viene mai richiamato.

Qualche idea di cosa potrebbe succedere qui? Se non si usano i lotti, quale potrebbe essere un approccio decente per risolvere questo problema?

PS: Ho impostato CELERYD_PREFETCH_MULTIPLIER=0 come dicono i documenti.

+0

Solo per la cronaca, ho bisogno così male la cosa dosaggio che ho finito per usare RabbitMQ + Pika da solo con un modello molto semplice operaio che memorizza i messaggi. Se qualcuno fosse interessato, ho a disposizione il codice sorgente, evviva. –

risposta

5

Sembra che il comportamento delle attività batch sia significativamente diverso dalle normali attività. Le attività batch non emettono nemmeno segnali come task_success.

Poiché è necessario chiamare l'attività completed dopo get_price, è possibile chiamarlo direttamente dallo get_price stesso.

@a.task(base=Batches, flush_every=10, flush_interval=5) 
def get_price(requests): 
    for request in requests: 
     # do something 
    completed.delay()