Ecco come puoi farlo con Mongo.
NOTA: ho dovuto rendere il disegno un po 'più indulgente, poiché le attività di Celery non sono garantite per eseguire il momento esatto di eta
è soddisfatta o countdown
si esaurisce.
Inoltre, gli indici di scadenza di Mongo vengono solo ripuliti ogni minuto o così; Pertanto non è possibile basare il disegno sui record eliminati nel momento in cui lo eta
è attivo.
In ogni caso, il flusso è qualcosa di simile:
- codice client chiama
my_task
.
preflight
incrementa un contatore chiamata, e restituisce come flight_id
_my_task
è impostato da eseguire dopo TTL
secondi.
- Quando
_my_task
viene eseguito, controlla se è flight_id
è ancora in corso. Se non lo è, si interrompe.
- ... qualche tempo dopo ... mongo pulisce voci obsolete della collezione, via an expiring index.
@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
_my_task.apply_async((my_arg,), {'flight_id':flight_id}, countdown=TTL)
@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
return
# ... actual work ... #
Code Library:
TTL = 5 * 60 # Run tasks after 5 minutes
EXPIRY = 6 * TTL # This needs to be much larger than TTL.
# We need to store a list of task-executions currently pending
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING,),
('key', pymongo.ASCENDING,)])
inflight_collection.create_index('eta', expiresAfterSeconds=EXPIRY)
def preflight(collection, fn, key, ttl):
eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl)
result = collection.find_one_and_update({
'fn': fn,
'key': key,
}, {
'$set': {
'eta': eta
},
'$inc': {
'flightId': 1
}
}, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId'])
return result['flightId']
def check_for_takeoff(collection, fn, key, flight_id):
result = collection.find_one({
'fn': fn,
'key': key
})
ready = result is None or result['flightId'] == flight_id
print 'Check[{}][{}] = {}, {}'.format(fn, key, result['flightId'], ready)
return ready
È possibile utilizzare una cache. Molti negozi di valori-chiave hanno registrazioni temporizzate, provano a recuperare il risultato dall'archivio, se non ci sono risultati, quindi eseguono l'attività e memorizzano il risultato con una scadenza prima di tornare. Usa solo un lavoratore in modo che le attività vengano eseguite in sequenza. Evitare schemi di blocco a meno che non si desideri gestire le serrature obsolete. –
Oh, assolutamente. Ma preferirei evitare i pezzetti di implementare il debouncing me stesso (controllando gli argomenti, tenendo traccia dei risultati, ecc.) E mi chiedo se c'è un modo standard per farlo. –
È semplice scrivere un decoratore cache in Python (può essere 4 righe), vorrei avere il tempo di pubblicare una risposta completa. –