2015-02-05 13 views
8

Sono molto nuovo ad Apache Spark. Mi piacerebbe davvero concentrarmi sulle specifiche delle API Spark di base e voglio capire e scrivere alcuni programmi usando Spark API. Ho scritto un programma java usando Apache Spark per implementare il concetto di Joins.Apache Spark unisce esempio con Java

Quando uso Left Outer Join - leftOuterJoin() o Right Outer Join - rightOuterJoin(), entrambi i metodi restituiscono un JavaPairRDD che contiene un tipo speciale di Opzioni Google. Ma non so come estrarre i valori originali da Tipo facoltativo.

In ogni caso mi piacerebbe sapere posso utilizzare gli stessi metodi di join che restituiscono i dati nel mio formato. Non ho trovato alcun modo per farlo. Il significato è quando sto usando Apache Spark, non sono in grado di personalizzare il codice nel mio stile poiché hanno già dato tutte le cose predefinite.

Si prega di trovare il codice qui sotto

my 2 sample input datasets 

customers_data.txt: 
4000001,Kristina,Chung,55,Pilot 
4000002,Paige,Chen,74,Teacher 
4000003,Sherri,Melton,34,Firefighter 

and 

trasaction_data.txt 
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit 
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit 
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash 
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit 
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit 

Ecco il mio codice Java

**SparkJoins.java:** 

public class SparkJoins { 

    @SuppressWarnings("serial") 
    public static void main(String[] args) throws FileNotFoundException { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local")); 
     JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt"); 
     JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] customerSplit = s.split(","); 
       return new Tuple2<String, String>(customerSplit[0], customerSplit[1]); 
      } 
     }).distinct(); 

     JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt"); 
     JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] transactionSplit = s.split(","); 
       return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]); 
      } 
     }); 

     //Default Join operation (Inner join) 
     JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs); 
     System.out.println("Joins function Output: "+joinsOutput.collect()); 

     //Left Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect()); 

     //Right Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect()); 

     sc.close(); 
    } 
} 

E qui l'output che sto ottenendo

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))] 

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])] 

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])] 

Sto facendo funzionare questo programma su Piattaforma Windows

Si prega di osservare l'output sopra e mi aiuta a estrarre i valori dal tipo opzionale

Grazie in anticipo

+0

Perché non usare Scala, invece? – maasg

+0

Ciao @maasg, sono fondamentalmente uno sviluppatore java .. Non conosco davvero Scala .. Ma penso che Apache Spark sia più adatto per la programmazione Scala e poi Java. –

+0

@ShekarPatel puoi aggiornare il tuo codice con come hai rimosso quell'opzione .. che sarà utile per gli altri. – Shankar

risposta

8

Quando si esegue join esterno sinistro e destro outer join, si potrebbe avere valori nulli. destra!

Quindi la scintilla restituisce Oggetto opzionale. dopo aver ottenuto quel risultato, puoi mappare quel risultato nel tuo formato.

è possibile utilizzare il metodo isPresent() di Opzionale per mappare i dati.

Ecco l'esempio:

JavaPairRDD<String,String> firstRDD = .... 
JavaPairRDD<String,String> secondRDD =.... 
// join both rdd using left outerjoin 
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD); 


// mapping of join result 
JavaPairRDD<String, String> mappedRDD = rddWithJoin 
      .mapToPair(tuple -> { 
       if (tuple._2()._2().isPresent()) { 
        //do your operation and return 
        return new Tuple2<String, String>(tuple._1(), tuple._2()._1()); 
       } else { 
        return new Tuple2<String, String>(tuple._1(), "not present"); 
       } 
      }); 
+0

Grazie amico .. Funziona bene .. –

+0

@ sms_1190 come mappare quel risultato nel nostro formato? Anch'io sto affrontando lo stesso problema. – Shankar

+0

@ Shankar: ho aggiunto l'esempio nella risposta di cui sopra. mappedRDD è il tuo formato. –