2016-04-14 28 views
5

Ho un caso d'uso in cui avrei bisogno di eliminare righe duplicate di un dataframe (in questo caso duplicato significa che hanno lo stesso campo 'id') mantenendo la riga con il campo 'timestamp' (unix timestamp) più alto.scintilla: Come fare un dropDuplicates su un dataframe mantenendo la riga con il valore più elevato

Ho trovato il metodo drop_duplicate (sto usando pyspark), ma uno non ha il controllo su quale elemento verrà conservato.

Chiunque può aiutare? Thx in anticipo

risposta

6

Potrebbe essere necessaria una mappa e una riduzione manuale per fornire la funzionalità desiderata.

def selectRowByTimeStamp(x,y): 
    if x.timestamp > y.timestamp: 
     return x 
    return y 

dataMap = data.map(lambda x: (x.id, x)) 
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp) 

Qui stiamo raggruppando tutti i dati in base all'ID. Quindi, quando stiamo riducendo i raggruppamenti, lo facciamo mantenendo il record con il timestamp più alto. Quando il codice è terminato, verrà lasciato solo 1 record per ciascun ID.

+0

Qual è la scopo di 'dataMap'? – zero323

+1

In realtà dovrebbe essere 'data.map (lambda x: (x.id, x))' (o 'keyBy'). Lasciamolo solo aggiustarlo – zero323

+0

Assolutamente corretto, bella cattura – David

2

Si può fare qualcosa di simile:

val df = Seq(
    (1,12345678,"this is a test"), 
    (1,23456789, "another test"), 
    (2,2345678,"2nd test"), 
    (2,1234567, "2nd another test") 
).toDF("id","timestamp","data") 

+---+---------+----------------+ 
| id|timestamp|   data| 
+---+---------+----------------+ 
| 1| 12345678| this is a test| 
| 1| 23456789| another test| 
| 2| 2345678|  2nd test| 
| 2| 1234567|2nd another test| 
+---+---------+----------------+ 

df.join(
    df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"), 
    $"id" === $"r_id" && $"timestamp" === $"r_timestamp" 
).drop("r_id").drop("r_timestamp").show 
+---+---------+------------+ 
| id|timestamp|  data| 
+---+---------+------------+ 
| 1| 23456789|another test| 
| 2| 2345678| 2nd test| 
+---+---------+------------+ 

Se vi aspettate ci potrebbe essere un ripetuto timestamp per id (vedi commenti qui sotto), si potrebbe fare questo:

df.dropDuplicates(Seq("id", "timestamp")).join(
    df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"), 
    $"id" === $"r_id" && $"timestamp" === $"r_timestamp" 
).drop("r_id").drop("r_timestamp").show 
+1

È vicino ma non garantisce una singola riga per ID. – zero323

+0

Hmm, vuoi dire se il timestamp è lo stesso? –

+0

Sì, esattamente. Dovrebbe essere possibile prendere semplicemente un arbitrario qui, immagino. – zero323