2016-02-05 29 views
8

Sono molto nuovo per Celery e questa è la domanda che ho:Celery: come limitare il numero di attività in coda e interrompere l'alimentazione quando è piena?

Supponiamo di avere uno script che deve costantemente recuperare i nuovi dati dal DB e inviarlo ai lavoratori che usano Celery.

tasks.py

# Celery Task 
from celery import Celery 

app = Celery('tasks', broker='amqp://[email protected]//') 

@app.task 
def process_data(x): 
    # Do something with x 
    pass 

fetch_db.py

# Fetch new data from DB and dispatch to workers. 
from tasks import process_data 

while True: 
    # Run DB query here to fetch new data from DB fetched_data 

    process_data.delay(fetched_data) 

    sleep(30); 

Qui è la mia preoccupazione: i dati vengono scaricati ogni 30 secondi. la funzione process_data() potrebbe richiedere molto più tempo e, a seconda della quantità di lavoratori (specialmente se troppo pochi), la coda potrebbe essere rallentata come ho capito.

  1. Non riesco ad aumentare il numero di lavoratori.
  2. Posso modificare il codice per evitare di alimentare la coda quando è piena.

La domanda è come impostare la dimensione della coda e come faccio a sapere che è piena? In generale, come affrontare questa situazione?

+0

Aggiungere più lavoratori per recuperare il ritardo con la coda – noorul

+0

@noorul: questo è stato non è la mia domanda. Non posso aggiungere più lavoratori. Posso solo astenermi dal dar da mangiare alla coda se è piena. La mia domanda è: come posso impostare le sue dimensioni e come faccio a sapere che è pieno. – jazzblue

+1

Non sono sicuro di come possiamo definire la pienezza di una coda di Celery. Può essere che puoi mettere un'altra coda davanti alla coda di Celery e controllarla. – noorul

risposta

6

È possibile impostare rabbitmq x-max-length in coda predichiarare utilizza kombu

esempio:

import time 
from celery import Celery 
from kombu import Queue, Exchange 

class Config(object): 
    BROKER_URL = "amqp://[email protected]//" 

    CELERY_QUEUES = (
     Queue(
      'important', 
      exchange=Exchange('important'), 
      routing_key="important", 
      queue_arguments={'x-max-length': 10} 
     ), 
    ) 

app = Celery('tasks') 
app.config_from_object(Config) 


@app.task(queue='important') 
def process_data(x): 
    pass 

o utilizzando Policies

rabbitmqctl set_policy Ten "^one-meg$" '{"max-length-bytes":1000000}' --apply-to queues 
+1

Cosa succede con la chiamata .delay() in questo caso? L'attività – RemcoGerlich

+0

verrà aggiunta alla coda, ma se il limite della coda ha raggiunto 'I messaggi verranno eliminati o senza lettere dalla parte anteriore della coda per lasciare spazio ai nuovi messaggi una volta raggiunto il limite. [Coniglio] (https://www.rabbitmq.com/maxlength.html). Per problema in [github] (https://github.com/rabbitmq/rabbitmq-server/issues/499) Blocca i publisher quando viene raggiunto il limite di lunghezza della coda non è possibile al momento –