2016-04-11 8 views
7

Sto usando Scala e voglio creare la mia funzione DataFrame. Ad esempio, voglio trattare una colonna come una matrice, scorrere ogni elemento e fare un calcolo.Spark Build Custom Column Function, funzione definita dall'utente

Per iniziare, sto cercando di implementare il mio metodo getMax. Così colonna x avrebbe i valori [3,8,2,5,9], e l'uscita prevista del metodo sarebbe 9.

Ecco come appare a Scala

def getMax(inputArray: Array[Int]): Int = { 
    var maxValue = inputArray(0) 
    for (i <- 1 until inputArray.length if inputArray(i) > maxValue) { 
    maxValue = inputArray(i) 
    } 
    maxValue 
} 

Questo è quello che ho finora, e ottengo questo errore

"value length is not a member of org.apache.spark.sql.column", 

e non so in quale altro modo per scorrere attraverso la colonna.

def getMax(col: Column): Column = { 
var maxValue = col(0) 
for (i <- 1 until col.length if col(i) > maxValue){ 
    maxValue = col(i) 
} 
maxValue 

}

Una volta che sono in grado di attuare il mio metodo, creerò una funzione di colonna

val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”) 

E poi spero di essere in grado di utilizzare questo in un'istruzione SQL, per esempio

val sample = sqlContext.sql("SELECT value_max(x) FROM table") 

e l'output previsto sarebbe 9, data colonna di input [3,8,2,5,9]

Sto seguendo una risposta da un altro thread Spark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frame dove creano un metodo privato per la deviazione standard. I calcoli che farò saranno più complessi di questo, (ad esempio confronterò ogni elemento nella colonna), sto andando nelle direzioni corrette o dovrei cercare più nelle funzioni definite dall'utente?

+0

Si prega di mostrare il proprio ingresso e di uscita/dataframes attesi. Usa 'spettacolo'. –

+0

Ciao @JacekLaskowski grazie per il commento, ho modificato la domanda per rendere più chiaro ciò che vorrei ottenere. – other15

risposta

13

In uno Spark DataFrame, non è possibile eseguire un'iterazione tra gli elementi di una Colonna utilizzando gli approcci a cui si pensava in quanto una Colonna non è un oggetto iterabile.

Tuttavia, per elaborare i valori di una colonna, avete alcune opzioni e quello di destra dipende dal vostro compito:

1) Uso delle funzioni built-in esistenti

Spark SQL è già molte funzioni utili per l'elaborazione di colonne, incluse funzioni di aggregazione e trasformazione. Molti di questi si trovano nel pacchetto functions (documentation here). Alcuni altri (funzioni binarie in generale) si possono trovare direttamente nell'oggetto Column (documentation here). Quindi, se puoi usarli, di solito è l'opzione migliore. Nota: non dimenticare lo Window Functions.

2) Creazione di un UDF

Se non è possibile completare l'operazione con le funzioni built-in, si può considerare la definizione di un UDF (User Defined Function). Sono utili quando puoi elaborare indipendentemente ogni elemento di una colonna e ti aspetti di produrre una nuova colonna con lo stesso numero di righe di quella originale (non una colonna aggregata). Questo approccio è abbastanza semplice: in primo luogo, si definisce una funzione semplice, quindi la si registra come UDF, quindi la si utilizza.Esempio:

def myFunc: (String => String) = { s => s.toLowerCase } 

import org.apache.spark.sql.functions.udf 
val myUDF = udf(myFun) 

val newDF = df.withColumn("newCol", myUDF(df("oldCol"))) 

Per ulteriori informazioni, here's un bell'articolo.

3) L'utilizzo di un UDAF

Se il vostro compito è quello di creare dati aggregati, è possibile definire un UDAF (User Defined Function Aggregation). Non ho molta esperienza con questo, ma si può puntare ad un bel tutorial:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

4) Ripristinare l'elaborazione RDD

Se davvero non si può usa le opzioni sopra, o se l'elaborazione dell'attività dipende da diverse righe per l'elaborazione di una e non è un'aggregazione, allora penso che dovresti selezionare la colonna che vuoi ed elaborarla usando l'RDD corrispondente. Esempio:

val singleColumnDF = df("column") 

val myRDD = singleColumnDF.rdd 

// process myRDD 

Quindi, c'è stata la possibilità mi veniva in mente. Spero possa essere d'aiuto.

+0

Grazie Daniel, molto istruttivo. Quindi la differenza principale tra UDF e UDAF è che un UDAF restituisce un valore basato sul calcolo della colonna? Spero che le funzioni integrate siano sufficienti per quello che voglio fare, ma sarebbe bene sapere come implementare le mie funzioni. – other15

+0

@ other15 Un UDAF viene solitamente applicato con 'groupBy', quindi può restituire un valore aggregato per ogni valore distinto nelle colonne passate a' groupBy' (simile a come un semplice 'df.groupBy (" chiave "). avg ("value")) 'funziona). Tuttavia, se non si utilizza groupBy, l'UDAF restituirà un solo valore. –

0

Un esempio semplice è dato nel excellent documentation, in cui un'intera sezione è dedicata a UDF:

import org.apache.spark.sql._ 

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") 
val spark = df.sparkSession 
spark.udf.register("simpleUDF", (v: Int) => v * v) 
df.select($"id", callUDF("simpleUDF", $"value"))