2012-03-22 5 views
34

È possibile scoprire se esiste un'attività con un determinato ID attività? Quando cerco di ottenere lo stato, sarò sempre in attesa.Verificare se esiste un'attività di sedano

>>> AsyncResult('...').status 
'PENDING' 

Desidero sapere se un determinato ID di attività è un vero ID di compito celery e non una stringa casuale. Voglio risultati diversi a seconda che esista un'attività valida per un determinato ID.

Potrebbe essersi verificato un compito valido in passato con lo stesso ID ma i risultati potrebbero essere stati eliminati dal back-end.

risposta

27

sedano non scrive uno stato in cui l'attività viene inviato, ciò è in parte un'ottimizzazione (vedi http://docs.celeryproject.org/en/latest/userguide/tasks.html#state).

Se si ha realmente bisogno, è semplice aggiungere:

from celery import current_app 
# `after_task_publish` is available in celery 3.1+ 
# for older versions use the deprecated `task_sent` signal 
from celery.signals import after_task_publish 

@after_task_publish.connect 
def update_sent_state(sender=None, body=None, **kwargs): 
    # the task may not exist if sent using `send_task` which 
    # sends tasks by name, so fall back to the default result backend 
    # if that is the case. 
    task = current_app.tasks.get(sender) 
    backend = task.backend if task else current_app.backend 

    backend.store_result(body['id'], None, "SENT") 

Quindi è possibile verificare lo stato di ATTESA per rilevare che un compito non è (apparentemente) stato inviato:

>>> result.state != "PENDING" 
+2

Vale la pena ricordare che l'eliminazione della coda non rimuove il meta dell'attività (almeno quando si usa Redis come back-end). Pertanto questo metodo non può essere utilizzato in modo affidabile per determinare se l'attività esiste ancora. – sleepycal

-3

Prova

AsyncResult('blubb').state 

che può funzionare.

Dovrebbe restituire qualcosa di diverso.

+1

Voglio ottenere risultati diversi a seconda che l'ID dell'attività sia o sia stato un vero ID attività. Il problema è che ricevo sempre PENDING anche se uso un ID falso come il blubb. – dominik

+0

stato == stato? – dominik

+0

'.status' è un alias deprecato di stato' attributo' – Igor

0

È necessario chiamare .get() sull'oggetto AsyncTask creato per recuperare effettivamente il risultato dal back-end.

Vedere Celery FAQ.


Per chiarire ulteriormente la mia risposta.

Qualsiasi stringa è tecnicamente un ID valido, non esiste un modo per convalidare l'ID dell'attività. L'unico modo per scoprire se esiste un'attività consiste nel chiedere al back-end se ne è a conoscenza e per farlo è necessario utilizzare .get().

Questo introduce il problema che i blocchi .get() quando il back-end non ha alcuna informazione sull'ID di attività che hai fornito, questo è di progettazione per consentire di avviare un'attività e quindi attendere il suo completamento.

Nel caso della domanda originale, assumerò che l'OP desideri ottenere lo stato di un'attività precedentemente completata. Per fare questo si può passare un piccolissimo timeout e timeout cattura errori:

from celery.exceptions import TimeoutError 
try: 
    # fetch the result from the backend 
    # your backend must be fast enough to return 
    # results within 100ms (0.1 seconds) 
    result = AsyncResult('blubb').get(timeout=0.1) 
except TimeoutError: 
    result = None 

if result: 
    print "Result exists; state=%s" % (result.state,) 
else: 
    print "Result does not exist" 

Va da sé che questa unica opera se il back-end è la memorizzazione dei risultati, se non è non c'è modo di sapere se un ID attività è valido o no perché nulla tiene traccia di loro.


Ancora più chiarificazione.

Non è possibile eseguire ciò che si desidera fare utilizzando il backend AMQP perché it does not store results, it forwards them.

Il mio suggerimento sarebbe di passare a un back-end del database in modo che i risultati siano in un database che è possibile interrogare al di fuori dei moduli di sedano esistenti. Se nessuna attività esiste nel database dei risultati, è possibile assumere che l'ID non è valido.

+1

'.get()' bloccherà fino a quando il sistema non avrà ricevuto il risultato. In caso di ID inesistente questo bloccherà solo l'applicazione. È possibile passare un argomento 'timeout' ma non è ancora possibile determinare se l'ID attività è errato – Igor

+0

A destra, è necessario passare un valore di timeout e rilevare l'errore di timeout. Questo è l'unico modo per determinare se un ID attività è "valido" in base al tuo back-end. Qualsiasi ID è tecnicamente "valido", ma solo gli ID di cui il tuo backend sa che effettivamente restituiranno dati. –

+1

Le mie attività normalmente durano circa 30 secondi. Quindi non è un'opzione, giusto? – dominik

-3

Per favore correggimi se sbaglio.

if built_in_status_check(task_id) == 'pending' 
    if registry_exists(task_id) == true 
     print 'Pending' 
    else 
     print 'Task does not exist' 
+0

Cosa sono 'built_in_status_check' e' registry_exists'? Come implementeresti questo? – dominik

+0

Bene, sono venuto a sapere che ci sono 6 stati di attività (PENDING, STARTED, SUCCESS, FAILURE, RETRY e REVOKED). Quindi, ho pensato che potremmo avere un codice per verificare se l'attività è in "PENDING" o meno. E se è in stato 'PENDING' di quanto potremmo verificare quella particolare attività con voci di registro per esistenza. – pravin

+1

No, so che lo stato è in sospeso ma non conosco il motivo per cui è in sospeso. Sto cercando un intelligente 'registry_exists'. – dominik

7

AsyncResult.state restituisce PENDING in caso di ID attività sconosciute.

ATTESA

un'attività è in attesa per l'esecuzione o sconosciuto. Qualsiasi ID attività che non sia noto è implicito nello stato in sospeso.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

È possibile fornire ID di attività personalizzati, se avete bisogno di distinguere gli ID sconosciuti da quelli esistenti:

>>> from tasks import add 
>>> from celery.utils import uuid 
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid()) 
>>> id = r.task_id 
>>> id 
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd' 
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id" 
... 
Unknown task id 
>>> if not id.startswith("celery-task-id-"): print "Unknown task id" 
... 
+3

Il problema è che ho solo un ID. Ogni id era una volta un valido ID ma alcuni non lo sono più perché i risultati sono stati cancellati dal back-end. Quindi avrò sempre un id che inizia con 'celery-task-id-', ma un compito potrebbe ancora non essere valido. – dominik

+2

In tal caso, è necessario tenere traccia della cronologia ID esternamente. i backend di sedano non garantiscono di mantenere tutti i risultati per sempre. Ad esempio, il backend amqp può essere interrogato solo una volta. – mher

+1

@ 0x00mh: il problema è che avendo un ID attività, come posso sapere se l'attività è effettivamente PENDENTE o è stata eliminata dal back-end (forse perché ho impostato il sedano per dimenticarlo dopo un po 'di tempo)? –

2

Proprio ora sto usando seguente schema:

  1. Ottieni un ID attività.
  2. Impostato su chiave memcache come 'task_% s'% task.id messaggio 'Avviato'.
  3. Passare ID attività al client.
  4. Ora dal client posso monitorare lo stato dell'attività (impostato dai messaggi di attività a memcache).
  5. Dall'attività alla consegna - impostare il messaggio chiave memcache "Pronto".
  6. Da client su attività pronta: avvia un'attività speciale che cancellerà la chiave da memcache ed eseguirà le azioni di pulizia necessarie.
+0

Questo era il modo in cui volevo farlo ma non sembrava essere il modo pulito di farlo. – dominik