2016-07-14 103 views
5

Ho effettuato la seguente pipeline: Task manager -> SQS -> worker raschietto (la mia app) -> AWS Firehose -> S3 file -> Spark -> (?) Redshift.Come elaborare i file S3 incrementali in Spark

Alcune cose che sto cercando di risolvere/migliorare e sarei felice di orientamento:

  1. Il raschietto potrebbe potenzialmente ottenere i dati duplicati, e lavare di nuovo a firehose, che si tradurrà in dups a scintilla. Devo risolvere questo nella scintilla usando la funzione Distinct PRIMA di iniziare i miei calcoli?
  2. Non sto eliminando i file S3 elaborati, quindi i dati continuano a diventare grandi e grandi. È una buona pratica? (Avendo s3 come database di input) O devo elaborare ogni file ed eliminarlo dopo che la scintilla è finita? Attualmente sto facendo sc.textFile("s3n://...../*/*/*") - che raccoglierà TUTTI i miei file bucket ed eseguirò calcoli.
  3. Per posizionare i risultati in Redshift (o s3) -> come posso farlo in modo incrementale? cioè, se lo s3 diventa sempre più grande, il redshift avrà dati duplicati ... Devo sempre sciacquarlo prima? Come?
+0

puoi avere il tuo bucket per gli elementi da elaborare e una volta che sono stati spinti, li sposti in un altro bucket in modo da conservare una copia se necessario ma non li rielaborerà una seconda volta –

risposta

0

Ho riscontrato questi problemi prima sebbene non sia in una singola pipeline. Ecco cosa ho fatto.

  1. Rimozione duplicazioni

    a. Ho usato BloomFilter per rimuovere le duplicazioni locali. Nota che il documento è relativamente incompleto, ma puoi salvare/caricare/unione/intersecare facilmente gli oggetti filtro bloom. Puoi anche fare reduce sui filtri.

    b. Se si salvano i dati direttamente da Spark a RedShift, è probabile che sia necessario dedicare un po 'di tempo e sforzi per aggiornare BloomFilter per il batch corrente, trasmetterlo, quindi filtrarlo per garantire che non vi siano duplicazioni a livello globale. Prima ho usato un vincolo UNIQUE in RDS e ignorato l'errore, ma sfortunatamente RedShift does not honour the constraint.

  2. e 3. I dati sempre più grande

Ho usato un cluster di EMR per eseguire s3-dist-cp command per spostare & dati di unione (perché ci sono di solito un sacco di file di log di piccole dimensioni, che le prestazioni di Spark impatto). Se si utilizza EMR per ospitare il cluster Spark, è sufficiente aggiungere un passaggio prima dell'analisi per spostare i dati da un segmento all'altro. Il passo prende il command-runner.jar come il barattolo personalizzato, e il comando si presenta come

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess 

Nota che l'originale distcp non supporta i file fusione.

Generalmente, si dovrebbe cercare di evitare di avere dati elaborati e non elaborati insieme nello stesso bucket (o almeno, percorso).