2 flussi:concatenare due (o n) flussi
Dato leggibile streams
stream1
estream2
, che cosa è un modo (condensato) idiomatica per ottenere uno stream contenentestream1
estream2
concatenato?Non riesco a fare
stream1.pipe(outStream); stream2.pipe(outStream)
, perché quindi i contenuti del flusso sono mescolati insieme.n flussi:
Dato un EventEmitter che emette un numero indeterminato di flussi, ad esempio
eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end')
che cosa è un modo (condensato) idiomatica per ottenere un ruscello con tutti i flussi concatenati insieme?
risposta
Il pacchetto combined-stream concatena flussi. Esempio dal README:
var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
Credo che sia necessario aggiungere tutti i flussi contemporaneamente. Se la coda è vuota, lo combinedStream
termina automaticamente. Vedi issue #5.
La libreria stream-stream è un'alternativa che ha un esplicito .end
, ma è molto meno popolare e presumibilmente non altrettanto collaudato. Utilizza l'API stream2 del nodo 0.10 (vedere this discussion).
Potreste essere in grado di renderlo più conciso, ma qui è quello che funziona:
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream) {
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on('stream', function(stream){
stream.pause();
streams.push(stream);
ensureState();
});
streamStream.on('end', function() {
streamsEnded = true;
ensureState();
});
var ensureState = function() {
if(isStreaming) return;
if(streams.length == 0) {
if(streamsEnded)
that.emit('end');
return;
}
isStreaming = true;
streams[0].on('data', onData);
streams[0].on('end', onEnd);
streams[0].resume();
};
var onData = function(data) {
that.emit('data', data);
};
var onEnd = function() {
isStreaming = false;
streams[0].removeAllListeners('data');
streams[0].removeAllListeners('end');
streams.shift();
ensureState();
};
}
util.inherits(ConcatStream, EventEmitter);
teniamo traccia di stato con streams
(la coda dei flussi; push
alla schiena e shift
da la parte anteriore), isStreaming
e streamsEnded
. Quando otteniamo un nuovo stream, lo spingiamo e quando termina un flusso, smettiamo di ascoltarlo e lo spostiamo. Quando il flusso di stream termina, impostiamo streamsEnded
.
Su ciascuno di questi eventi, controlliamo lo stato in cui ci troviamo. Se siamo già in streaming (convogliare un flusso), non facciamo nulla. Se la coda è vuota e viene impostato streamsEnded
, emetteremo l'evento end
. Se c'è qualcosa in coda, lo riprendiamo e ascoltiamo i suoi eventi.
* Nota che pause
e resume
sono di consulenza, quindi alcuni flussi potrebbero non funzionare correttamente e richiedere il buffering. Questo esercizio è lasciato al lettore.
Dopo aver fatto tutto questo, vorrei fare il caso n=2
costruendo un EventEmitter
, creando un ConcatStream
con esso, ed emettendo due stream
eventi seguiti da un evento end
. Sono sicuro che potrebbe essere fatto in modo più conciso, ma possiamo anche usare quello che abbiamo.
streamee.js è un insieme di trasformatori di flusso e compositori basati sul nodo 1.0+ ruscelli e includono un metodo di concatenazione:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
Grazie, controllerò. Quello è il nodo 0.10 presumo? –
Sì Nodo 0.10, ma è possibile eseguire il wrapping di flussi vecchio stile in più di 0,10 flussi come indicato nel README – atamborrino
https://github.com/joepie91/node-combined-stream2 è un rimpiazzo compatibile Streams2 per il modulo combinato-stream (descritta sopra). Si avvolge automaticamente Streams1 flussi.
codice di esempio per combinato stream2:
var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
questo può essere fatto con vaniglia nodejs
import { PassThrough } from 'stream'
const merge = (...streams) => {
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams) {
pass = stream.pipe(pass, {end: false})
stream.once('end',() => --waiting === 0 && pass.emit('end'))
}
return pass
}
Una semplice operazione reduce dovrebbe andare bene in nodejs!
const {PassThrough} = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
s.pipe(pt, {end: false})
s.once('end',() => a.every(s => s.ended) && pt.emit('end'))
return pt
}, new PassThrough())
Saluti;)
Non si dovrebbe restituire qualcosa da ridurre? Sembra che "unito" non sia definito. –
corretto. grazie;) @Mark_M – Ivo
AVVISO: questo farà sì che tutti gli stream si colleghino allo stream PassThrough in parallelo, senza alcun riguardo verso l'ordinamento dei dati, più che probabile corrompere i dati. –
Grazie Aaron! Speravo un po 'che ci fosse una libreria esistente, quindi posso risolverla in tre righe. Se non c'è, penso che potrei estrarre la tua soluzione in un pacchetto. Posso usare il tuo codice con una licenza MIT? –
Ah, trovato la libreria stream-stream. Vedi la mia risposta. –
@JoLiss Ho anche cercato qualcosa prima, ma non ho trovato questa opzione. Puoi sicuramente usare il mio codice in una libreria se lo vuoi ancora. –