2014-09-25 6 views
12

Sono nuovo ad Apache Spark e so che la struttura dei dati di base è RDD. Ora sto scrivendo alcune app che richiedono informazioni posizionali sugli elementi. Ad esempio, dopo aver convertito un ArrayList in un RDD (Java), per ogni intero in RDD, ho bisogno di conoscere il suo indice (globale). È possibile farlo?Come posso ottenere una posizione dell'elemento nell'RDD di Spark?

Come noto, c'è una funzione take (int) per RDD, quindi credo che le informazioni sulla posizione siano ancora mantenute in RDD.

risposta

11

In sostanza, il metodo zipWithIndex() di RDD sembra farlo, ma non preserverà l'ordine originale dei dati da cui è stato creato il RDD. Almeno avrai un ordine stabile.

val orig: RDD[String] = ... 
val indexed: RDD[(String, Long)] = orig.zipWithIndex() 

Il motivo è improbabile di trovare qualcosa che preserva l'ordine nei dati originali è sepolto nella documentazione delle API per zipWithIndex():

"Zip questa RDD con i suoi indici di elementi. L'ordinamento si basa per la prima volta su sull'indice delle partizioni e quindi sull'ordinamento degli elementi all'interno di ciascuna partizione . Quindi il primo elemento nella prima partizione ottiene l'indice 0 e l'ultimo elemento nell'ultima partizione riceve l'indice più grande. Questo simile a zipWithIndex di Scala ma utilizza Long invece di Int come il tipo di indice. Questo metodo ha bisogno di innescare un lavoro scintilla quando questo RDD contiene più di un partizioni."

Quindi sembra che l'ordine originale viene scartato. Se preservare l'ordine originale è importante per voi, sembra che tu abbia bisogno per aggiungere l'indice prima di creare la RDD.

+0

Sì, aggiungere l'indice di matrice come attributo aggiuntivo prima di creare RDD può risolvere questo problema. Tuttavia, ci sono due limitazioni serie: 1) Ovviamente, questo attributo indice aggiuntivo raddoppia almeno il costo di archiviazione, e tale costo può essere anche più, ad esempio, in un array intero/mobile, per l'indice viene aggiunto un campo int lungo. 2) Poiché non è possibile caricare ulteriori valori di indice in Spark, tale conversione dei dati non può essere parallelizzata da Spark. Quindi, devo coinvolgere altre tecniche parallele per aggiungere l'indice. – SciPioneer

14

credo nella maggior parte dei casi, zipWithIndex() farà il trucco, e sarà preservare l'ordine. Continua a leggere di nuovo i commenti. la mia comprensione è che significa esattamente mantenere l'ordine nel RDD

scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3) 
scala> val r2 = r1.zipWithIndex 
scala> r2.foreach(println) 
(c,2) 
(d,3) 
(e,4) 
(f,5) 
(g,6) 
(a,0) 
(b,1) 

Sopra l'esempio confermarlo. Il rosso ha 3 partizioni e una con indice 0, b con indice 1, ecc.

+0

Grazie per la tua risposta! Nella maggior parte dei casi questo metodo non è male, poiché l'elemento nella matrice/lista di input può essere un oggetto relativamente grande. Tuttavia, potrebbe essere un problema per gli array di tipo primitivo, ad esempio un array intero, poiché questa soluzione apparentemente unica è piuttosto inefficiente, in termini sia di costi di calcolo che di archiviazione. Ad ogni modo, sono molto soddisfatto della tua risposta. Spero che un giorno naturalmente mantenere l'indice senza (zipWithIndex) possa diventare vero per RDD di Spark. – SciPioneer

+0

Basato sul design di Spark, non riesco a immaginare un buon modo per mantenere l'indice dell'elemento senza sacrificare lo spazio di archiviazione. –