2010-01-25 7 views
16

Ho un progetto Django e sto tentando di utilizzare Celery per inviare attività per l'elaborazione in background (http://ask.github.com/celery/introduction.html). Celery si integra bene con Django e sono stato in grado di inviare i miei compiti personalizzati e ottenere risultati.Come posso impostare Celery per chiamare una funzione di inizializzazione personalizzata prima di eseguire le mie attività?

L'unico problema è che non riesco a trovare un modo corretto per eseguire l'inizializzazione personalizzata nel processo daemon. Devo chiamare una funzione costosa che carica molta memoria prima di iniziare l'elaborazione delle attività e non posso permettermi di chiamare quella funzione ogni volta.

Qualcuno ha avuto questo problema prima? Qualche idea su come aggirare il problema senza modificare il codice sorgente di Celery?

Grazie

+0

quale tipo di inizializzazione personalizzata è necessario eseguire? – diegueus9

+0

Ho bisogno di caricare una struttura dati di ~ 10 MB necessaria per l'elaborazione di ogni attività (la struttura è la stessa per tutte le attività). – xelk

risposta

15

è possibile scrivere un loader personalizzato, o utilizzare i segnali.

Pale hanno il metodo on_task_init, che è chiamato quando un'attività sta per essere eseguita, e on_worker_init che è chiamato dal celerybeat processo principale sedano +.

Usando segnali è probabilmente il più facile, i segnali disponibili sono:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Inviato quando un compito è in procinto di essere giustiziato dal lavoratore (o localmente se si utilizza apply/o se è stato impostato CELERY_ALWAYS_EAGER).

  • task_postrun(task_id, task, args, kwargs, retval) Inviato dopo che un'attività è stata eseguita nelle stesse condizioni di cui sopra.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Chiamato quando viene applicato un compito (non va bene per operazioni di lunga durata)

segnali aggiuntivi disponibili in 0.9.x (ramo attuale maestro su GitHub):

  • worker_init()

    Chiamato quando Celeryd è stato avviato (prima che l'attività sia inizializzata, quindi se su un sistema che supporta fork, qualsiasi modifica di memoria verrebbe copiata nei processi di lavoro figlio ).

  • worker_ready()

    Chiamato quando celeryd è in grado di ricevere i compiti.

  • worker_shutdown()

    Chiamato quando celeryd si sta spegnendo.

Ecco un esempio il ricalcolo qualcosa la prima volta un'attività viene eseguita nel processo:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

Se si desidera che la funzione da eseguire per tutte le attività, basta saltare l'argomento sender.