2015-05-04 4 views
6

Forse la questione di fondo è come il modulo node-kafka sto usando ha implementato le cose, ma forse non, così qui andiamo ...eventi Node.js EventEmitter non condividono ciclo di eventi

Utilizzando la libreria nodo kafa, Sto affrontando un problema con la sottoscrizione agli eventi consumer.on('message'). La libreria utilizza il modulo standard events, quindi penso che questa domanda potrebbe essere abbastanza generica.

La mia struttura del codice reale è grande e complicata, quindi ecco uno pseudo-esempio del layout di base per evidenziare il mio problema. (Nota: Questo frammento di codice è testato quindi potrei avere errori qui, ma la sintassi non è in discussione qui comunque)

var messageCount = 0; 
var queryCount = 0; 

// Getting messages via some event Emitter 
consumer.on('message', function(message) { 
    message++; 
    console.log('Message #' + message); 

    // Making a database call for each message 
    mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) { 
     queryCount++; 
     console.log('Query #' + queryCount); 
    }); 
}) 

quello che sto vedendo qui è quando comincio il mio server, ci sono 100.000 o giù di lì backlog messaggi che kafka vorrà darmi e lo fa attraverso l'emettitore di eventi. Quindi inizio a ricevere messaggi. Per ottenere e registrare tutti i messaggi occorrono circa 15 secondi.

questo è quello che ci si aspetterebbe di vedere di un'uscita assumendo la query MySQL è ragionevolmente veloce:

Message #1 
Message #2 
Message #3 
... 
Message #500 
Query #1 
Message #501 
Message #502 
Query #2 
... and so on in some intermingled fashion 

Mi aspetterei questo perché il mio primo risultato mysql dovrebbe essere pronto molto rapidamente e mi aspetterei il risultato (s) a svolgere il proprio turno nel ciclo degli eventi per elaborare la risposta. Quello che sto in realtà sempre è:

Message #1 
Message #2 
... 
Message #100000 
Query #1 
Query #2 
... 
Query #100000 

sto ottenendo ogni singolo messaggio prima di una risposta mysql è in grado di elaborare. Quindi la mia domanda è, perché? Perché non riesco a ottenere un singolo risultato del database finché tutti gli eventi del messaggio non sono completi?

Un'altra nota: ho impostato un punto di interruzione a .emit('message') in node-kafka e a mysql.query() nel mio codice e li sto colpendo a turno. Quindi sembra che tutti i 100.000 emetti non si accumulino in primo piano prima di entrare nel mio sottoscrittore dell'evento. Quindi è andata la mia prima ipotesi sul problema.

idee e conoscenze sarebbe molto apprezzato :)

+0

Cosa succede se si aumenta il numero di messaggi memorizzati su un numero molto più grande?È possibile che il tuo mysql sia semplicemente così lento? – Avery

+0

@ Mi ero meravigliato di questo, ma quando eseguo la replica con un solo messaggio da elaborare, non riesco nemmeno a percepire il ritardo della risposta mysql. Anche questo è in esecuzione localmente. E la query mysql attuale è estremamente semplice (solo un SELECT per ~ 8 campi da una singola riga della tabella e quella tabella ha solo circa 60 righe al momento) –

+0

Se questo esempio è effettivamente rappresentativo del tuo codice, allora sono perso anche io . Puoi davvero produrre questo risultato con questo esempio? Non ho un'istanza MySQL disponibile per testare con. – Avery

risposta

2

Il driver node-kafka utilizza una dimensione di buffer piuttosto liberale (1M), il che significa che otterrà come molti messaggi da Kafka che si inserisce nel buffer. Se il server è backloggato e, a seconda delle dimensioni del messaggio, questo può significare (decine di) migliaia di messaggi in arrivo con una richiesta.

Poiché EventEmitter è sincrono (non utilizza il loop di eventi nodo), ciò significa che il driver emetterà (decine di) migliaia di eventi per i suoi ascoltatori e poiché è sincrono, non cederà al Ciclo di eventi nodo fino a quando tutti i messaggi sono stati consegnati.

Non penso che si possa aggirare il flusso delle consegne di eventi, ma non penso che specificamente la consegna degli eventi sia problematica. Il problema più probabile è l'avvio di un'operazione asincrona (in questo caso una query MySQL) per ogni evento, che potrebbe inondare il database di query.

Una possibile soluzione sarebbe quella di utilizzare una coda invece di eseguire le query direttamente dai gestori di eventi. Ad esempio, con async.queue è possibile limitare il numero di attività simultanee (asincrone). La parte "worker" della coda eseguiva la query MySQL e nei gestori di eventi si limitava a spingere il messaggio in coda.

+0

Grazie @robertklep. Darò una prova a async.queue. Sto passando la mia coda in modo che ci sia solo una query mysql e mem-caching i risultati per le richieste in attesa da utilizzare, ma sospetto che un modulo ben mantenuto/testato sarà migliore :) –