38

Ho un DF con un enorme metadata parseable come una singola colonna di stringa in un Dataframe, consente di chiamarlo DFA, con ColmnA.Ricava più colonne da una singola colonna in Spark DataFrame

Vorrei rompere questa colonna, ColmnA in più colonne attraverso una funzione, ClassXYZ = Func1 (ColmnA). Questa funzione restituisce una classe ClassXYZ, con più variabili, e ognuna di queste variabili deve ora essere mappata alla nuova colonna, come ColmnA1, ColmnA2 ecc.

Come farei una tale trasformazione da 1 Dataframe a un'altra con questi colonne aggiuntive chiamando questo Func1 una sola volta e non doverle ripetere per creare tutte le colonne.

È facile da risolvere se dovessi chiamare questa enorme funzione ogni volta per aggiungere una nuova colonna, ma ciò che desidero evitare.

Si prega gentilmente di avvisare con un codice funzionante o pseudo.

Grazie

Sanjay

risposta

5

Se le colonne risultanti saranno della stessa lunghezza di quella originale, è possibile creare nuove colonne con withColumn funzione ed applicando un UDF. Dopo questo si può cadere la colonna originale, ad esempio:

val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn"))) 
.withColumn("newCol2", myFun2(myDf("originalColumn")) 
.drop(myDf("originalColumn")) 

dove myFun è un UDF definita in questo modo:

def myFun= udf(
    (originalColumnContent : String) => { 
     // do something with your original column content and return a new one 
    } 
) 
+0

Ciao Niemand, apprezzo la tua risposta ... ma non risolve il problema ... in te codice, si sta chiamando la funzione " myDF "più volte, mentre vorrei che quella funzione venisse chiamata una volta, generare una classe con più campi e ogni variabile di campo essere restituita come una nuova colonna – sshroff

+0

Beh, ho paura di aver presentato l'unico modo possibile per sapere, Non penso che esista un altro modo, ma spero di sbagliarmi;). Inoltre, non è che non chiami myFun più volte: puoi chiamare altre funzioni come myFun2, myFun3 ecc. Per creare colonne di cui hai bisogno. – Niemand

56

In generale ciò che si vuole non è direttamente possibile. UDF può restituire solo una singola colonna alla volta. Esistono due modi diversi per superare questa limitazione:

  1. Restituire una colonna di tipo complesso. La soluzione più generale è un StructType ma puoi considerare anche ArrayType o MapType.

    import org.apache.spark.sql.functions.udf 
    
    val df = Seq(
        (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c") 
    ).toDF("x", "y", "z") 
    
    case class Foobar(foo: Double, bar: Double) 
    
    val foobarUdf = udf((x: Long, y: Double, z: String) => 
        Foobar(x * y, z.head.toInt * y)) 
    
    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z")) 
    df1.show 
    // +---+----+---+------------+ 
    // | x| y| z|  foobar| 
    // +---+----+---+------------+ 
    // | 1| 3.0| a| [3.0,291.0]| 
    // | 2|-1.0| b|[-2.0,-98.0]| 
    // | 3| 0.0| c| [0.0,0.0]| 
    // +---+----+---+------------+ 
    
    df1.printSchema 
    // root 
    // |-- x: long (nullable = false) 
    // |-- y: double (nullable = false) 
    // |-- z: string (nullable = true) 
    // |-- foobar: struct (nullable = true) 
    // | |-- foo: double (nullable = false) 
    // | |-- bar: double (nullable = false) 
    

    Questo può essere facilmente appiattito in seguito, ma di solito non ce n'è bisogno.

  2. Passa a RDD, rimodellare e ricostruire DF:

    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.Row 
    
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
        Seq(x * y, z.head.toInt * y) 
    
    val schema = StructType(df.schema.fields ++ 
        Array(StructField("foo", DoubleType), StructField("bar", DoubleType))) 
    
    val rows = df.rdd.map(r => Row.fromSeq(
        r.toSeq ++ 
        foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z")))) 
    
    val df2 = sqlContext.createDataFrame(rows, schema) 
    
    df2.show 
    // +---+----+---+----+-----+ 
    // | x| y| z| foo| bar| 
    // +---+----+---+----+-----+ 
    // | 1| 3.0| a| 3.0|291.0| 
    // | 2|-1.0| b|-2.0|-98.0| 
    // | 3| 0.0| c| 0.0| 0.0| 
    // +---+----+---+----+-----+ 
    
+2

Quando dici "di solito non c'è per [appiattire una colonna]", perché è così? Oppure accendi la maggior parte delle cose che fai con le colonne di livello superiore anche con dati gerarchici (come 'df1.foobar.foo')? – max

+2

@max Poiché 'structs' semplici possono essere usati in praticamente tutti i contesti quando si usa normalmente una struttura piatta (con semplice sintassi del punto' fooobar.foo'). Tuttavia non si applica ai tipi di raccolta. È inoltre possibile controllare http://stackoverflow.com/a/33850490/1560062 – zero323

2

ho optato per creare una funzione per appiattire una colonna e poi basta chiamare simultaneamente con l'UDF.

Prima definire questo:

implicit class DfOperations(df: DataFrame) { 

    def flattenColumn(col: String) = { 
    def addColumns(df: DataFrame, cols: Array[String]): DataFrame = { 
     if (cols.isEmpty) df 
     else addColumns(
     df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)), 
     cols.tail 
    ) 
    } 

    val field = df.select(col).schema.fields(0) 
    val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name) 

    addColumns(df, newCols).drop(col) 
    } 

    def withColumnMany(colName: String, col: Column) = { 
    df.withColumn(colName, col).flattenColumn(colName) 
    } 

} 

Poi utilizzo è molto semplice:

case class MyClass(a: Int, b: Int) 

val df = sc.parallelize(Seq(
    (0), 
    (1) 
)).toDF("x") 

val f = udf((x: Int) => MyClass(x*2,x*3)) 

df.withColumnMany("test", f($"x")).show() 

// +---+------+------+ 
// | x|test_a|test_b| 
// +---+------+------+ 
// | 0|  0|  0| 
// | 1|  2|  3| 
// +---+------+------+ 
+0

Non è necessario eseguire l'intera operazioneColumnMany. Basta usare select ("select. *") Per appiattirlo. –

13

Si supponga che, dopo la funzione ci sarà una sequenza di elementi, dando un esempio, come di seguito:

val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age") 
df.show 
+------------------+---+ 
|   infoComb|age| 
+------------------+---+ 
|Mike,1986,Toronto| 30| 
| Andre,1980,Ottawa| 36| 
| jill,1989,London| 27| 
+------------------+---+ 

ora quello che puoi fare con questo infoComb è che puoi iniziare a dividere la stringa e ottenere di più umns con:

df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show 
+-----+----------+-------+---+ 
| name|yearOfBorn| city|age| 
+-----+----------+-------+---+ 
|Mike|  1986|Toronto| 30| 
|Andre|  1980| Ottawa| 36| 
| jill|  1989| London| 27| 
+-----+----------+-------+---+ 

Spero che questo aiuti.

-2

Ciò può essere ottenuto facilmente utilizzando funzione pivot

df4.groupBy("year").pivot("course").sum("earnings").collect() 
+0

Pang, Grazie per la formattazione –

+0

Non vedo "anno", "corso" o "guadagni" in nessuna delle risposte o op .. di quale frame di dati stai parlando in questa risposta molto concisa (non) ? – Kai