È necessario creare una funzione per l'elaborazione di file CSV di grandi dimensioni da utilizzare in una chiamata bluebird.map(). Date le dimensioni potenziali del file, mi piacerebbe utilizzare lo streaming.NodeJS, promises, stream - elaborazione di file CSV di grandi dimensioni
Questa funzione deve accettare uno stream (un file CSV) e una funzione (che elabora i blocchi dallo stream) e restituire una promessa quando il file viene letto fino alla fine (risolto) o errori (rifiutato).
Così, ho iniziare con:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
Ora, ho due questioni tra loro collegati:
- ho bisogno di strozzare la quantità effettiva di dati che vengono elaborati, in modo da non creare pressioni della memoria.
- La funzione passata come parametro
processor
è spesso asincrona, ad esempio il salvataggio del contenuto del file nel db tramite una libreria basata su promesse (in questo momento:pg-promise
). Come tale, creerà una promessa in memoria e andrà avanti, ripetutamente.
La biblioteca pg-promise
ha funzioni per gestire questo, come page(), ma io non sono in grado di avvolgere la mia avanti intorno come mescolare i gestori di eventi torrente con questi metodi promessa. In questo momento, restituisco una promessa al gestore per la sezione readable
dopo ogni read()
, il che significa che creo una quantità enorme di operazioni del database promesse e alla fine si verifica un errore perché ho raggiunto un limite di memoria del processo.
Qualcuno ha un esempio funzionante di questo che posso usare come punto di salto?
UPDATE: Probabilmente più di un modo per scuoiare il gatto, ma questo funziona:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
Chiunque vede un potenziale problema con questo approccio?
Sembra pulito, e se funziona, allora ottimo lavoro! Sono contento che l'aggiunta più recente di 'page' in' pg-promise' non sia stata invano;) –
Semplicemente semplificata alla fine di readDataFromStream;) Non è necessario 'return undefined', ecco cosa succede quando non si restituisce nulla in ogni caso;) –
In realtà, potrebbe esserci un problema con questo ... quando si chiama db.task, non si gestisce il risultato da esso, quindi nel caso in cui rifiuti, ci sarà un errore lanciato dal prometti una libreria che il tuo rifiuto non è stato gestito. –