2015-01-05 11 views
6

Abbiamo uno script node.js che esegue un server socket.io i cui client consumano messaggi da una coda RabbitMQ. Di recente abbiamo migrato ad Amazon AWS e RabbitMQ ora è un cluster di due macchine (istanze ridondanti). La connessione AMQP viene persa di volta in volta (è una limitazione che arriva da un ambiente ad alta disponibilità con VM ridondanti e dobbiamo affrontarla) e se viene effettuato un tentativo di riconnessione, il DNS sceglie a quale istanza connettersi (è un cluster con la replica dei dati, quindi non importa a quale istanza connettersi).amqp.node non rileverà una caduta di connessione

Il problema è che il tentativo di riconnettersi non viene mai eseguito; dopo un po ', quando la connessione viene persa, amqp.node apparentemente non riesce a notare che la connessione è stata persa. Inoltre, gli utenti smettono di ricevere messaggi e il server socket.io smette semplicemente di accettare nuove connessioni.

Abbiamo un timeout di heartbeat di 55 secondi (da non confondere con il timeout heartbeat socket.io) impostato sull'URL RabbitMQ e stiamo verificando gli eventi 'error' e 'close' con l'API callback di amqp.node ma sono apparentemente mai rilasciato. Le code si aspettano che i messaggi consumati vengano acchiappati. Vogliamo che lo script del nodo rilevi una connessione persa e finisca da solo, quindi l'ambiente avvierà automaticamente un nuovo processo e ristabilirà una connessione.

Ecco il codice, forse stiamo facendo qualcosa di sbagliato con l'API di callback amqp.node o qualcos'altro.

var express = require('express'); 
app = express(); 
var http = require('http'); 
var serverio = http.createServer(app); 
var io = require('socket.io').listen(serverio, { log: false }); 
var socket; 
var allcli = []; 
var red, blue, green, magenta, reset; 
red = '\033[31m'; 
blue = '\033[34m'; 
green = '\033[32m'; 
magenta = '\033[35m'; 
orange = '\033[43m'; 
reset = '\033[0m'; 

var queue = 'ha.atualizacao_mobile'; 
var urlRabbit = 'amqp://login:[email protected]?heartbeat=55' // Amazon 
var amqp = require('amqplib/callback_api'); 
var debug = true; 

console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds."); 
io.set('heartbeat interval', 10 * 60); 
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients."); 

console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds."); 
io.set('heartbeat timeout', 11 * 60); 
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds."); 


io.sockets.on('connection', function(socket){ 

    socket.on('error', function (exc) { 
     console.log(orange+"Ignoring exception: " + exc + reset); 
    }); 

    socket.on('send-indice', function (data) { 
     // Some business logic 
    }); 

    socket.on('disconnect', function() { 
     // Some business logic 
    }); 

}); 

function updatecli(data){ 
    // Some business logic 
} 

amqp.connect(urlRabbit, null, function(err, conn) { 
    if (err !== null) { 
     return console.log("Error creating connection: " + err); 
    } 

    conn.on('error', function(err) { 
     console.log("Generated event 'error': " + err); 
    }); 

    conn.on('close', function() { 
     console.log("Connection closed."); 
     process.exit(); 
    }); 

    processRabbitConnection(conn, function() { 
     conn.close(); 
    }); 
}); 

function processRabbitConnection(conn, finalize) { 
    conn.createChannel(function(err, channel) { 

     if (err != null) { 
      console.log("Error creating channel: " + err); 
      return finalize(); 
     } 

     channel.assertQueue(queue, null, function(err, ok) { 
      if (err !== null) { 
        console.log("Error asserting queue " + queue + ": " + err); 
        return finalize(); 
      } 

      channel.consume(queue, function (msg) { 
       if (msg !== null) { 
        try { 
         var dataObj = JSON.parse(msg.content); 
         if (debug == true) { 
          //console.log(dataObj); 
         } 
         updatecli(dataObj); 
        } catch(err) { 
         console.log("Error in JSON: " + err); 
        } 
        channel.ack(msg); 
       } 
      }, null, function(err, ok) { 
       if (err !== null) { 
        console.log("Error consuming message: " + err); 
        return finalize(); 
       } 
      }); 
     }); 
    }); 
} 

serverio.listen(9128, function() { 
    console.log('Server: Socket IO Online - Port: 9128 - ' + new Date()); 
}); 

risposta

7

A quanto pare il problema è stato risolto. Il battito cardiaco vicino a 60 secondi è stato il problema. È in conflitto con il bilanciamento del carico RabbitMQ che verifica ogni 1 minuto circa se i dati sono passati attraverso la connessione o meno (se non sono passati dati, interrompe la connessione). La connessione AMQP smette di ricevere messaggi e la libreria apparentemente non reagisce. Per evitare questa situazione è necessario un battito cardiaco inferiore (ad esempio 30 secondi).