2012-06-03 8 views
14

Devo progettare un sistema di pianificazione delle attività scalabile basato su Redis.Esecuzione scalabile dell'attività ritardata con Redis

Requisiti:

  • più processi di lavoro.
  • Molte attività, ma sono possibili lunghi periodi di inattività.
  • Precisione di sincronizzazione ragionevole.
  • Scarto di risorse minime inattivo.
  • Utilizzare l'API Redis sincrona.
  • Dovrebbe funzionare per Redis 2.4 (ovvero nessuna funzionalità dal prossimo 2.6).
  • Non utilizzare altri mezzi di RPC rispetto a Redis.

Pseudo-API: schedule_task(timestamp, task_data). Il timestamp è in secondi interi.

idea di base:

  • Ascolta per le prossime attività su lista.
  • Mettere le attività ai bucket per timestamp.
  • Sospendi fino al timestamp più vicino.
  • Se una nuova attività appare con data e ora inferiore a quella più vicina, sveglia.
  • Elaborare tutte le attività imminenti con timestamp ≤ ora, in lotti (presupponendo che l'esecuzione dell'attività sia veloce).
  • Assicurarsi che il lavoratore concorrente non elabori le stesse attività. Allo stesso tempo, assicurati che nessuna attività venga persa in caso di arresto anomalo durante l'elaborazione.

Finora non riesco a capire come adattare questo primitive Redis ...

Degli indizi?

Si noti che esiste una domanda simile precedente: Delayed execution/scheduling with Redis? In questa nuova domanda introduco maggiori dettagli (soprattutto, molti lavoratori). Finora non ero in grado di capire come applicare le vecchie risposte qui - quindi, una nuova domanda.

+0

Vorrei ricordare esplicitamente che il polling di una chiave Redis in un ciclo violerebbe il requisito "spreco minimo di risorse quando inattivo". I lavoratori dovrebbero dormire quando non c'è niente da fare. Il polling –

+0

con BLPOP/BRPOP può bloccare fino a quando l'elenco è pieno ed è ciò che la maggior parte delle persone usa per farlo. Di solito blocchi per qualche secondo in un ciclo, ma in termini di tempo della CPU è trascurabile. È possibile utilizzare redis pub/sub, ma ciò è negativo perché se non c'è un lavoratore, le attività andranno perse. –

+0

@Not_a_Golfer: le cose sono un po 'più complicate di BLPOPping di una singola lista. Si noti che è necessario eseguire l'esecuzione ritardata dell'attività (ad esempio, l'utilità di pianificazione), non un semplice processore di attività. –

risposta

0

Un approccio combinato sembra plausibile:

  1. Nessun nuovo timestamp compito può essere inferiore a ora corrente (pinza se inferiore). Supponendo una sincronizzazione NTP affidabile.

  2. Tutte le attività vengono inserite in elenchi benna alle chiavi, con la data e l'ora del compito.

  3. Inoltre, tutti i timestamp delle attività vanno a un set z dedicato (chiave e punteggio - timestamp stesso).

  4. Nuove attività sono accettate dai client tramite l'elenco Redis separato.

  5. Ciclo: recupera i timestamp scaduti N più vecchi tramite il limite zrangebyscore ....

  6. BLPOP con timeout sul nuovo elenco attività ed elenchi per i timestamp recuperati.

  7. Se ha una vecchia attività, elaborarla. Se nuovo - aggiungi a bucket e zset.

  8. Controllare se le benne sono vuote. Se è così - cancella la lista ed entra da zset. Probabilmente non controllare i bucket scaduti di recente, per salvaguardare i problemi di sincronizzazione del tempo. Ciclo finale

Critica? Commenti? Alternative?

4

Non hai specificato la lingua che stai utilizzando. Hai almeno 3 alternative per farlo senza scrivere una sola riga di codice almeno in Python.

  1. Celery ha un broker redis opzionale. http://celeryproject.org/

  2. resque è una coda di operazioni redis estremamente popolare che utilizza redis. https://github.com/defunkt/resque

  3. RQ è una coda basata Redis semplici e piccoli che si propone di "prendere la roba buona da sedano e resque" e di essere molto più semplice con cui lavorare. http://python-rq.org/

Si può almeno guardare il loro disegno se non si possono usare.

Ma per rispondere alla tua domanda, quello che vuoi può essere fatto con redis. In realtà ho scritto più o meno quello in passato.

EDIT: Per quanto riguarda la modellazione quello che vuoi sul Redis, questo è quello che vorrei fare:

  1. accodamento un'attività con un timestamp sarà fatta direttamente dal cliente - si inserisce il compito in un ordinato set con il timestamp come punteggio e l'attività come valore (vedi ZADD).

  2. Un dispatcher centrale si sveglia ogni N secondi, controlla i primi timestamp su questo set e se ci sono task pronti per l'esecuzione, spinge l'attività in un elenco "da eseguire ORA". Questo può essere fatto con ZREVRANGEBYSCORE sul set ordinato "in attesa", ottenendo tutti gli elementi con data e ora < = ora, in modo da ottenere tutti gli elementi pronti contemporaneamente. spingere è fatto da RPUSH.

  3. i lavoratori utilizzano BLPOP nell'elenco "da eseguire ORA", svegliano quando c'è qualcosa su cui lavorare e fanno le loro cose. Questo è sicuro poiché redis è single threaded e nessun 2 worker avrà mai lo stesso compito.

  4. una volta terminato, gli operai riportano il risultato in una coda di risposta, che viene controllata dal dispatcher o da un altro thread. È possibile aggiungere un bucket "in sospeso" per evitare errori o qualcosa del genere.

modo che il codice sarà simile a questo (questo è solo pseudo codice):

cliente:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO> 

dispatcher:

while working: 
    tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now 
    for task in tasks: 

     #do the delete and queue as a transaction 
     MULTI 
     RPUSH "to_be_executed" task 
     ZREM "new_tasks" task 
     EXEC 

    sleep(1) 

io non aggiungere la gestione della coda di risposta, ma è più o meno come il lavoratore:

lavoratore:

while working: 
    task = BLPOP "to_be_executed" <TIMEOUT> 
    if task: 
     response = work_on_task(task) 
     RPUSH "results" response 

EDIT: stateless dispatcher atomica:

while working: 

    MULTI 
    ZREVRANGE "new_tasks" 0 1 
    ZREMRANGEBYRANK "new_tasks" 0 1 
    task = EXEC 

    #this is the only risky place - you can solve it by using Lua internall in 2.6 
    SADD "tmp" task 

    if task.timestamp <= now: 
     MULTI 
     RPUSH "to_be_executed" task 
     SREM "tmp" task 
     EXEC 
    else: 

     MULTI 
     ZADD "new_tasks" task.timestamp task 
     SREM "tmp" task 
     EXEC 

    sleep(RESOLUTION) 
+0

Sto usando Lua. :-) –

+0

interessante! vuoi usare effettivamente lua dentro redis? o semplicemente connettersi a redis con lua? il primo non funzionerà. –

+0

Grazie per i riferimenti, li controllerò. Dal momento che hai scritto qualcosa di simile sul passato, ti preoccupi di elaborare un po 'sui punti chiave del design? Non importa la lingua. –

8

Ecco un'altra soluzione che si basa su un paio di altri [1]. Usa il comando redis WATCH per rimuovere la condizione di gara senza usare lua in redis 2.6.

Lo schema di base è:

  • Utilizzare uno zset Redis per le operazioni pianificate e code Redis pronto per eseguire le operazioni.
  • Chiedi a un dispatcher di eseguire il polling su zset e spostare le attività pronte per l'esecuzione nelle code rosse. Potresti volere più di un dispatcher per ridondanza, ma probabilmente non ne hai bisogno o ne vuoi molti.
  • Hanno tanti lavoratori che desiderano bloccare i pop sulle code rosse.

non l'ho provato :-)

Il creatore di posti di lavoro foo avrebbe fatto:

def schedule_task(queue, data, delay_secs): 
    # This calculation for run_at isn't great- it won't deal well with daylight 
    # savings changes, leap seconds, and other time anomalies. Improvements 
    # welcome :-) 
    run_at = time.time() + delay_secs 

    # If you're using redis-py's Redis class and not StrictRedis, swap run_at & 
    # the dict. 
    redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data}) 

schedule_task('foo_queue', foo_data, 60) 

Il dispatcher (s) sarebbe simile:

while working: 
    redis.watch(SCHEDULED_ZSET_KEY) 
    min_score = 0 
    max_score = time.time() 
    results = redis.zrangebyscore(
     SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False) 
    if results is None or len(results) == 0: 
     redis.unwatch() 
     sleep(1) 
    else: # len(results) == 1 
     redis.multi() 
     redis.rpush(results[0]['queue'], results[0]['data']) 
     redis.zrem(SCHEDULED_ZSET_KEY, results[0]) 
     redis.exec() 

Il foo worker sarebbe:

while working: 
    task_data = redis.blpop('foo_queue', POP_TIMEOUT) 
    if task_data: 
     foo(task_data) 

[1] Questa soluzione è basata su not_a_golfer's, uno su http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/ e sui documenti redis per le transazioni.

+1

Se è di interesse per qualcuno, ho creato un'implementazione Java di cui sopra ... completamente testata e funzionale https://github.com/davidmarquis/redis-scheduler –

1

Se stai cercando una soluzione pronta su Java. Redisson è giusto per te. Permette di pianificare ed eseguire attività (con supporto di espressioni cron) in modo distribuito su Redisson nodes usando l'api familiare ScheduledExecutorService e basato sulla coda Redis.

Ecco un esempio. Prima definire un'attività utilizzando l'interfaccia java.lang.Runnable. Ogni attività può accedere all'istanza Redis tramite l'oggetto iniettato RedissonClient.

public class RunnableTask implements Runnable { 

    @RInject 
    private RedissonClient redissonClient; 

    @Override 
    public void run() throws Exception { 
     RMap<String, Integer> map = redissonClient.getMap("myMap"); 
     Long result = 0; 
     for (Integer value : map.values()) { 
      result += value; 
     } 
     redissonClient.getTopic("myMapTopic").publish(result); 
    } 

} 

Ora è pronto a compila in ScheduledExecutorService:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); 
ScheduledFuture<?> future = executorService.schedule(new CallableTask(), 10, 20, TimeUnit.MINUTES); 

future.get(); 
// or cancel it 
future.cancel(true); 

Esempi con espressioni cron:

executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?")); 

executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5)); 

executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY)); 

Tutte le attività sono in esecuzione su Redisson node.