2016-05-07 20 views
6

Questo dovrebbe essere facile ma .... utilizzando Spark 1.6.1 .... Ho DataFrame # 1 con le colonne A, B, C . con valori:Creazione di un nuovo Spark DataFrame con un nuovo valore di colonna basato sulla colonna nel primo dataframe Java

A B C 
1 2 A 
2 2 A 
3 2 B 
4 2 C 

ho quindi creare un nuovo dataframe con una nuova colonna D così:

DataFrame df2 = df1.withColumn("D", df1.col("C")); 

fin qui tutto bene, ma io in realtà voglio il valore nella colonna D di essere cioè condizionale:

// pseudo code 
if (col C = "A") the col D = "X" 
else if (col C = "B") the col D = "Y" 
else col D = "Z" 

Lascerò quindi la colonna C e rinominerò D in C. Ho provato a esaminare le funzioni Column ma nulla sembra adattarsi alla fattura; Ho pensato di usare df1.rdd(). Map() e scorrere le righe, ma a parte il fatto che non riuscivo davvero a farlo funzionare, pensavo che l'intero punto di DataFrames fosse quello di allontanarsi dall'astrazione RDD?

Purtroppo devo farlo in Java (e ovviamente Spark con Java non è ottimale !!). Sembra che mi manchi l'ovvio e sono felice di mostrarsi un idiota quando viene presentato con la soluzione!

risposta

12

Credo che si possa usare when per raggiungere questo obiettivo. Inoltre, probabilmente puoi sostituire direttamente la vecchia colonna. Per il vostro esempio, il codice sarebbe qualcosa di simile:

import static org.apache.spark.sql.functions.*; 

Column newCol = when(col("C").equalTo("A"), "X") 
    .when(col("C").equalTo("B"), "Y") 
    .otherwise("Z"); 

DataFrame df2 = df1.withColumn("C", newCol); 

Per ulteriori dettagli su when, controllare il Column Javadoc.

+1

Grazie per questo - ero davvero fissando l'ovvio in faccia: s - quello che mi mancava era l'importazione statica delle funzioni SQL cioè: import org.apache statica. spark.sql.functions. * – user1128482

+0

@ user1128482 Mi dispiace, ho dimenticato l'importazione. Buono a sapersi che l'hai scoperto alla fine. –

2

grazie a Daniel Ho risolto questo :)

Il pezzo che mancava era l'importazione statica delle funzioni SQL

import static org.apache.spark.sql.functions.*; 

Devo aver provato un milione di modi diversi di utilizzare quando, ma ha ottenuto la compilazione errori/errori di runtime perché non ho fatto l'importazione. Una volta importata la risposta di Daniel era azzeccata!

1

Si può anche usare udf per fare lo stesso lavoro. Basta scrivere un semplice, se poi il resto struttura

import org.apache.spark.sql.functions.udf 
val customFunct = udf { d => 
     //if then else construct 
    } 

val new_DF= df.withColumn(column_name, customFunct(df("data_column")))