2014-06-24 33 views
5

Come posso estrarre il risultato di un'attività se non conosco in precedenza quale attività è stata eseguita? Ecco il programma di installazione: Dato il seguente sorgente ('tasks.py'):Recupera il risultato da 'task_id' in Celery da un'attività sconosciuta

from celery import Celery 

app = Celery('tasks', backend="db+mysql://u:[email protected]/db", broker = 'amqp://guest:[email protected]:5672//') 

@app.task 
def add(x,y): 
    return x + y 


@app.task 
def mul(x,y): 
    return x * y 

con RabbitMQ 3.3.2 in esecuzione a livello locale:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server 

       RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc. 
    ## ##  Licensed under the MPL. See http://www.rabbitmq.com/ 
    ## ## 
    ########## Logs: /usr/local/var/log/rabbitmq/[email protected] 
    ###### ##  /usr/local/var/log/rabbitmq/[email protected] 
    ########## 
       Starting broker... completed with 10 plugins. 

con sedano 3.1.12 in esecuzione a livello locale:

-------------- [email protected] v3.1.12 (Cipater) 
---- **** ----- 
--- * *** * -- Darwin-13.2.0-x86_64-i386-64bit 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   tasks:0x105dea3d0 
- ** ---------- .> transport: amqp://guest:**@localhost:5672// 
- ** ---------- .> results:  disabled 
- *** --- * --- .> concurrency: 8 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 

posso quindi importare il metodo e recuperare il risultato con il 'task_id':

from tasks import add, mul 
from celery.result import AsyncResult 

result = add.delay(2,2) 
task_id = result.task_id 
result.get() # 4 

result = AsyncResult(id=task_id) 
result.get() # 4 

result = add.AsyncResult(id=task_id) 
result.get() # 4 

# and the same for the 'mul' task. Just imagine I put it here 

Nell'esempio seguente ho suddiviso questi passaggi tra i processi. In un processo che ho recuperare il 'task_id' in questo modo:

from tasks import add 

result = add.delay(5,5) 
task_id = result.task_id 

E in un altro processo se uso lo stesso 'task_id' (copiato e incollato ad un altro REPL, o in una richiesta HTTP diverso) in questo modo:

from celery.result import AsyncResult 

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:[email protected]/db") 
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta' 
result.state # AttributeError: 'str' object has no attribute 'get_task_meta' 
result.status # AttributeError: 'str' object has no attribute 'get_task_meta' 

E in un altro processo, se faccio:

from task import add # in this instance I know that an add task was performed 

result = add.AsyncResult(id="copied_task_id") 
result.status # "SUCCESSFUL" 
result.state # "SUCCESSFUL" 
result.get() # 10 

mi piacerebbe essere in grado di ottenere il risultato senza sapere prima mano ciò che compito sta generando il risultato. Nel mio ambiente reale ho intenzione di restituire questo task_id al client e lasciare che interrogino lo stato del loro lavoro tramite una richiesta HTTP.

risposta

11

Ok - così mi è stato alla ricerca di una soluzione per un lungo periodo di tempo, e ora che ho finalmente formalmente postato questo e guardato oltre il documentation ho trovato this gem:

classe celery.result .AsyncResult (id, backend = None, task_name = Nessuno, app = Nessuno, padre = nessuno)

Stato attività query.

Parametri:

id - vedi id.

backend - vedere backend.

eccezioneTimeoutError

errore sollevato per i timeout.

AsyncResult.App = Nessuno

Così, invece di fornire il parametro di backend ho fornito l'argomento "app", invece in questo modo:

from celery.result import AsyncResult 
from task import app 

# Assuming add.delay(10,10) was called in another process 
# and that I'm using a 'task_id' I retrieved from that process 

result = AsyncResult(id='copied_task_id', app=app) 
result.state # 'SUCCESSFUL' 
result.get() # 20 

Questo è probabilmente evidente a molti. Non era per me. Per ora tutto quello che posso dire è che questa soluzione "funziona", ma mi sentirei più a mio agio se sapessi che era il modo sanzionato di farlo. Se si conosce una sezione della documentazione che rende questo più chiaro, si prega di postarlo nei commenti o come risposta e selezionerò la risposta se possibile.

+0

Esattamente quello che stavo cercando; Condivido il tuo punto di vista sul fatto che questo è molto poco chiaro dalla documentazione, quindi il tuo post mi ha aiutato molto. – markjan

+0

È possibile che si desideri impostare un piccolo timeout per questa chiamata, poiché alcune chiamate 'get' di Celery potrebbero non tornare per un tempo molto lungo in caso di ID di attività non valido o un'attività che non è più nota al broker. Vedi http://stackoverflow.com/a/10074280/992887 – RichVel

+0

Grazie per questa correzione. Mi ha fatto tirare i capelli per 2 settimane. – Pant