2016-03-21 31 views
7

Sto utilizzando un produttore RabbitMQ per inviare attività a esecuzione prolungata (30 minuti +) a un utente. Il problema è che il consumatore sta ancora lavorando su un'attività quando la connessione al server è chiusa e l'attività non riconosciuta viene riaccesa.RabbitMQ chiude la connessione durante l'elaborazione di attività in esecuzione prolungata e le impostazioni di timeout producono errori

Dalla ricerca capisco che sia possibile utilizzare uno heartbeat o uno increased connection timeout per risolvere questo problema. Entrambe queste soluzioni sollevano errori durante il tentativo. Leggendo le risposte a post simili, ho anche appreso che molte modifiche sono state implementate in RabbitMQ da quando sono state pubblicate le risposte (ad esempio, il timeout predefinito di heartbeat è passato da 60 a 580 prima di RabbitMQ 3.5.5).

Quando si specifica un battito cardiaco e bloccato timeout di connessione: viene visualizzata

credentials = pika.PlainCredentials('user', 'password') 
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000) 
connection = pika.BlockingConnection(parameters) 

channel = connection.channel() 

il seguente errore:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout' 

Quando si specifica heartbeat_interval=1000 nel collegamento parametri viene visualizzato un errore simile: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

Analogamente per socket_timeout = 1000 viene visualizzato il seguente errore: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

Sto eseguendo RabbitMQ 3.6.1, pika 0.10.0 e python 2.7 su Ubuntu 14.04.

  1. Perché gli approcci di cui sopra producono errori?
  2. È possibile utilizzare un approccio heartbeat in presenza di un'attività continua a esecuzione prolungata? Ad esempio, è possibile utilizzare heartbeat durante l'esecuzione di join di database di grandi dimensioni che richiedono più di 30 minuti? Sono favorevole all'approccio heartbeat poiché molte volte è difficile valutare la durata di un'attività come l'aggiunta al database.

ho letto attraverso le risposte alle domande simili

Aggiornamento: esecuzione code from the pika documentation produce lo stesso errore.

+0

C'è qualche tipo di bilanciamento del carico seduto davanti al server di coniglio mq? L'aspetto del tuo ambiente potrebbe essere rilevante per rispondere a questa domanda. – mschuett

+0

Le macchine di produzione e di consumo si trovano tutte sulla stessa rete privata. – Greg

+1

Il problema è che è necessario elaborare i dati durante l'attesa, anche se non si stanno consumando messaggi; connection.process_data_events(). Altrimenti il ​​pika non risponderà ai battiti del cuore. – eandersson

risposta

5

Ho incontrato lo stesso problema con i miei sistemi, che si sta vedendo, con connessione interrotta durante attività molto lunghe.

È possibile che l'heartbeat possa aiutare a mantenere in vita la connessione, se la configurazione di rete è tale che le connessioni TCP/IP inattive vengono forzatamente interrotte. Se questo non è il caso, però, cambiare il battito cardiaco non aiuterà.

La modifica del timeout della connessione non aiuta affatto. Questa impostazione viene utilizzata solo quando si crea inizialmente la connessione.

Sto utilizzando un produttore RabbitMQ per inviare attività a esecuzione prolungata (30 minuti +) a un utente. Il problema è che il consumatore sta ancora lavorando su un'attività quando la connessione al server è chiusa e l'attività non riconosciuta viene riaccesa.

ci sono due ragioni per questo, entrambi i quali si è incorrere in già:

  1. Connessioni cadono in modo casuale, anche nelle migliori circostanze
  2. Ri-avviare un processo a causa di una ri un messaggio inceppato può causare problemi

Dopo aver distribuito il codice RabbitMQ con attività che vanno da meno di un secondo, a diverse ore nel tempo, ho rilevato che il messaggio veniva immediatamente riconosciuto e aggiornato. il sistema con i messaggi di stato funziona meglio per compiti molto lunghi, come questo.

Sarà necessario disporre di un sistema di registrazione (probabilmente con un database) che tenga traccia dello stato di un dato lavoro.

Quando il consumatore riceve un messaggio e avvia il processo, deve riconoscere immediatamente il messaggio e inviare un messaggio di stato "avviato" al sistema di registrazione.

Al termine del processo, inviare un altro messaggio per dire che è stato fatto.

Questo non risolverà il problema di connessione interrotta, ma nulla lo risolverà al 100% comunque. Invece, impedirà che il problema di ri-accodamento del messaggio si verifichi quando viene interrotta una connessione.

Questa soluzione introduce un altro problema, tuttavia: quando il processo in esecuzione prolungata si arresta, come si riprende il lavoro?

La risposta di base è utilizzare il sistema di stato record (il database) per il lavoro per dirti che è necessario riprendere quel lavoro di nuovo. All'avvio dell'app, controlla il database per vedere se c'è lavoro incompleto. Se c'è, riprendi o riavvia quel lavoro in qualsiasi modo è appropriato.