5

Ho due DataFramea e b. a è comeCome comprimere due (o più) DataFrame in Spark

Column 1 | Column 2 
abc  | 123 
cde  | 23 

b è come

Column 1 
1  
2  

voglio zip a e b (o anche più) DataFrames che diventa qualcosa di simile a:

Column 1 | Column 2 | Column 3 
abc  | 123  | 1 
cde  | 23  | 2 

Come posso fare vero?

+0

E 'lecito ritenere che i due hanno lo stesso dataframes # di righe? –

risposta

16

Un'operazione come questa non è supportata da un'API DataFrame. È possibile aggiungere due oggetti RDD a zip ma per farlo funzionare è necessario associare sia il numero di partizioni sia il numero di elementi per partizione. Supponendo che questo è il caso:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, StructType, LongType} 

val a: DataFrame = sc.parallelize(Seq(
    ("abc", 123), ("cde", 23))).toDF("column_1", "column_2") 
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3") 

// Merge rows 
val rows = a.rdd.zip(b.rdd).map{ 
    case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)} 

// Merge schemas 
val schema = StructType(a.schema.fields ++ b.schema.fields) 

// Create new data frame 
val ab: DataFrame = sqlContext.createDataFrame(rows, schema) 

Se le condizioni di cui sopra non sono soddisfatte l'unica opzione che viene in mente è l'aggiunta di un indice e unisciti:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
    // Add index 
    df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)}, 
    // Create schema 
    StructType(df.schema.fields :+ StructField("_index", LongType, false)) 
) 

// Add indices 
val aWithIndex = addIndex(a) 
val bWithIndex = addIndex(b) 

// Join and clean 
val ab = aWithIndex 
    .join(bWithIndex, Seq("_index")) 
    .drop("_index") 
+0

Che ne dici di 'withColumn' su' DataFrame'? – Reactormonk

+0

@Reactormonk Come lo useresti qui? – zero323

+0

Utilizzare .column per ottenere la colonna da df b e quindi con Colonna per aggiungerla a un? Non l'ho provato e posso ben immaginare che Spark non supporti questo. –

1

In attuazione della Scala di Dataframes, non v'è semplice modo di concatenare due dataframe in uno solo. Possiamo semplicemente aggirare questa limitazione aggiungendo indici a ciascuna riga dei dataframes. Quindi, possiamo fare un join interno da questi indici. Questo è il mio codice stub di questa implementazione:

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2") 
val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId) 

val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3") 
val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId) 

aWithId.join(bWithId, "id") 

A little light reading - Check out how Python does this!