2016-03-10 16 views
13

Ambiente: NodeJS, espresso, DynamoDB (ma potrebbe essere qualsiasi database davvero)Come creare uno streaming leggibile con un'origine dati asincrona in NodeJs?

Scenario: bisogno di leggere un gran numero di record e tornare l'utente come un file scaricabile. Ciò significa che non è possibile eseguire il buffer di tutto il contenuto in una sola volta e quindi inviarlo in risposta da Express. Inoltre, potrebbe essere necessario eseguire la query più volte poiché tutti i dati potrebbero non essere restituiti in una query.

Soluzione proposta: Utilizzare un flusso leggibile che può essere reindirizzato allo stream di risposta in Express.

Ho iniziato creando un oggetto che eredita dallo stream. Disponibile e implementato un metodo _read() che spinge i risultati della query. Il problema è che la query del database richiamata in _read() è asincrona, ma stream.read() è un metodo di sincronizzazione.

Quando lo stream viene reindirizzato alla risposta del server, la lettura viene richiamata più volte prima che la query db abbia anche la possibilità di essere eseguita. Quindi la query viene invocata più volte e anche quando la prima istanza della query termina e fa un push (null), le altre query vengono completate e viene visualizzato un errore "push() dopo EOF".

  1. C'è un modo per farlo correttamente con _read()?
  2. Devo dimenticare _read() e basta eseguire la query e push() restituisce il costruttore?
  3. Devo eseguire la query ed emettere eventi di dati anziché push()?

Grazie

function DynamoDbResultStream(query, options){ 
    if(!(this instanceof DynamoDbResultStream)){ 
     return new DynamoDbResultStream(query, options); 
    } 

    Readable.call(this, options); 

    this.dbQuery = query; 
    this.done = false; 
} 
util.inherits(DynamoDbResultStream, Readable); 

DynamoDbResultStream.prototype._read = function(){ 
    var self = this; 
    if(!this.done){ 
     dynamoDB.query(this.dbQuery, function(err, data) { 
      if (!err) { 
       try{ 
        for(i=0;i<data.Items.length;i++){ 
         self.push(data.Items[i]); 
        } 
       }catch(err){ 
        console.log(err); 
       } 
       if (data.LastEvaluatedKey) { 
        //Next read() should invoke the query with a new start key 
        self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey; 
       }else{ 
        self.done=true; 
        self.push(null); 
       } 
      }else{ 
       console.log(err); 
       self.emit('error',err); 
      } 
     }); 
    }else{ 
     self.push(null); 
    } 
}; 

EDIT: Dopo aver postato questa domanda, ho trovato questo post con una risposta che mostra come farlo senza usare l'ereditarietà: How to call an asynchronous function inside a node.js readable stream

A il commento è stato fatto lì che all'interno di _read() ci dovrebbe essere solo un push(). E ogni push() di solito genera un'altra chiamata read().

+0

Si può fornire un esempio del codice che si sta scrivendo? – mikefrey

+0

Ho aggiunto il codice che ho finora – swbandit

+0

Probabilmente correlato: http://stackoverflow.com/questions/20058614/stream-from-a-mongodb-cursor-to-express-response-in-node-js – Tomalak

risposta

2

essere a conoscenza delle diverse modalità di Stream: https://nodejs.org/api/stream.html#stream_two_modes

const Readable = require('stream').Readable; 

// starts in paused mode 
const readable = new Readable(); 

let i = 0; 
fetchMyAsyncData() { 
    setTimeout(() => { 
    // still remains in paused mode 
    readable.push(++i); 

    if (i === 5) { 
     return readable.emit('end'); 
    } 

    fetchMyAsyncData(); 
    }, 500);  
} 

// "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods." 
app.get('/mystreamingresponse', (req, res) => { 

    // remains in paused mode 
    readable.on('readable',() => res.write(readable.read())); 

    fetchMyAsyncData(); 

    // closes the response stream once all external data arrived 
    readable.on('end',() => res.end()); 
}) 
+0

Provare a eseguire ciò mi dà che _read non è implementato – atlanteh