2015-01-26 19 views
6

Esiste un metodo standard per il debouncing delle attività di Celery?Attività di rimbalzo di Celery?

Per esempio, in modo che un compito può essere "ha iniziato" più volte, ma verrà eseguito solo una volta dopo un certo ritardo:

def debounce_task(task): 
    if task_is_queued(task): 
     return 
    task.apply_async(countdown=30) 
+0

È possibile utilizzare una cache. Molti negozi di valori-chiave hanno registrazioni temporizzate, provano a recuperare il risultato dall'archivio, se non ci sono risultati, quindi eseguono l'attività e memorizzano il risultato con una scadenza prima di tornare. Usa solo un lavoratore in modo che le attività vengano eseguite in sequenza. Evitare schemi di blocco a meno che non si desideri gestire le serrature obsolete. –

+0

Oh, assolutamente. Ma preferirei evitare i pezzetti di implementare il debouncing me stesso (controllando gli argomenti, tenendo traccia dei risultati, ecc.) E mi chiedo se c'è un modo standard per farlo. –

+0

È semplice scrivere un decoratore cache in Python (può essere 4 righe), vorrei avere il tempo di pubblicare una risposta completa. –

risposta

4

Ecco come lo facciamo con i contatori Redis. Tutto questo può probabilmente essere generalizzato in un decoratore ma lo usiamo solo per un compito specifico (webhook)

Il tuo compito pubblico è ciò che chiami da altre funzioni. Avrà bisogno di incrementare una chiave in Redis. La chiave è formata dagli argomenti della vostra funzione, qualunque esse siano (questo assicura il contatore è unico tra le singole attività)

@task 
def your_public_task(*args, **kwargs): 
    cache_key = make_public_task_cache_key(*args, **kwargs) 
    get_redis().incr(cache_key) 
    _your_task(*args, **kwargs, countdown=settings.QUEUE_DELAY) 

Nota le funzioni chiave di cache sono condivise (si desidera che la stessa chiave di cache in ogni funzione) e l'impostazione countdown.

Quindi, il compito effettivo esecuzione del codice esegue le seguenti operazioni:

@task 
def _your_task(*args, **kwargs): 
    cache_key = make_public_task_cache_key(*args, **kwargs) 
    counter = get_redis().getset(cache_key, 0) 
    # redis makes the zero a string. 
    if counter == '0': 
     return 

    ... execute your actual task code. 

Ciò consente di colpire your_public_task.delay(..) tutte le volte che si vuole, all'interno del vostro QUEUE_DELAY, e sarà fuoco solo fuori una volta.

1

Ecco come puoi farlo con Mongo.

NOTA: ho dovuto rendere il disegno un po 'più indulgente, poiché le attività di Celery non sono garantite per eseguire il momento esatto di eta è soddisfatta o countdown si esaurisce.

Inoltre, gli indici di scadenza di Mongo vengono solo ripuliti ogni minuto o così; Pertanto non è possibile basare il disegno sui record eliminati nel momento in cui lo eta è attivo.

In ogni caso, il flusso è qualcosa di simile:

  1. codice client chiama my_task.
  2. preflight incrementa un contatore chiamata, e restituisce come flight_id
  3. _my_task è impostato da eseguire dopo TTL secondi.
  4. Quando _my_task viene eseguito, controlla se è flight_id è ancora in corso. Se non lo è, si interrompe.
  5. ... qualche tempo dopo ... mongo pulisce voci obsolete della collezione, via an expiring index.

@celery.task(track_started=False, ignore_result=True) 
def my_task(my_arg): 
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL) 
    _my_task.apply_async((my_arg,), {'flight_id':flight_id}, countdown=TTL) 

@celery.task(track_started=False, ignore_result=True) 
def _my_task(my_arg, flight_id=None): 
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id): 
     return 
    # ... actual work ... # 

Code Library:

TTL = 5 * 60  # Run tasks after 5 minutes 
EXPIRY = 6 * TTL # This needs to be much larger than TTL. 

# We need to store a list of task-executions currently pending 
inflight_collection = db['celery_In_Flight'] 
inflight_collection.create_index([('fn', pymongo.ASCENDING,), 
            ('key', pymongo.ASCENDING,)]) 
inflight_collection.create_index('eta', expiresAfterSeconds=EXPIRY) 


def preflight(collection, fn, key, ttl): 
    eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl) 
    result = collection.find_one_and_update({ 
     'fn': fn, 
     'key': key, 
    }, { 
     '$set': { 
      'eta': eta 
     }, 
     '$inc': { 
      'flightId': 1 
     } 
    }, upsert=True, return_document=pymongo.ReturnDocument.AFTER) 
    print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId']) 
    return result['flightId'] 


def check_for_takeoff(collection, fn, key, flight_id): 
    result = collection.find_one({ 
     'fn': fn, 
     'key': key 
    }) 
    ready = result is None or result['flightId'] == flight_id 
    print 'Check[{}][{}] = {}, {}'.format(fn, key, result['flightId'], ready) 
    return ready 
0

Bartek ha l'idea , usa contatori rossi che sono atomici (e dovrebbero essere facilmente disponibili se il tuo broker è redis). Anche se la sua soluzione è la limitazione, non il debouncing. La differenza è comunque piccola (getset vs decr).

coda il compito:

conn = get_redis() 
conn.incr(key) 
task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Poi nel compito:

conn = get_redis() 
counter = conn.decr(key) 
if counter > 0: 
    # task is still queued 
    return 
# continue on to rest of task 

E 'difficile rendere un decoratore in quanto è necessario per decorare il compito e chiamando il compito stesso. Quindi avrai bisogno di un decoratore prima del sedatore @task decorator e uno dopo di esso.

Per ora mi sono appena creato alcune funzioni che mi aiutano a chiamare l'attività e quella che controlla l'inizio dell'attività.

+0

http://stackoverflow.com/a/43625455/4391298 è una soluzione alla quale sono finalmente arrivato, compresa una certa scadenza chiave per gestirlo in fase di fusione (non è un problema nella soluzione originale). –

0

Ecco la soluzione mi è venuta: https://gist.github.com/wolever/3cf2305613052f3810a271e09d42e35c

e copiato qui, per i posteri:

import time 

import redis 


def get_redis_connection(): 
    return redis.connect() 

class TaskDebouncer(object): 
    """ A simple Celery task debouncer. 

     Usage:: 

      def debounce_process_corpus(corpus): 
       # Only one task with ``key`` will be allowed to execute at a 
       # time. For example, if the task was resizing an image, the key 
       # might be the image's URL. 
       key = "process_corpus:%s" %(corpus.id,) 
       TaskDebouncer.delay(
        key, my_taks, args=[corpus.id], countdown=0, 
       ) 

      @task(bind=True) 
      def process_corpus(self, corpus_id, debounce_key=None): 
       debounce = TaskDebouncer(debounce_key, keepalive=30) 

       corpus = Corpus.load(corpus_id) 

       try: 
        for item in corpus: 
         item.process() 

         # If ``debounce.keepalive()`` isn't called every 
         # ``keepalive`` interval (the ``keepalive=30`` in the 
         # call to ``TaskDebouncer(...)``) the task will be 
         # considered dead and another one will be allowed to 
         # start. 
         debounce.keepalive() 

       finally: 
        # ``finalize()`` will mark the task as complete and allow 
        # subsequent tasks to execute. If it returns true, there 
        # was another attempt to start a task with the same key 
        # while this task was running. Depending on your business 
        # logic, this might indicate that the task should be 
        # retried. 
        needs_retry = debounce.finalize() 

       if needs_retry: 
        raise self.retry(max_retries=None) 

    """ 

    def __init__(self, key, keepalive=60): 
     if key: 
      self.key = key.partition("!")[0] 
      self.run_key = key 
     else: 
      self.key = None 
      self.run_key = None 
     self._keepalive = keepalive 
     self.cxn = get_redis_connection() 
     self.init() 
     self.keepalive() 

    @classmethod 
    def delay(cls, key, task, args=None, kwargs=None, countdown=30): 
     cxn = get_redis_connection() 
     now = int(time.time()) 
     first = cxn.set(key, now, nx=True, ex=countdown + 10) 
     if not first: 
      now = cxn.get(key) 

     run_key = "%s!%s" %(key, now) 
     if first: 
      kwargs = dict(kwargs or {}) 
      kwargs["debounce_key"] = run_key 
      task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

     return (first, run_key) 

    def init(self): 
     self.initial = self.key and self.cxn.get(self.key) 

    def keepalive(self, expire=None): 
     if self.key is None: 
      return 
     expire = expire if expire is not None else self._keepalive 
     self.cxn.expire(self.key, expire) 

    def is_out_of_date(self): 
     if self.key is None: 
      return False 
     return self.cxn.get(self.key) != self.initial 

    def finalize(self): 
     if self.key is None: 
      return False 
     with self.cxn.pipeline() as pipe: 
      while True: 
       try: 
        pipe.watch(self.key) 
        if pipe.get(self.key) != self.initial: 
         return True 
        pipe.multi() 
        pipe.delete(self.key) 
        pipe.execute() 
        break 
       except redis.WatchError: 
        continue 
     return False 
0

Ecco una soluzione più compilato in base al largo https://stackoverflow.com/a/28157498/4391298 ma trasformato in un decoratore e raggiungendo nel Kombu pool di connessioni per riutilizzare il contatore Redis.

import logging 
from functools import wraps 

# Not strictly required 
from django.core.exceptions import ImproperlyConfigured 
from django.core.cache.utils import make_template_fragment_key 

from celery.utils import gen_task_name 


LOGGER = logging.getLogger(__name__) 


def debounced_task(**options): 
    """Debounced task decorator.""" 

    try: 
     countdown = options.pop('countdown') 
    except KeyError: 
     raise ImproperlyConfigured("Debounced tasks require a countdown") 

    def factory(func): 
     """Decorator factory.""" 
     try: 
      name = options.pop('name') 
     except KeyError: 
      name = gen_task_name(app, func.__name__, func.__module__) 

     @wraps(func) 
     def inner(*args, **kwargs): 
      """Decorated function.""" 

      key = make_template_fragment_key(name, [args, kwargs]) 
      with app.pool.acquire_channel(block=True) as (_, channel): 
       depth = channel.client.decr(key) 

       if depth <= 0: 
        try: 
         func(*args, **kwargs) 
        except: 
         # The task failed (or is going to retry), set the 
         # count back to where it was 
         channel.client.set(key, depth) 
         raise 
       else: 
        LOGGER.debug("%s calls pending to %s", 
           depth, name) 

     task = app._task_from_fun(inner, **options, name=name + '__debounced') 

     @wraps(func) 
     def debouncer(*args, **kwargs): 
      """ 
      Debouncer that calls the real task. 
      This is the task we are scheduling.""" 

      key = make_template_fragment_key(name, [args, kwargs]) 
      with app.pool.acquire_channel(block=True) as (_, channel): 
       # Mark this key to expire after the countdown, in case our 
       # task never runs or runs too many times, we want to clean 
       # up our Redis to eventually resolve the issue. 
       channel.client.expire(key, countdown + 10) 
       depth = channel.client.incr(key) 

      LOGGER.debug("Requesting %s in %i seconds (depth=%s)", 
         name, countdown, depth) 
      task.si(*args, **kwargs).apply_async(countdown=countdown) 

     return app._task_from_fun(debouncer, **options, name=name) 

    return factory