2016-05-04 13 views
15

Ho un dataframe in Spark con molte colonne e un udf che ho definito. Voglio lo stesso dataframe indietro, tranne che con una colonna trasformata. Inoltre, il mio udf accetta una stringa e restituisce un timestamp. C'è un modo semplice per farlo? Ho provatoSpark Scala: Come trasformare una colonna in un DF

val test = myDF.select("my_column").rdd.map(r => getTimestamp(r)) 

ma questo restituisce un RDD e solo con la colonna trasformata.

risposta

27

Se davvero bisogno di usare la funzione, posso suggerire due opzioni:

1) usando la mappa/todf:

import org.apache.spark.sql.Row 
import sqlContext.implicits._ 

def getTimestamp: (String => java.sql.Timestamp) = // your function here 

val test = myDF.select("my_column").rdd.map { 
    case Row(string_val: String) => (string_val, getTimestamp(string_val)) 
}.toDF("my_column", "new_column") 

2) Utilizzo di UDF (UserDefinedFunction):

import org.apache.spark.sql.functions._ 

def getTimestamp: (String => java.sql.Timestamp) = // your function here 

val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column 
val test = myDF.withColumn("new_column", newCol) // adds the new column to original DF 

Ulteriori dettagli sulle UDF di Spark SQL sono disponibili in this nice article by Bill Chambers.


alternativa,

Se si desidera solo trasformare una colonna StringType in una colonna TimestampType è possibile utilizzare il unix_timestampcolumn function disponibile dal Spark SQL 1.5:

val test = myDF 
    .withColumn("new_column", unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp")) 

Nota: per scintilla 1.5.x, è necessario moltiplicare il risultato di unix_timestamp entro il 1000 prima di eseguire il cast in data e ora (iss ue SPARK-11724). Il codice risultante sarebbe:

val test = myDF 
    .withColumn("new_column", (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") *1000L).cast("timestamp")) 

Modifica: Aggiunta l'opzione UDF

+1

Grazie per l'aiuto. L'unico problema che sto avendo è che quando faccio df.withColumn ("start_date", unix_timestamp (df1 ("start_date"), "yyyy-MM-dd HH: mm: ss"). Cast ("timestamp")) , le mie date si stanno convertendo in modo errato. Ad esempio: 2013-08-12 06:40:54 viene convertito in 1970-01-16 22: 18: 09.654. Ti capita di sapere cosa potrebbe succedere? – mt88

+1

Per la scintilla 1.5 devi moltiplicare per 1000 prima del cast –

+0

grazie! ha funzionato – mt88