2015-06-26 2 views
12

Per chiarire, io non sono alla ricerca di RDD da un array/lista comeCome creare Spark RDD da un iteratore?

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample 
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list); 

Come si crea un RDD scintilla da un iteratore Java senza completamente buffer nella memoria?

Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration 
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question 

domanda supplementare:

E 'un requisito per la sorgente di essere ri-leggibile (o in grado di leggere molte volte) per offrire resilienza per RDD? In altre parole, dal momento che gli iteratori sono fondamentalmente di sola lettura, è persino possibile creare resilient dataset distribuiti (RDD) dagli iteratori?

+0

"senza bufferarlo completamente nella memoria".? Il tuo Iterator <> non è già in memoria? –

+2

I dati verranno comunque caricati in memoria. Ma mi sembra che tu possa usare Spark Streaming per leggere l'input, perché il tuo iteratore di sola lettura può essere considerato come un flusso di dati. – vanekjar

+3

@ KcDoD no. Quello usato in questa domanda è per le illustrazioni. –

risposta

8

Come qualcuno ha detto, si potrebbe fare qualcosa con la scintilla in streaming, ma per quanto riguarda la scintilla pura, non è possibile, e la ragione è che ciò che stai chiedendo va contro il modello di scintilla. Lasciatemi spiegare. Per distribuire e parallelizzare il lavoro, la scintilla deve dividerla in blocchi. Durante la lettura da HDFS, quel "chunking" è fatto per Spark di HDFS, poiché i file HDFS sono organizzati in blocchi. Spark genererà generalmente un'attività per blocco. Ora, gli iteratori forniscono solo accesso sequenziale ai dati, quindi è impossibile per la scintilla organizzarla in blocchi senza leggerli tutti in memoria.

Potrebbe essere possibile creare un RDD con una singola partizione iterabile, ma anche in questo caso, è impossibile dire se l'implementazione di Iterable possa essere inviata ai lavoratori. Quando si utilizza sc.parallelize() spark crea partizioni che implementano serializable in modo che ciascuna partizione possa essere inviata a un altro worker. L'iterabile potrebbe essere su una connessione di rete, o file nella FS locale, quindi non possono essere inviati ai lavoratori a meno che non siano memorizzati in memoria.

+2

Esatto. È una vecchia domanda ma sì l'ho capito cercando di implementare un RDD personalizzato. Quello che hai detto ha perfettamente senso perché le partizioni devono essere serializzabili per ottenere un RDD. La serializzazione di un iteratore non ha senso. Grazie per la conferma. –