2011-12-16 4 views
19

Ho lavorare (magazzino) script da nodeNodeJS | Cluster: come inviare dati dal master a tutti o a un bambino/lavoratore?

var cluster = require('cluster'); 
var http = require('http'); 
var numReqs = 0; 

if (cluster.isMaster) { 
    // Fork workers. 
    for (var i = 0; i < 2; i++) { 
    var worker = cluster.fork(); 

    worker.on('message', function(msg) { 
     if (msg.cmd && msg.cmd == 'notifyRequest') { 
     numReqs++; 
     } 
    }); 
    } 

    setInterval(function() { 
    console.log("numReqs =", numReqs); 
    }, 1000); 
} else { 
    // Worker processes have a http server. 
    http.Server(function(req, res) { 
    res.writeHead(200); 
    res.end("hello world\n"); 
    // Send message to master process 
    process.send({ cmd: 'notifyRequest' }); 
    }).listen(8000); 
} 

Nello script precedente posso inviare i dati dal lavoratore di processo principale con facilità. Ma come inviare dati dal master al lavoratore/lavoratore? Con esempi, se possibile.

risposta

1

Si dovrebbe essere in grado di inviare un messaggio dal master al lavoratore in questo modo:

worker.send({message:'hello'}) 

perché "cluster.fork è implementato in cima child_process.fork" (cluster.fork è implementato in cima di child_process.fork)

+0

Sì, funziona, grazie! In altre parole: mentre forking i lavoratori dovrei archiviarli in un array. E itera questa matrice per inviare dati a ogni bambino. È l'altro modo per inviare dati a tutti i lavoratori senza memorizzazione e iterazione. – htonus

+0

Se non si desidera archiviare i lavoratori in un array e iterare attraverso di essi per inviare messaggi, è possibile utilizzare un socket dominio unix per comunicare i messaggi dal master ai lavoratori. – alessioalex

+0

Suppongo che tu possa creare EventEmitter nel master, emettere un evento ogni volta che viene ricevuto un messaggio. Dopo la creazione di ciascun lavoratore, è sufficiente aggiungere un listener all'EventEmitter che invierà il messaggio all'operatore. Naturalmente questo è ancora implementato memorizzando i riferimenti degli ascoltatori (quindi anche dell'operatore) nell'EventEmitter, ma hey, almeno non è necessario guardarlo – cheng81

36

Perché cluster.fork è implementato in cima child_process.fork, è possibile inviare messaggi da un maestro al lavoratore utilizzando worker.send({ msg: 'test' }), e da un lavoratore di un maestro da process.send({ msg: 'test' });. Riceverai i messaggi in questo modo: worker.on('message', callback) (da worker a master) e process.on('message', callback); (da master a worker).

Ecco il mio esempio completo, è possibile verificare che naviga http://localhost:8000/ Quindi il lavoratore invierà un messaggio al master e il maestro risponderà:

var cluster = require('cluster'); 
var http = require('http'); 
var numReqs = 0; 
var worker; 

if (cluster.isMaster) { 
    // Fork workers. 
    for (var i = 0; i < 2; i++) { 
    worker = cluster.fork(); 

    worker.on('message', function(msg) { 
     // we only want to intercept messages that have a chat property 
     if (msg.chat) { 
     console.log('Worker to master: ', msg.chat); 
     worker.send({ chat: 'Ok worker, Master got the message! Over and out!' }); 
     } 
    }); 

    } 
} else { 
    process.on('message', function(msg) { 
    // we only want to intercept messages that have a chat property 
    if (msg.chat) { 
     console.log('Master to worker: ', msg.chat); 
    } 
    }); 
    // Worker processes have a http server. 
    http.Server(function(req, res) { 
    res.writeHead(200); 
    res.end("hello world\n"); 
    // Send message to master process 
    process.send({ chat: 'Hey master, I got a new request!' }); 
    }).listen(8000); 
} 
+0

In realtà voglio creare un server push per (web/flash) client socket. La versione attuale si impila su 1000 connessioni simultanee. Così ho deciso di creare diversi lavoratori con gli ascoltatori socket.io. Ciò significa che ho bisogno di passare dati ai lavoratori in modo asincrono. – htonus

+2

Sembra ok, assicurati di usare Socket.IO con RedisStore. – alessioalex

+1

Questo non funzionerà. 'var' dentro' for'? 'worker' manterrà l'ultimo worker biforcuto, non tutti (specialmente all'interno della callback dell'evento). O non ti importa di tutto e devi solo includere la tua richiamata o tenere tutti i lavoratori in Array. – dresende

8

ho trovato questa discussione mentre alla ricerca di un modo per inviare un messaggio per tutti i processi figlio ed è stato fortunatamente in grado di capirlo grazie ai commenti sugli array. Volevo solo illustrare una potenziale soluzione per l'invio di un messaggio a tutti i processi figlio che utilizzano questo approccio.

var cluster = require('cluster'); 
var http = require('http'); 
var numReqs = 0; 
var workers = []; 

if (cluster.isMaster) { 
    // Broadcast a message to all workers 
    var broadcast = function() { 
    for (var i in workers) { 
     var worker = workers[i]; 
     worker.send({ cmd: 'broadcast', numReqs: numReqs }); 
    } 
    } 

    // Fork workers. 
    for (var i = 0; i < 2; i++) { 
    var worker = cluster.fork(); 

    worker.on('message', function(msg) { 
     if (msg.cmd) { 
     switch (msg.cmd) { 
      case 'notifyRequest': 
      numReqs++; 
      break; 
      case 'broadcast': 
      broadcast(); 
      break; 
     } 
    }); 

    // Add the worker to an array of known workers 
    workers.push(worker); 
    } 

    setInterval(function() { 
    console.log("numReqs =", numReqs); 
    }, 1000); 
} else { 
    // React to messages received from master 
    process.on('message', function(msg) { 
    switch(msg.cmd) { 
     case 'broadcast': 
     if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs); 
     break; 
    } 
    }); 

    // Worker processes have a http server. 
    http.Server(function(req, res) { 
    res.writeHead(200); 
    res.end("hello world\n"); 
    // Send message to master process 
    process.send({ cmd: 'notifyRequest' }); 
    process.send({ cmd: 'broadcast' }); 
    }).listen(8000); 
} 
3

Ecco come ho implementato una soluzione per un problema simile. Agganciandosi a cluster.on('fork'), è possibile associare i gestori di messaggi ai lavoratori mentre sono biforcati (anziché archiviarli in un array), il che ha il vantaggio di trattare i casi in cui i lavoratori muoiono o si disconnettono e un nuovo operatore è biforcuto.

Questo snippet invia un messaggio dal master a tutti gli operatori.

if (cluster.isMaster) { 
    for (var i = 0; i < require('os').cpus.length; i++) { 
     cluster.fork(); 
    } 

    cluster.on('disconnect', function(worker) { 
     cluster.fork(); 
    } 

    // When a new worker process is forked, attach the handler 
    // This handles cases where new worker processes are forked 
    // on disconnect/exit, as above. 
    cluster.on('fork', function(worker) { 
     worker.on('message', messageRelay); 
    } 

    var messageRelay = function(msg) { 
     Object.keys(cluster.workers).forEach(function(id) { 
      cluster.workers[id].send(msg); 
     }); 
    }; 
} 
else { 
    process.on('message', messageHandler); 

    var messageHandler = function messageHandler(msg) { 
     // Worker received message--do something 
    }; 
} 
1

Capisco il vostro scopo della diffusione a tutti i processi di lavoro nodo in un cluster, anche se non è possibile inviare componente presa in quanto tale, ma c'è un lavoro in giro per lo scopo per essere servito. Cercherò di spiegare con un esempio:

Passo 1: Quando un'azione cliente richiede una trasmissione:

Child.js (Process that has been forked) : 

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
{ 
    process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message}); 
}) 

Fase 2: Dal lato creazione del cluster

Server.js (Place where cluster forking happens): 

if (cluster.isMaster) { 

    for (var i = 0; i < numCPUs; i++) { 

var worker = cluster.fork();

worker.on('message', function (data) { 
    if (data.cmd === "BROADCAST_TO_ALL_WORKERS") { 
     console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id); 
     Object.keys(cluster.workers).forEach(function(id) { 
      cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message}); 
     }); 
     } 
    }); 
    } 

    cluster.on('exit', function (worker, code, signal) { 
    var newWorker = cluster.fork(); 
    newWorker.on('message', function (data) { 
     console.log(data); 
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") { 
     console.log(data.cmd,data); 
     Object.keys(cluster.workers).forEach(function(id) { 
      cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message}); 
     }); 
     } 
    }); 
    }); 
} 
else { 
    //Node Js App Entry 
    require("./Child.js"); 
} 

Fase 3: Per trasmettere nel processo figlio -

-> Metti questa prima io.on ("collegamento") in bambino.js

process.on("message", function(data){ 
    if(data.cmd === "BROADCAST_TO_WORKER"){ 
     io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id }); 
    } 
}); 

Spero che questo aiuti. Per favore fatemi sapere se sono richiesti ulteriori chiarimenti.