2015-08-21 2 views
5

Mi piacerebbe confrontare due file consecutive i con i-1 di col2 (ordinate per col1).Come confrontare più righe?

Se item_i della fila -esimo i e item_[i-1]_row sono differenti, vorrei incrementare il conteggio di item_[i-1] da 1.

+--------------+ 
| col1 col2 | 
+--------------+ 
| row_1 item_1 | 
| row_2 item_1 | 
| row_3 item_2 | 
| row_4 item_1 | 
| row_5 item_2 | 
| row_6 item_1 | 
+--------------+ 

Nell'esempio precedente, se scandiamo due righe alla tempo al ribasso, vediamo che row_2 e row_3 sono diversi quindi aggiungiamo uno a item_1. Successivamente, vediamo che row_3 è diverso da row_4, quindi aggiungi uno a item_2. Continua fino alla fine:

+-------------+ 
| col2 col3 | 
+-------------+ 
| item_1 2 | 
| item_2 2 | 
+-------------+ 

risposta

8

È possibile utilizzare una combinazione di una funzione di finestra e un aggregato per eseguire questa operazione. La funzione finestra viene utilizzata per ottenere il valore successivo di col2 (utilizzando col1 per l'ordine). L'aggregato conta allora i tempi in cui incontriamo una differenza. Questo è implementato nel seguente codice:

val data = Seq(
    ("row_1", "item_1"), 
    ("row_2", "item_1"), 
    ("row_3", "item_2"), 
    ("row_4", "item_1"), 
    ("row_5", "item_2"), 
    ("row_6", "item_1")).toDF("col1", "col2") 

import org.apache.spark.sql.expressions.Window 
val q = data. 
    withColumn("col2_next", 
    coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")). 
    groupBy($"col2"). 
    agg(sum($"col2" =!= $"col2_next" cast "int") as "col3") 

scala> q.show 
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 
+------+----+ 
| col2|col3| 
+------+----+ 
|item_1| 2| 
|item_2| 2| 
+------+----+