2016-02-29 10 views
20

Questa domanda non è nuova, tuttavia trovo un comportamento sorprendente in Spark. Devo aggiungere una colonna di ID di riga a un DataFrame. Ho usato il metodo DataFrame monotonically_increasing_id() e mi fornisce un ulteriore col di ID di riga univoci (che NON sono consecutivi a proposito, ma sono unici).Come si aggiunge una colonna persistente di ID di riga a Spark DataFrame?

Il problema che sto avendo è che quando filtro il DataFrame gli ID di riga nel DataFrame risultante vengono riassegnati. I due DataFrames sono mostrati di seguito.

  • il primo è il dataframe iniziale con ID di riga aggiunti come segue:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • secondo dataframe è quello ottenuto dopo il filtraggio sul colle P tramite df.filter(col("P")).

Il problema è illustrato dal rowId per IDCLIENTE 169, che era 5 nella dataframe iniziale, ma dopo il filtraggio che rowId (5) è stato riassegnato custmId 773 quando CustID 169 è stata filtrata su! Non so perché questo è il comportamento predefinito.

Vorrei che lo rowIds fosse "appiccicoso"; se rimuovo le righe da DataFrame non voglio che i loro ID "riutilizzati", voglio che siano andati troppo avanti con le loro righe. è possibile farlo? Non vedo alcun flag per richiedere questo comportamento dal metodo monotonically_increasing_id.

+---------+--------------------+-------+ 
| custId | features| P |rowId| 
+---------+--------------------+-------+ 
|806  |[50,5074,...| true| 0| 
|832  |[45,120,1...| true| 1| 
|216  |[6691,272...| true| 2| 
|926  |[120,1788...| true| 3| 
|875  |[54,120,1...| true| 4| 
|169  |[19406,21...| false| 5| 

after filtering on P: 
+---------+--------------------+-------+ 
| custId| features| P |rowId| 
+---------+--------------------+-------+ 
|  806|[50,5074,...| true| 0| 
|  832|[45,120,1...| true| 1| 
|  216|[6691,272...| true| 2| 
|  926|[120,1788...| true| 3| 
|  875|[54,120,1...| true| 4| 
|  773|[3136,317...| true| 5| 
+1

Potrebbe condividere il codice completo per la generazione dei due esempi di DataFrames? Per quello che vale, questo è probabilmente dovuto all'ottimizzazione della query SQL che ha luogo in cui le fasi di mappa "indipendenti" possono essere riorganizzate. –

+0

Hamel, non ci sono altre trasformazioni o azioni di quelle che ho pubblicato. I frame di dati mostrati sono il risultato di df.show(). È possibile ricreare facilmente questo comportamento, creare una cornice dati e aggiungere una colonna ID riga come sopra, quindi aggiungere una colonna booleana casualmente ad essa. Quindi filtra su quella colonna e vedi come gli ID di riga che ottieni da monotonicamente in aumento vengono "riutilizzati" come descrivo. – Kai

+0

@Kai Vorrei davvero aggiungere che il modo più semplice per riprodurlo è quello di utilizzare solo una singola partizione. – zero323

risposta

11

Spark 2,0

  • questo è problema è stato risolto in Spark 2.0 con SPARK-14241.

  • Un altro problema simile è stato risolto in Spark 2.1 con SPARK-14393

Spark 1.x

problema che l'esperienza è piuttosto sottile, ma può essere ridotto ad un semplice fatto monotonically_increasing_id è un funzione estremamente brutta. Non è chiaramente puro e il suo valore dipende da qualcosa che è completamente fuori controllo.

Non richiede alcun parametro, quindi da una prospettiva di ottimizzazione non importa quando viene chiamato e può essere premuto dopo tutte le altre operazioni. Da qui il comportamento che vedi.

Se osservate il codice, scoprirete che questo è esplicitamente contrassegnato dall'estensione dell'espressione MonotonicallyIncreasingID con Nondeterministic.

Non credo che esista una soluzione elegante, ma un modo per gestirlo è aggiungere una dipendenza artificiale dal valore filtrato.Per esempio con un UDF simili:

from pyspark.sql.types import LongType 
from pyspark.sql.functions import udf 

bound = udf(lambda _, v: v, LongType()) 

(df 
    .withColumn("rn", monotonically_increasing_id()) 
    # Due to nondeterministic behavior it has to be a separate step 
    .withColumn("rn", bound("P", "rn")) 
    .where("P")) 

In generale potrebbe essere più pulito aggiungere indici utilizzando zipWithIndex su un RDD e poi riconvertirlo in un DataFrame.


* soluzione mostrata in precedenza non è più una valida soluzione (non obbligatorio) in 2.x Spark dove pitone UDF sono oggetto delle ottimizzazioni piano di esecuzione.

3

Non ho potuto riprodurre questo. Sto usando Spark 2.0 anche se forse il comportamento è cambiato, o non sto facendo la tua stessa cosa.

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true)) 
.toDF("name", "value","flag") 
.withColumn("rowd", monotonically_increasing_id()) 

df.show 

val df2 = df.filter(col("flag")=== true) 

df2.show 

df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields] 
+-----+-----+-----+----+ 
| name|value| flag|rowd| 
+-----+-----+-----+----+ 
| one| 1| true| 0| 
| two| 2|false| 1| 
|three| 3| true| 2| 
| four| 4| true| 3| 
+-----+-----+-----+----+ 
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields] 
+-----+-----+----+----+ 
| name|value|flag|rowd| 
+-----+-----+----+----+ 
| one| 1|true| 0| 
|three| 3|true| 2| 
| four| 4|true| 3| 
+-----+-----+----+----+ 
+0

Non ho riscontrato alcun problema con il codice precedente – thebluephantom

+0

qual è l'equivalente di ** monotonically_increasing_id() ** in java – Yugerten

+0

Il pacchetto org.apache.spark.sql.functions è disponibile nell'API Java https: // spark. apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#monotonicallyIncreasingId-- – Davos

1

per aggirare la valutazione spostamento di monotonically_increasing_id(), si potrebbe provare a scrivere il dataframe su disco, e ri-lettura. Quindi la colonna id ora è semplicemente un campo dati che viene letto, piuttosto che calcolato dinamicamente in qualche punto della pipeline. Anche se è una soluzione piuttosto brutta, ha funzionato quando ho fatto un test rapido.

1

Questo ha funzionato per me. Creata un'altra colonna Identity e utilizzata la funzione window numero_riga

import org.apache.spark.sql.functions.{row_number} 
import org.apache.spark.sql.expressions.Window 

val df1: DataFrame = df.withColumn("Id",lit(1)) 

df1 
.select(
..., 
row_number() 
.over(Window 
.partitionBy("Id" 
.orderBy(col("...").desc)) 
) 
.alias("Row_Nbr") 
)