2013-03-15 5 views
7
import celery 
def temptask(n): 
    header=list(tempsubtask.si(i) for i in range(n)) 
    callback=templink.si('printed at last?') 
    r = celery.chord(celery.group(header))(callback) 
    return r 

@task() 
def tempsubtask(i): 
    print i  
    for x in range(i): 
     time.sleep(2) 
     current_task.update_state(
      state='PROGRESS', meta={'completed': x, 'total': i }) 

@task() 
def templink(x): 
    print 'this should be run at last %s'%x 

#executing temptask 
r = temptask(100) 

Desidero accedere allo stato di avanzamento aggiornato da tempsubtask. Come posso raggiungerlo?Come tenere traccia dell'avanzamento delle singole attività all'interno di un gruppo che forma l'intestazione di un accordo in sedano?

risposta

4

Dopo ore di googling mi sono imbattuto su www.manasupo.com/2012/03/chord-progress-in-celery.html. Anche se la soluzione non ha funzionato per me fuori dalla scatola, mi ha ispirato a provare qualcosa di simile.

from celery.utils import uuid 
from celery import chord 

class ProgressChord(chord): 

    def __call__(self, body=None, **kwargs): 
     _chord = self.Chord 
     body = (body or self.kwargs['body']).clone() 
     kwargs = dict(self.kwargs, body=body, **kwargs) 
     if _chord.app.conf.CELERY_ALWAYS_EAGER: 
      return self.apply((), kwargs) 
     callback_id = body.options.setdefault('task_id', uuid()) 
     r= _chord(**kwargs) 
     return _chord.AsyncResult(callback_id), r 

, invece di eseguire celery.chord io uso ProgressChord come segue:

def temptask(n): 
    header=list(tempsubtask.si(i) for i in range(n)) 
    callback=templink.si('printed at last?') 
    r = celery.Progresschord(celery.group(header))(callback) 
    return r 

valore restituito di r conteneva una tupla avente sia, asyncResult di callback e un risultato di gruppo. Così il successo sembrava qualcosa di simile:

In [3]: r 
Out[3]: 
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>, 
<GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>) 

ho ereditato e calpestato [celery.chord][1] invece di [celery.task.chords.Chord][2] perché non riuscivo a trovare la sua fonte ovunque.