2014-12-12 11 views
18

Ho bisogno di unire due ordinari RDDs su una o più colonne. Logicamente questa operazione è equivalente all'operazione di unione del database di due tabelle. Mi chiedo se questo sia possibile solo attraverso Spark SQL o ci sono altri modi per farlo.Unire due ordinari RDD con/senza Spark SQL

Come esempio concreto, consideriamo RDD r1 con chiave primaria ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID) 

e RDD r2 con chiave primaria COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY) 

voglio unirmi r1 e r2.

Come si può fare?

risposta

8

Qualcosa di simile dovrebbe funzionare.

scala> case class Item(id:String, name:String, unit:Int, companyId:String) 

scala> case class Company(companyId:String, name:String, city:String) 

scala> val i1 = Item("1", "first", 2, "c1") 

scala> val i2 = i1.copy(id="2", name="second") 

scala> val i3 = i1.copy(id="3", name="third", companyId="c2") 

scala> val items = sc.parallelize(List(i1,i2,i3)) 
items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20 

scala> val c1 = Company("c1", "company-1", "city-1") 

scala> val c2 = Company("c2", "company-2", "city-2") 

scala> val companies = sc.parallelize(List(c1,c2)) 

scala> val groupedItems = items.groupBy(x => x.companyId) 
groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22 

scala> val groupedComp = companies.groupBy(x => x.companyId) 
groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20 

scala> groupedItems.join(groupedComp).take(10).foreach(println) 

14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s 
(c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1)))) 
(c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2)))) 
23

Soumya Simanta ha dato una buona risposta. Tuttavia, i valori nell'RDD unito sono Iterable, pertanto i risultati potrebbero non essere molto simili alla normale unione delle tabelle.

In alternativa, è possibile:

val mappedItems = items.map(item => (item.companyId, item)) 
val mappedComp = companies.map(comp => (comp.companyId, comp)) 
mappedItems.join(mappedComp).take(10).foreach(println) 

l'output sarà:

(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1))) 
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1))) 
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2))) 
+1

Le tue mappe sono le stesse di 'items.keyBy {_. CompanyId}', 'com panies.keyBy {_. CompanyID} '. Dal momento che fanno parte di Spark, c'è una possibilità che sarebbe più efficiente? –

+0

@Paul Questo è il codice sorgente della scintilla per keyBy: 'def keyBy [K] (f: T => K): RDD [(K, T)] = {' 'map (x => (f (x), x)) ' '} ' quindi la soluzione e la soluzione @virya sono identiche – jlopezmat

+1

OK :) Ancora, forse l'intento è leggermente più chiaro con keyBy. Non è un punto importante, anche se –

2

Spark SQL può eseguire join su RDDs Spark.

Sotto codice esegue SQL unirsi sulla Società e Voci RDDs

object SparkSQLJoin { 

case class Item(id:String, name:String, unit:Int, companyId:String) 
case class Company(companyId:String, name:String, city:String) 

def main(args: Array[String]) { 

    val sparkConf = new SparkConf() 
    val sc= new SparkContext(sparkConf) 
    val sqlContext = new SQLContext(sc) 

    import sqlContext.createSchemaRDD 

    val i1 = Item("1", "first", 1, "c1") 
    val i2 = Item("2", "second", 2, "c2") 
    val i3 = Item("3", "third", 3, "c3") 
    val c1 = Company("c1", "company-1", "city-1") 
    val c2 = Company("c2", "company-2", "city-2") 

    val companies = sc.parallelize(List(c1,c2)) 
    companies.registerAsTable("companies") 

    val items = sc.parallelize(List(i1,i2,i3)) 
    items.registerAsTable("items") 

    val result = sqlContext.sql("SELECT * FROM companies C JOIN items I ON C.companyId= I.companyId").collect 

    result.foreach(println) 

    } 
} 

uscita viene visualizzata come

 [c1,company-1,city-1,1,first,1,c1] 
    [c2,company-2,city-2,2,second,2,c2] 
+0

ho diverse colonne, quindi ho bisogno di specificare lo schema a livello di programmazione. Anche gli RDD sono creati da file di testo di grandi dimensioni su HDFS. Credo che l'approccio sopra funziona ancora, giusto? Per favore fatemi sapere se sono necessarie modifiche. –

+0

Sì, questo approccio funziona bene anche per i dati enormi. Per definire lo schema a livello di programmazione, consultare http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema –

8

(Usando Scala) Diciamo che avete due RDDs:

  • emp: (empid, ename, dept)

  • reparto: (dname, dept)

seguito è un altro modo:

//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30))) 
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30))) 

val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40))) 

//val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2)) 
val shifted_fields_emp = emp.map(t => (t._2, t._1)) 

val shifted_fields_dept = dept.map(t => (t._2,t._1)) 

shifted_fields_emp.join(shifted_fields_dept) 
// Create emp RDD 
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30))) 

// Create dept RDD 
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40))) 

// Establishing that the third field is to be considered as the Key for the emp RDD 
val manipulated_emp = emp.keyBy(t => t._3) 

// Establishing that the second field need to be considered as the Key for dept RDD 
val manipulated_dept = dept.keyBy(t => t._2) 

// Inner Join 
val join_data = manipulated_emp.join(manipulated_dept) 
// Left Outer Join 
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept) 
// Right Outer Join 
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept) 
// Full Outer Join 
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept) 

// Formatting the Joined Data for better understandable (using map) 
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1)) 

Questo darà l'uscita come:

// Stampa le cleaned_joined_data uscita sulla console

scala> cleaned_joined_data.collect() 
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))