2012-06-27 16 views
10

Ho bisogno di elaborare il modo migliore per leggere i dati che vengono scritti su un file, usando node.js, in tempo reale. Il problema è che Node è una nave in rapido movimento che rende difficile trovare il metodo migliore per affrontare un problema.Leggere un file in tempo reale usando Node.js

quello che voglio fare
Ho un processo Java che sta facendo qualcosa e poi a scrivere i risultati di questa cosa che fa un file di testo. In genere, per eseguire l'elaborazione sono necessari da 5 a 5 ore, con i dati scritti per tutto il tempo e può arrivare a un numero di throughput piuttosto elevato (circa 1000 righe/sec).

Mi piacerebbe leggere questo file, in tempo reale, e quindi, utilizzando il nodo aggregare i dati e scriverlo in un socket in cui può essere rappresentato graficamente sul client.

Il client, i grafici, i socket e la logica di aggregazione sono tutti eseguiti, ma sono confuso dall'approccio migliore per leggere il file.

quello che ho provato (o almeno suonato con)
FIFO - posso dire il mio processo Java di scrivere a una FIFO e leggere questo usando nodo, questo è in realtà il modo in cui abbiamo questo momento implemted utilizzando Perl , ma poiché tutto il resto è in esecuzione nel nodo, ha senso eseguire il porting del codice.

Unix Sockets - Come sopra.

fs.watchFile - funzionerà per ciò di cui abbiamo bisogno?

fs.createReadStream - è meglio di watchFile?

fs & tail -f - sembra un hack.

Che, in realtà, è la mia domanda
sto tendente usando Sockets Unix, questo sembra l'opzione più veloce. Ma il nodo ha migliori funzionalità integrate per la lettura dei file dalla fs in tempo reale?

risposta

6

Se si desidera mantenere il file come un archivio permanente dei dati per evitare una perdita di flusso in caso di un crash di sistema o di uno dei membri della rete dei processi in esecuzione muore, puoi ancora continuare a scrivere su un file e leggerne il contenuto.

Se non si ha bisogno di questo file come memoria permanente dei risultati prodotti dal proprio processo Java, andare con un socket Unix è molto meglio sia per la facilità che per le prestazioni.

fs.watchFile() non è quello che ti serve perché funziona su statistiche di file come il filesystem lo segnala e dal momento che vuoi leggere il file così come è già stato scritto, questo non è quello che vuoi.

BREVE UPDATE: Mi dispiace molto per rendersi conto che anche se avevo accusato fs.watchFile() per l'utilizzo di statistiche di file nel paragrafo precedente, avevo fatto la stessa cosa me stesso nel mio codice di esempio qui sotto! Anche se avevo già avvertito i lettori di "prendersi cura!" perché l'avevo scritto in pochi minuti senza nemmeno testare bene; ancora, può essere fatto meglio usando fs.watch() invece di watchFile o fstatSync se il sistema sottostante lo supporta.

per la lettura/scrittura da un file, ho appena scritto qui di seguito per divertimento nella mia pausa:

test-fs-writer.js: [Non avrete bisogno di questo dato si scrive file nel Java processo]

var fs = require('fs'), 
    lineno=0; 

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); 

stream.on('open', function() { 
    console.log('Stream opened, will start writing in 2 secs'); 
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); 
}); 

prova-FS-reader.js: [Fate attenzione, questa è solo la dimostrazione, controllare gli oggetti ERR]

var fs = require('fs'), 
    bite_size = 256, 
    readbytes = 0, 
    file; 

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); 

function readsome() { 
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense! 
    if(stats.size<readbytes+1) { 
     console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); 
     setTimeout(readsome, 3000); 
    } 
    else { 
     fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); 
    } 
} 

function processsome(err, bytecount, buff) { 
    console.log('Read', bytecount, 'and will process it now.'); 

    // Here we will process our incoming data: 
     // Do whatever you need. Just be careful about not using beyond the bytecount in buff. 
     console.log(buff.toString('utf-8', 0, bytecount)); 

    // So we continue reading from where we left: 
    readbytes+=bytecount; 
    process.nextTick(readsome); 
} 
!

È possibile evitare in modo sicuro l'utilizzo di nextTick e chiamare direttamente readsome(). Poiché stiamo ancora lavorando qui, non è necessario in nessun senso. Mi piace. : P

EDIT Oliver Lloyd

Prendendo l'esempio precedente, ma si estende per leggere dati CSV dà:

var lastLineFeed, 
    lineArray; 
function processsome(err, bytecount, buff) { 
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); 

    if(lastLineFeed > -1){ 

     // Split the buffer by line 
     lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); 

     // Then split each line by comma 
     for(i=0;i<lineArray.length;i++){ 
      // Add read rows to an array for use elsewhere 
      valueArray.push(lineArray[i].split(',')); 
     } 

     // Set a new position to read from 
     readbytes+=lastLineFeed+1; 
    } else { 
     // No complete lines were read 
     readbytes+=bytecount; 
    } 
    process.nextTick(readFile); 
} 
+0

Questo è un buon esempio che affronta direttamente la mia domanda. Richiederà comunque un miglioramento solo per elaborare una linea alla volta, ma probabilmente questa è una buona cosa; la mancanza di un'interfaccia fs esistente del nodo significa che è completamente personalizzabile, quindi anche se devo scrivere codice extra, posso ottenere esattamente ciò di cui ho bisogno. –

+0

Ho esteso l'esempio sopra per lavorare con un file CSV. –

+0

Funziona assolutamente quando viene eseguito come nodo ma come posso inserire questo codice in app.js e ottenere il risultato nella pagina html? – sand

4

Perché pensi che lo tail -f sia un hack?

Mentre capisco che ho trovato un buon esempio, vorrei fare qualcosa di simile. reale tempo esempio monitorare l'attività online con node.js e WebSocket:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

Giusto per rendere la risposta completa, ti ho scritto un codice di esempio che sarebbe eseguito in 0.8.0 - (il server HTTP è un hack forse).

Un processo figlio viene generato in esecuzione con la coda, e dal momento che un processo figlio è un EventEmitter con tre flussi (usiamo stdout nel nostro caso) si può semplicemente aggiungere l'un ascoltatore con on

filename: tailServer.js

utilizzo: node tailServer /var/log/filename.log

var http = require("http"); 
var filename = process.argv[2]; 


if (!filename) 
    return console.log("Usage: node tailServer filename"); 

var spawn = require('child_process').spawn; 
var tail = spawn('tail', ['-f', filename]); 

http.createServer(function (request, response) { 
    console.log('request starting...'); 

    response.writeHead(200, {'Content-Type': 'text/plain' }); 

    tail.stdout.on('data', function (data) { 
     response.write('' + data);     
    }); 
}).listen(8088); 

console.log('Server running at http://127.0.0.1:8088/'); 
+0

La mia preoccupazione con tail -f è che richiede che il processo di lettura sia attivo prima che il file viene scritto, se non è i dati sono perduto. Il mio caso d'uso è tale che la lettura potrebbe accadere molto tempo dopo che i dati sono stati scritti. +1 per l'aggiornamento a 0.8 sebbene questa sia una buona soluzione per dove la scrittura e la lettura sono controllate dalla stessa fonte. –

+0

watchFile è anche gestito da eventi, ma è stabile secondo la documentazione. L'esempio sopra il file handels cambia con il polling nel codice di alto livello. Per me sembra un trucco. Ma finché funziona per te è bello farlo. Altrimenti si potrebbe "toccare" il file se non esiste e non si perderebbe alcun dato e si potrebbero aggiungere righe di file con 'wc -l message.text | awk '{print $ 1}' 'e consegnalo a' tail -f -n' – vik

1

questo modulo è un'implementazione del principio @hasanyasin suggerisce:

https://github.com/felixge/node-growing-file

+0

Grazie, sembra che funzionerebbe bene qui e gli altri progetti di felixge sono solidi quindi sono felice di provare questo modulo. –

0

Ho preso la risposta da @hasanyasin e l'ho avvolta in una promessa modulare. L'idea di base è che si passa un file e una funzione di gestione che fa qualcosa con il buffer stringificato che viene letto dal file. Se la funzione del gestore restituisce true, il file smetterà di essere letto. È anche possibile impostare un timeout che interrompe la lettura se il gestore non restituisce true abbastanza velocemente.

Il promiser restituirà true se il resolve() è stato chiamato a causa del timeout, altrimenti restituirà false.

Vedere la parte inferiore per l'esempio di utilizzo.

// https://stackoverflow.com/a/11233045 

var fs = require('fs'); 
var Promise = require('promise'); 

class liveReaderPromiseMe { 
    constructor(file, buffStringHandler, opts) { 
     /* 
      var opts = { 
       starting_position: 0, 
       byte_size: 256, 
       check_for_bytes_every_ms: 3000, 
       no_handler_resolution_timeout_ms: null 
      }; 
     */ 

     if (file == null) { 
      throw new Error("file arg must be present"); 
     } else { 
      this.file = file; 
     } 

     if (buffStringHandler == null) { 
      throw new Error("buffStringHandler arg must be present"); 
     } else { 
      this.buffStringHandler = buffStringHandler; 
     } 

     if (opts == null) { 
      opts = {}; 
     } 

     if (opts.starting_position == null) { 
      this.current_position = 0; 
     } else { 
      this.current_position = opts.starting_position; 
     } 

     if (opts.byte_size == null) { 
      this.byte_size = 256; 
     } else { 
      this.byte_size = opts.byte_size; 
     } 

     if (opts.check_for_bytes_every_ms == null) { 
      this.check_for_bytes_every_ms = 3000; 
     } else { 
      this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; 
     } 

     if (opts.no_handler_resolution_timeout_ms == null) { 
      this.no_handler_resolution_timeout_ms = null; 
     } else { 
      this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; 
     } 
    } 


    startHandlerTimeout() { 
     if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { 
      var that = this; 
      this._handlerTimer = setTimeout(
       function() { 
        that._is_handler_timed_out = true; 
       }, 
       this.no_handler_resolution_timeout_ms 
      ); 
     } 
    } 

    clearHandlerTimeout() { 
     if (this._handlerTimer != null) { 
      clearTimeout(this._handlerTimer); 
      this._handlerTimer = null; 
     } 
     this._is_handler_timed_out = false; 
    } 

    isHandlerTimedOut() { 
     return !!this._is_handler_timed_out; 
    } 


    fsReadCallback(err, bytecount, buff) { 
     try { 
      if (err) { 
       throw err; 
      } else { 
       this.current_position += bytecount; 
       var buff_str = buff.toString('utf-8', 0, bytecount); 

       var that = this; 

       Promise.resolve().then(function() { 
        return that.buffStringHandler(buff_str); 
       }).then(function(is_handler_resolved) { 
        if (is_handler_resolved) { 
         that.resolve(false); 
        } else { 
         process.nextTick(that.doReading.bind(that)); 
        } 
       }).catch(function(err) { 
        that.reject(err); 
       }); 
      } 
     } catch(err) { 
      this.reject(err); 
     } 
    } 

    fsRead(bytecount) { 
     fs.read(
      this.file, 
      new Buffer(bytecount), 
      0, 
      bytecount, 
      this.current_position, 
      this.fsReadCallback.bind(this) 
     ); 
    } 

    doReading() { 
     if (this.isHandlerTimedOut()) { 
      return this.resolve(true); 
     } 

     var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; 
     if (max_next_bytes) { 
      this.fsRead((this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size); 
     } else { 
      setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); 
     } 
    } 


    promiser() { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.resolve = resolve; 
      that.reject = reject; 
      that.doReading(); 
      that.startHandlerTimeout(); 
     }).then(function(was_resolved_by_timeout) { 
      that.clearHandlerTimeout(); 
      return was_resolved_by_timeout; 
     }); 
    } 
} 


module.exports = function(file, buffStringHandler, opts) { 
    try { 
     var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); 
     return live_reader.promiser(); 
    } catch(err) { 
     return Promise.reject(err); 
    } 
}; 

quindi utilizzare il codice di cui sopra in questo modo:

var fs = require('fs'); 
var path = require('path'); 
var Promise = require('promise'); 
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); 

var ending_str = '_THIS_IS_THE_END_'; 
var test_path = path.join('E:/tmp/test.txt'); 

var s_list = []; 
var buffStringHandler = function(s) { 
    s_list.push(s); 
    var tmp = s_list.join(''); 
    if (-1 !== tmp.indexOf(ending_str)) { 
     // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms 
     // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true 
     return true; 
     // you can also return a promise: 
     // return Promise.resolve().then(function() { return true; }); 
    } 
}; 

var appender = fs.openSync(test_path, 'a'); 
try { 
    var reader = fs.openSync(test_path, 'r'); 
    try { 
     var options = { 
      starting_position: 0, 
      byte_size: 256, 
      check_for_bytes_every_ms: 3000, 
      no_handler_resolution_timeout_ms: 10000, 
     }; 

     liveReadAppendingFilePromiser(reader, buffStringHandler, options) 
     .then(function(did_reader_time_out) { 
      console.log('reader timed out: ', did_reader_time_out); 
      console.log(s_list.join('')); 
     }).catch(function(err) { 
      console.error('bad stuff: ', err); 
     }).then(function() { 
      fs.closeSync(appender); 
      fs.closeSync(reader); 
     }); 

     fs.write(appender, '\ncheck it out, I am a string'); 
     fs.write(appender, '\nwho killed kenny'); 
     //fs.write(appender, ending_str); 
    } catch(err) { 
     fs.closeSync(reader); 
     console.log('err1'); 
     throw err; 
    } 
} catch(err) { 
    fs.closeSync(appender); 
     console.log('err2'); 
    throw err; 
}