Voglio analizzare le colonne della data in un DataFrame
e, per ciascuna colonna della data, la risoluzione per la data potrebbe cambiare (ad esempio 2011/01/10 => 2011/01 se la risoluzione è impostata su "Mese").Come posso passare parametri extra alle UDF in SparkSql?
ho scritto il seguente codice:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}
dataframe.select(mappedCols:_*)
}}
Tuttavia non funziona. Sembra che io possa solo passare Column
s alle UDF. E mi chiedo se sarà molto lento se converto il DataFrame
in RDD
e applichiamo la funzione su ogni riga.
Qualcuno conosce la soluzione corretta? Grazie!
Grazie per la risposta e l'intuizione di accattivarsi! – DarkZero
Ho scritto un tutorial su come utilizzare il currying per creare Spark UDF che accetta parametri aggiuntivi al momento dell'invocazione. https://gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 –