2013-05-08 11 views
13
  • 2 flussi:concatenare due (o n) flussi

    Dato leggibile streamsstream1 e stream2, che cosa è un modo (condensato) idiomatica per ottenere uno stream contenente stream1 e stream2 concatenato?

    Non riesco a fare stream1.pipe(outStream); stream2.pipe(outStream), perché quindi i contenuti del flusso sono mescolati insieme.

  • n flussi:

    Dato un EventEmitter che emette un numero indeterminato di flussi, ad esempio

    eventEmitter.emit('stream', stream1) 
    eventEmitter.emit('stream', stream2) 
    eventEmitter.emit('stream', stream3) 
    ... 
    eventEmitter.emit('end') 
    

    che cosa è un modo (condensato) idiomatica per ottenere un ruscello con tutti i flussi concatenati insieme?

risposta

11

Il pacchetto combined-stream concatena flussi. Esempio dal README:

var CombinedStream = require('combined-stream'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 

Credo che sia necessario aggiungere tutti i flussi contemporaneamente. Se la coda è vuota, lo combinedStream termina automaticamente. Vedi issue #5.

La libreria stream-stream è un'alternativa che ha un esplicito .end, ma è molto meno popolare e presumibilmente non altrettanto collaudato. Utilizza l'API stream2 del nodo 0.10 (vedere this discussion).

3

Potreste essere in grado di renderlo più conciso, ma qui è quello che funziona:

var util = require('util'); 
var EventEmitter = require('events').EventEmitter; 

function ConcatStream(streamStream) { 
    EventEmitter.call(this); 
    var isStreaming = false, 
    streamsEnded = false, 
    that = this; 

    var streams = []; 
    streamStream.on('stream', function(stream){ 
    stream.pause(); 
    streams.push(stream); 
    ensureState(); 
    }); 

    streamStream.on('end', function() { 
    streamsEnded = true; 
    ensureState(); 
    }); 

    var ensureState = function() { 
    if(isStreaming) return; 
    if(streams.length == 0) { 
     if(streamsEnded) 
     that.emit('end'); 
     return; 
    } 
    isStreaming = true; 
    streams[0].on('data', onData); 
    streams[0].on('end', onEnd); 
    streams[0].resume(); 
    }; 

    var onData = function(data) { 
    that.emit('data', data); 
    }; 

    var onEnd = function() { 
    isStreaming = false; 
    streams[0].removeAllListeners('data'); 
    streams[0].removeAllListeners('end'); 
    streams.shift(); 
    ensureState(); 
    }; 
} 

util.inherits(ConcatStream, EventEmitter); 

teniamo traccia di stato con streams (la coda dei flussi; push alla schiena e shift da la parte anteriore), isStreaming e streamsEnded. Quando otteniamo un nuovo stream, lo spingiamo e quando termina un flusso, smettiamo di ascoltarlo e lo spostiamo. Quando il flusso di stream termina, impostiamo streamsEnded.

Su ciascuno di questi eventi, controlliamo lo stato in cui ci troviamo. Se siamo già in streaming (convogliare un flusso), non facciamo nulla. Se la coda è vuota e viene impostato streamsEnded, emetteremo l'evento end. Se c'è qualcosa in coda, lo riprendiamo e ascoltiamo i suoi eventi.

* Nota che pause e resume sono di consulenza, quindi alcuni flussi potrebbero non funzionare correttamente e richiedere il buffering. Questo esercizio è lasciato al lettore.

Dopo aver fatto tutto questo, vorrei fare il caso n=2 costruendo un EventEmitter, creando un ConcatStream con esso, ed emettendo due stream eventi seguiti da un evento end. Sono sicuro che potrebbe essere fatto in modo più conciso, ma possiamo anche usare quello che abbiamo.

+0

Grazie Aaron! Speravo un po 'che ci fosse una libreria esistente, quindi posso risolverla in tre righe. Se non c'è, penso che potrei estrarre la tua soluzione in un pacchetto. Posso usare il tuo codice con una licenza MIT? –

+0

Ah, trovato la libreria stream-stream. Vedi la mia risposta. –

+0

@JoLiss Ho anche cercato qualcosa prima, ma non ho trovato questa opzione. Puoi sicuramente usare il mio codice in una libreria se lo vuoi ancora. –

1

streamee.js è un insieme di trasformatori di flusso e compositori basati sul nodo 1.0+ ruscelli e includono un metodo di concatenazione:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]); 
+0

Grazie, controllerò. Quello è il nodo 0.10 presumo? –

+0

Sì Nodo 0.10, ma è possibile eseguire il wrapping di flussi vecchio stile in più di 0,10 flussi come indicato nel README – atamborrino

2

https://github.com/joepie91/node-combined-stream2 è un rimpiazzo compatibile Streams2 per il modulo combinato-stream (descritta sopra). Si avvolge automaticamente Streams1 flussi.

codice di esempio per combinato stream2:

var CombinedStream = require('combined-stream2'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 
3

questo può essere fatto con vaniglia nodejs

import { PassThrough } from 'stream' 
const merge = (...streams) => { 
    let pass = new PassThrough() 
    let waiting = streams.length 
    for (let stream of streams) { 
     pass = stream.pipe(pass, {end: false}) 
     stream.once('end',() => --waiting === 0 && pass.emit('end')) 
    } 
    return pass 
} 
5

Una semplice operazione reduce dovrebbe andare bene in nodejs!

const {PassThrough} = require('stream') 

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { 
    s.pipe(pt, {end: false}) 
    s.once('end',() => a.every(s => s.ended) && pt.emit('end')) 
    return pt 
}, new PassThrough()) 

Saluti;)

+0

Non si dovrebbe restituire qualcosa da ridurre? Sembra che "unito" non sia definito. –

+1

corretto. grazie;) @Mark_M – Ivo

+0

AVVISO: questo farà sì che tutti gli stream si colleghino allo stream PassThrough in parallelo, senza alcun riguardo verso l'ordinamento dei dati, più che probabile corrompere i dati. –