2015-02-01 11 views
8

Attualmente sto cercando di utilizzare Spark Streaming per inserire le voci di logfile e per eseguire alcuni calcoli su di esse per ragioni statistiche.Streaming di Spark con una tabella di ricerca dinamica

Ci sono set di dati memorizzati su HDFS, accessibili da HBase e Hive in questo momento, necessari per cercare alcuni dati e trasformarli, come i mapping tra IP e nomi di macchine e proprietari di macchine.

L'applicazione di scintilla dovrebbe essere eseguita sul nostro cluster giorno dopo giorno, per settimane senza un riavvio. Tuttavia, queste tabelle di riferimento vengono aggiornate ogni poche ore.

Va bene se i dati utilizzati sono leggermente vecchi, ma non è corretto per i dati di due settimane. Pertanto, voglio sapere come posso visualizzare i dati per le trasformazioni e gli arricchimenti nella mia mappa e ridurre le fasi. Ho avuto un paio di idee.

  1. Le variabili di trasmissione possono leggere nel set di dati e trasmetterle in modo efficiente. Tuttavia, una volta impostata una variabile di trasmissione, non può essere modificata e recuperare nuovamente i dati nella classe di driver, annullando e trasmettendo il nuovo non funzionerà, perché i puntatori dei lavoratori puntano tutti al vecchio set di dati. Non so se c'è un modo per aggirare questo.

  2. Le query get() HBase possono essere eseguite. Se i dati vengono indirizzati a riduttori basati sulla chiave della ricerca, ciascun riduttore può contenere una cache di un sottoinsieme dell'insieme di dati complessivo e può contenere la propria cache locale. HBase dovrebbe avere una latenza minima nel recupero di singoli record.

  3. Qualcos'altro?

risposta

3

Hai due opzioni qui.

Per prima cosa utilizzare la trasformazione foreachRDD sopra il proprio DStream. foreachRDD viene eseguito dal lato del driver, il che significa che è possibile creare qualsiasi nuovo RDD lì. È possibile memorizzare il contatore del tempo e rileggere il file da HDFS ogni 10-15 minuti

Secondo è leggere alcuni file nella trasformazione transform sul DStream e salvare i risultati in memoria. Con questo approccio devi leggere l'intera tabella di ricerca di ciascuno degli esecutori, che non è efficiente

Ti consiglio di utilizzare il primo approccio. Per essere ancora più precisi, è possibile memorizzare da qualche parte il flag quando i dati sono stati aggiornati l'ultima volta e memorizzare lo stesso nell'applicazione Spark. Su ogni iterazione controlli il valore di questo flag (ad esempio, memorizzato in HBase o Zookeeper) e confrontalo con quello memorizzato localmente - se è diverso, rileggi la tabella di ricerca, in caso contrario - esegui l'operazione con quello vecchio

+0

Ho una domanda correlata. La mia tabella di ricerca è di circa 2 milioni di righe ed è statica. La chiave è una stringa di circa 100 caratteri e il valore una stringa di circa 10 caratteri. In questo momento ho questi dati memorizzati in una collezione indicizzata mongo db e faccio ricerche durante una fase di trasformazione. Effettuo la chiamata in batch, quindi è necessario un solo hit per trasformazione, ma è comunque una chiamata di rete. Ha senso fare una tabella di ricerca di questo grande var di Spark? –

+1

2 milioni di record 110 byte ciascuno è solo 220 MB di dati - non tanto per la variabile di trasmissione. Avere 1 executor per nodo HW dovrebbe garantire la quantità minima richiesta di copie di questo 220 MB memorizzato nel cluster. Se è statico, puoi caricarlo in memoria all'inizio dell'elaborazione e poi usarlo. Non ti consiglio di utilizzare elementi centralizzati come MongoDB, poiché con il crescere del cluster sarà il collo di bottiglia.Se i dati sono completamente statici, potresti considerare di archiviare i dati in un file su ciascun nodo o in un archivio locale su ciascun nodo (ad esempio redis) – 0x0FFF