2015-10-14 13 views
7

È necessario creare una funzione per l'elaborazione di file CSV di grandi dimensioni da utilizzare in una chiamata bluebird.map(). Date le dimensioni potenziali del file, mi piacerebbe utilizzare lo streaming.NodeJS, promises, stream - elaborazione di file CSV di grandi dimensioni

Questa funzione deve accettare uno stream (un file CSV) e una funzione (che elabora i blocchi dallo stream) e restituire una promessa quando il file viene letto fino alla fine (risolto) o errori (rifiutato).

Così, ho iniziare con:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    // use readable or data event? 
    parser.on('readable', function() { 
    // call processor, which may be async 
    // how do I throttle the amount of promises generated 
    }); 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 

} 

Ora, ho due questioni tra loro collegati:

  1. ho bisogno di strozzare la quantità effettiva di dati che vengono elaborati, in modo da non creare pressioni della memoria.
  2. La funzione passata come parametro processor è spesso asincrona, ad esempio il salvataggio del contenuto del file nel db tramite una libreria basata su promesse (in questo momento: pg-promise). Come tale, creerà una promessa in memoria e andrà avanti, ripetutamente.

La biblioteca pg-promise ha funzioni per gestire questo, come page(), ma io non sono in grado di avvolgere la mia avanti intorno come mescolare i gestori di eventi torrente con questi metodi promessa. In questo momento, restituisco una promessa al gestore per la sezione readable dopo ogni read(), il che significa che creo una quantità enorme di operazioni del database promesse e alla fine si verifica un errore perché ho raggiunto un limite di memoria del processo.

Qualcuno ha un esempio funzionante di questo che posso usare come punto di salto?

UPDATE: Probabilmente più di un modo per scuoiare il gatto, ma questo funziona:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    // some checks trimmed out for example 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 
    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    var readDataFromStream = function(index, data, delay) { 
    var records = []; 
    var record; 
    do { 
     record = parser.read(); 
     if(record != null) 
     records.push(record); 
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency)) 
    parser.pause(); 

    if(records.length) 
     return records; 
    }; 

    var processData = function(index, data, delay) { 
    console.log('processData(' + index + ') > data: ', data); 
    parser.resume(); 
    }; 

    parser.on('readable', function() { 
    db.task(function(tsk) { 
     this.page(readDataFromStream, processData); 
    }); 
    }); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 
} 

Chiunque vede un potenziale problema con questo approccio?

+0

Sembra pulito, e se funziona, allora ottimo lavoro! Sono contento che l'aggiunta più recente di 'page' in' pg-promise' non sia stata invano;) –

+0

Semplicemente semplificata alla fine di readDataFromStream;) Non è necessario 'return undefined', ecco cosa succede quando non si restituisce nulla in ogni caso;) –

+0

In realtà, potrebbe esserci un problema con questo ... quando si chiama db.task, non si gestisce il risultato da esso, quindi nel caso in cui rifiuti, ci sarà un errore lanciato dal prometti una libreria che il tuo rifiuto non è stato gestito. –

risposta

3

Trova qui sotto un'applicazione completa che esegue correttamente lo stesso tipo di attività desiderata: legge un file come flusso, lo analizza come un CSV e inserisce ciascuna riga nel database.

const fs = require('fs'); 
const promise = require('bluebird'); 
const csv = require('csv-parse'); 
const pgp = require('pg-promise')({promiseLib: promise}); 

const cn = "postgres://postgres:[email protected]:5432/test_db"; 
const rs = fs.createReadStream('primes.csv'); 

const db = pgp(cn); 

function receiver(_, data) { 
    function source(index) { 
     if (index < data.length) { 
      // here we insert just the first column value that contains a prime number; 
      return this.none('insert into primes values($1)', data[index][0]); 
     } 
    } 

    return this.sequence(source); 
} 

db.task(t => { 
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver); 
}) 
    .then(data => { 
     console.log('DATA:', data); 
    } 
    .catch(error => { 
     console.log('ERROR:', error); 
    }); 

Nota che l'unica cosa che ho cambiato: usando la libreria csv-parse invece di csv, come un'alternativa migliore.

Aggiunto l'uso del metodo stream.read dalla libreria spex, che serve correttamente uno stream Readable da utilizzare con promesse.

+0

Non proverebbe a leggere l'elemento successivo dal' parser' dopo che 'query (" INSERT ... ")' è stata eseguita, irregardless se l'elemento successivo è già leggibile? Oppure 'parser.read()' restituisce una promessa? – Bergi

+0

Inoltre, che cosa è successo alla funzione di callback del 'processore di ritorno promessa 'che OP stava cercando? – Bergi

+0

@Bergi La mia comprensione è che parser.read() è sincrono, come è stato mostrato. E se si scopre che non lo è, allora dovrà essere avvolto in una promessa, ovviamente. E 'leggibile' viene sparato una volta, non per ogni operazione di lettura, questa è la mia comprensione. Per quanto riguarda il processore che restituisce promesse, stava semplicemente cercando una soluzione quando l'elaborazione dei dati è terminata e uno scarto in caso di fallimento, che il mio esempio fornisce, cioè l'attività verrà risolta/rifiutata di conseguenza. –

1

Quindi, per dire che non vuoi lo streaming ma una specie di blocchi di dati? ;-)

Lo sai https://github.com/substack/stream-handbook?

Penso che l'approccio più semplice senza modificare l'architettura sarebbe una sorta di promessa. per esempio. https://github.com/timdp/es6-promise-pool

+0

Bene, ho pensato di usare 'async.queue' nella funzione, restituendo una promessa di finire il file (o meno). Tuttavia, mi stavo chiedendo come si leghi una libreria di promessa come Bluebird con la tipica elaborazione basata sul flusso di file di grandi dimensioni. ('pg-promise' include 'spex', che fornisce funzioni promesse di livello superiore) – alphadogg

6

Si potrebbe desiderare di guardare promise-streams

var ps = require('promise-streams'); 
passedStream 
    .pipe(csv.parse({trim: true})) 
    .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row))) 
    .wait().then(_ => { 
    console.log("All done!"); 
    }); 

Opere con contropressione e tutto.