2015-08-05 5 views
8

Vorrei ripetere un RDD di stringhe e "fare qualcosa" su ogni stringa. L'output dovrebbe essere double[][]. Ecco un esempio con un ciclo for. Capisco che ho bisogno di utilizzare (credo) la funzione foreach per gli RDD Java. Tuttavia, non ho idea di come capire la sintassi. La documentazione non è particolarmente utile. Non ho Java 8.Iterate attraverso un RDD Java per riga

Ecco un esempio di cosa mi piacerebbe fare se potessi utilizzare un ciclo regolare for.

public class PCA { 

    public static void main(String[] args) { 
     SparkConf conf = new SparkConf().setAppName("PCA Example"); 
     SparkContext sc = new SparkContext(conf); 

     RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0); 

     // here is the "type" of code I would like to execute 
     // 30 because I have 30 variables 
     double[][] vals = new double[data.count()][30]; 

     double[] temp; 
     for (int i = 0; i < data.count(); i++) { 
      temp = splitStringtoDoubles(data[i]); 
      vals[i] = temp; 
     } 
    } 

    private static double[] splitStringtoDoubles(String s) { 
     String[] splitVals = s.split("\\t"); 
     Double[] vals = new Double[splitVals.length]; 
     for (int i = 0; i < splitVals.length; i++) { 
      vals[i] = Double.parseDouble(splitVals[i]); 
     } 
    } 

} 

Capisco che foreach sembra richiedere una funzione che ha un tipo di ritorno vuoto. Non sono sicuro di come lavorare con quello. Ecco quello che ho cercato finora (ovviamente la sintassi è sbagliata):

double[][] matrix = new double[data.count()][30]; 
    foreach(String s : data) { 
     String[] splitvals = s.split("\\t"); 
     double[] vals = Double.parseDouble(splitvals); 
     matrix[s] = vals; 
    } 
+2

Dal momento che si desidera un tipo di ritorno, e non nullo, si dovrebbe usare 'map' invece di 'foreach' e restituisce una nuova matrice per ogni stringa. – mattinbits

+0

Sareste in grado di fornire un esempio, @mattinbits? Come ho detto, non ho familiarità con Spark Java e la maggior parte degli esempi di mappe che ho visto sono stati in Java 8 con funzioni lambda, che non posso usare. –

+1

Potrei fornirne uno in Scala ma non Java la risposta qui sotto da @Balduz sembra buona anche se – mattinbits

risposta

3

Come mattinbits detto nei commenti, si desidera un map invece di un foreach, dal momento che si vuole restituire valori. Quello che è un map consiste nel trasformare i dati: per ogni riga del tuo RDD esegui un'operazione e restituisci un valore per ogni riga. Quello che vi serve si può ottenere in questo modo:

import org.apache.spark.api.java.function.Function; 

... 

SparkConf conf = new SparkConf().setAppName("PCA Example"); 
SparkContext sc = new SparkContext(conf); 

JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD(); 
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() { 
    @Override 
    public double[] call(String row) throws Exception { 
     return splitStringtoDoubles(row); 
    } 

    private double[] splitStringtoDoubles(String s) { 
     String[] splitVals = s.split("\\t"); 
     Double[] vals = new Double[splitVals.length]; 
     for(int i=0; i < splitVals.length; i++) { 
      vals[i] = Double.parseDouble(splitVals[i]); 
     } 
     return vals; 
    } 
}); 

List<double[]> whatYouWant = whatYouWantRdd.collect(); 

in modo da sapere come funziona Spark, si esegue azioni o trasformazioni sul RDD. Ad esempio, qui stiamo trasformando il nostro RDD usando una funzione map. È necessario creare questa funzione da soli, questa volta con un anonimo org.apache.spark.api.java.function.Function che obbliga a ignorare il metodo call, in cui si riceve una riga del proprio RDD e si restituisce un valore.

+0

L'azione di raccolta non è un'azione consigliata poiché trasferisce tutti i dati al programma del driver che è molto costoso. – BDR

3

Solo perché è interessante confrontare la verboseness del Java vs Scala API per Spark, ecco una versione Scala:

import org.apache.spark.{SparkContext, SparkConf} 

class example extends App { 
    val conf = new SparkConf().setMaster("local").setAppName("Spark example") 
    val sc = new SparkContext(conf) 

    val inputData = List(
    "1.2\t2.7\t3.8", 
    "4.3\t5.1\t6.3" 
) 

    val inputRDD = sc.parallelize(inputData) 
    val arrayOfDoubleRDD = inputRDD.map(_.split("\t").map(_.toDouble)) 
} 
+0

Non posso essere più d'accordo con te sul fatto che Scala sia migliore. Ho lavorato su Python per Spark ed è incredibile quanto sia facile usare funzioni come 'map'. Ahimè, devo usare Java per questo caso. –

+1

Buona fortuna quindi :) – mattinbits

+0

Anch'io preferisco Scala per Spark, in quanto Java è ** molto ** più prolisso ... Sfortunatamente le mie abilità di Scala sono inesistenti :( – Balduz