2015-08-28 8 views
6

Ho una pipeline di streaming collegata a pub/sub che pubblica nomi di file GCS. Da lì voglio leggere ogni file e analizzare gli eventi su ciascuna riga (gli eventi sono ciò che in definitiva voglio elaborare).Leggi i file da una raccolta di nomi di file GCS in Pipeline?

Posso utilizzare TextIO? Potete usarlo in una pipeline di streaming quando il nome del file viene definito durante l'esecuzione (al contrario di usare TextIO come origine e il nome/i file sono noti al momento della costruzione). Se non sto pensando di fare qualcosa di simile al seguente:

Prendi il tema da pub/sub Pardo per leggere ogni file e ottenere le linee di processo le righe del file ...

Potrei utilizzare FileBasedReader o qualcosa di simile in questo caso per leggere i file? I file non sono troppo grandi quindi non avrei bisogno di parallelizzare la lettura di un singolo file, ma avrei bisogno di leggere molti file.

+0

Siamo vicini ad avere un supporto API sufficiente per creare un'implementazione efficiente di questo. Si prega di seguire https://issues.apache.org/jira/browse/BEAM-2511 TextIO dovrebbe supportare la lettura di una raccolta di nomi di file. – jkff

+0

Ho modificato la mia risposta per riflettere la nuova API. – jkff

risposta

3

È possibile utilizzare la trasformazione TextIO.readAll(), che è stata recentemente aggiunta a Beam in #3443. Ad esempio:

PCollection<String> filenames = p.apply(PubsubIO.readStrings()...); 
PCollection<String> lines = filenames.apply(TextIO.readAll()); 

Questo leggerà tutte le righe in ogni file in arrivo su pubsub.