2016-06-03 47 views
10

Ho un dataframe Spark con diverse colonne. Voglio aggiungere una colonna al dataframe che è la somma di un certo numero di colonne.Aggiunta di una colonna di rowsum attraverso un elenco di colonne in Spark Dataframe

Ad esempio, il mio dati assomiglia a questo:

ID var1 var2 var3 var4 var5 
a 5  7 9 12 13 
b 6  4 3 20 17 
c 4  9 4 6 9 
d 1  2 6 8 1 

Voglio una colonna aggiunto sommando le righe per colonne specifiche:

ID var1 var2 var3 var4 var5 sums 
a 5  7 9 12 13 46 
b 6  4 3 20 17 50 
c 4  9 4 6 9  32 
d 1  2 6 8 10 27 

So che è possibile aggiungere colonne insieme se conoscere le colonne specifiche da aggiungere:

val newdf = df.withColumn("sumofcolumns", df("var1") + df("var2")) 

Ma è possibile passare un elenco di colori mn nomi e aggiungerli insieme? In base al largo di questa risposta che è fondamentalmente quello che voglio, ma utilizza l'API di Python al posto di Scala (Add column sum as new column in PySpark dataframe) penso che qualcosa di simile potrebbe funzionare:

//Select columns to sum 
val columnstosum = ("var1", "var2","var3","var4","var5") 

// Create new column called sumofcolumns which is sum of all columns listed in columnstosum 
val newdf = df.withColumn("sumofcolumns", df.select(columstosum.head, columnstosum.tail: _*).sum) 

Questo getta la somma valore di errore non è membro di org.apache.spark.sql.DataFrame. C'è un modo per sommare le colonne?

Grazie in anticipo per il vostro aiuto.

risposta

15

Si dovrebbe provare la seguente:

import org.apache.spark.sql.functions._ 

val sc: SparkContext = ... 
val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._ 

val input = sc.parallelize(Seq(
    ("a", 5, 7, 9, 12, 13), 
    ("b", 6, 4, 3, 20, 17), 
    ("c", 4, 9, 4, 6 , 9), 
    ("d", 1, 2, 6, 8 , 1) 
)).toDF("ID", "var1", "var2", "var3", "var4", "var5") 

val columnsToSum = List(col("var1"), col("var2"), col("var3"), col("var4"), col("var5")) 

val output = input.withColumn("sums", columnsToSum.reduce(_ + _))) 

output.show() 

allora il risultato è:

+---+----+----+----+----+----+----+ 
| ID|var1|var2|var3|var4|var5|sums| 
+---+----+----+----+----+----+----+ 
| a| 5| 7| 9| 12| 13| 46| 
| b| 6| 4| 3| 20| 17| 50| 
| c| 4| 9| 4| 6| 9| 32| 
| d| 1| 2| 6| 8| 1| 18| 
+---+----+----+----+----+----+----+ 
7

Chiaro e semplice:

import org.apache.spark.sql.Column 
import org.apache.spark.sql.functions.{lit, col} 

def sum_(cols: Column*) = cols.foldLeft(lit(0))(_ + _) 

val columnstosum = Seq("var1", "var2", "var3", "var4", "var5").map(col _) 
df.select(sum_(columnstosum: _*)) 

con Python equivalente:

from functools import reduce 
from operator import add 
from pyspark.sql.functions import lit, col 

def sum_(*cols): 
    return reduce(add, cols, lit(0)) 

columnstosum = [col(x) for x in ["var1", "var2", "var3", "var4", "var5"]] 
select("*", sum_(*columnstosum)) 

Entrambi verranno impostati su NA se nella riga è presente un valore mancante. È possibile utilizzare la funzione DataFrameNaFunctions.fill o coalesce per evitarlo.

0

Ecco una soluzione elegante con python:

NewDF = OldDF.withColumn('sums', sum(OldDF[col] for col in OldDF.columns[1:])) 

Speriamo che questo influenzerà qualcosa di simile in Spark ... qualcuno ?.

0

Suppongo che tu abbia un datafame df. Quindi puoi sommare tutti i cols eccetto il tuo ID col. Questo è utile quando si hanno molte colonne e non si vuole menzionare manualmente i nomi di tutte le colonne come quelle sopra menzionate. This post ha la stessa risposta.

val sumAll = df.columns.collect{ case x if x != "ID" => col(x) }.reduce(_ + _) 
df.withColumn("sum", sumAll)