2014-11-01 23 views
18

Aggiungerei più attività alla coda di celery e attendere i risultati. Ho varie idee su come ottenerlo utilizzando una qualche forma di storage condiviso (memcached, redis, db, ecc.), Tuttavia, avrei pensato che fosse qualcosa che Celery può gestire automaticamente ma non trovo alcuna risorsa online.Aggiungere n attività alla coda di sedano e attendere i risultati

Codice esempio

def do_tasks(b): 
    for a in b: 
     c.delay(a) 

    return c.all_results_some_how() 

risposta

21

Per Sedano> = 3.0, taskset è deprecated a favore di group.

from celery import group 
from tasks import add 

job = group([ 
      add.s(2, 2), 
      add.s(4, 4), 
      add.s(8, 8), 
      add.s(16, 16), 
      add.s(32, 32), 
]) 

Attendere:

result = job.apply_async() 
result.join() 
16

Task.delay rendimenti AsyncResult. Utilizzare AsyncResult.get per ottenere il risultato di ogni attività.

Per fare ciò è necessario mantenere i riferimenti alle attività.

def do_tasks(b): 
    tasks = [] 
    for a in b: 
     tasks.append(c.delay(a)) 
    return [t.get() for t in tasks] 

Oppure si può utilizzare ResultSet:

def do_tasks(b): 
    rs = ResultSet([]) 
    for a in b: 
     rs.add(c.delay(a)) 
    return rs.get() 
+0

funzionato come un fascino Salva 'ResultSet' che richiede un elenco di risultati (o lista vuota) in esso del costruttore. Ho inviato una modifica al post per correggerla. – Prydie

+1

@Prydie, grazie per il vostro feedback e correzione. – falsetru

2

ho la sensazione non siete veramente vogliono il ritardo ma la caratteristica asincrona di sedano.

Penso che si vuole veramente un TaskSet:

from celery.task.sets import TaskSet 
from someapp.tasks import sometask 

def do_tasks(b): 
    job = TaskSet([sometask.subtask((a,)) for a in b]) 
    result = job.apply_async() 
    # might want to handle result.successful() == False 
    return result.join()