è possibile scrivere un loader personalizzato, o utilizzare i segnali.
Pale hanno il metodo on_task_init
, che è chiamato quando un'attività sta per essere eseguita, e on_worker_init
che è chiamato dal celerybeat processo principale sedano +.
Usando segnali è probabilmente il più facile, i segnali disponibili sono:
0.8.x:
task_prerun(task_id, task, args, kwargs)
Inviato quando un compito è in procinto di essere giustiziato dal lavoratore (o localmente se si utilizza apply
/o se è stato impostato CELERY_ALWAYS_EAGER
).
task_postrun(task_id, task, args, kwargs, retval)
Inviato dopo che un'attività è stata eseguita nelle stesse condizioni di cui sopra.
task_sent(task_id, task, args, kwargs, eta, taskset)
Chiamato quando viene applicato un compito (non va bene per operazioni di lunga durata)
segnali aggiuntivi disponibili in 0.9.x (ramo attuale maestro su GitHub):
worker_init()
Chiamato quando Celeryd è stato avviato (prima che l'attività sia inizializzata, quindi se su un sistema che supporta fork
, qualsiasi modifica di memoria verrebbe copiata nei processi di lavoro figlio ).
worker_ready()
Chiamato quando celeryd è in grado di ricevere i compiti.
worker_shutdown()
Chiamato quando celeryd si sta spegnendo.
Ecco un esempio il ricalcolo qualcosa la prima volta un'attività viene eseguita nel processo:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Se si desidera che la funzione da eseguire per tutte le attività, basta saltare l'argomento sender
.
quale tipo di inizializzazione personalizzata è necessario eseguire? – diegueus9
Ho bisogno di caricare una struttura dati di ~ 10 MB necessaria per l'elaborazione di ogni attività (la struttura è la stessa per tutte le attività). – xelk