2016-06-04 29 views
10

mio set di dati si presenta così:Come gruppo da intervallo di tempo in Spark SQL

KEY |Event_Type | metric | Time 
001 |event1  | 10  | 2016-05-01 10:50:51 
002 |event2  | 100 | 2016-05-01 10:50:53 
001 |event3  | 20  | 2016-05-01 10:50:55 
001 |event1  | 15  | 2016-05-01 10:51:50 
003 |event1  | 13  | 2016-05-01 10:55:30 
001 |event2  | 12  | 2016-05-01 10:57:00 
001 |event3  | 11  | 2016-05-01 11:00:01 

voglio ottenere tutti quando le chiavi che verificare questo:

"SUM di metrica per un evento specifico ">soglia durante 5 minuti.

Questo mi sembra un candidato perfetto per l'utilizzo delle funzioni di scorrimento .

Come posso farlo con Spark SQL?

Grazie.

risposta

13

Spark> = 2.0

Tu ca usa window (da non errato con le funzioni della finestra). Seconda variante assegna timestamp, ad un altro, secchi potenzialmente sovrapposti:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric") 

// +---+---------------------------------------------+-----------+ 
// |KEY|window          |sum(metric)| 
// +---+---------------------------------------------+-----------+ 
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45   | 
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12   | 
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13   | 
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11   | 
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100  | 
// +---+---------------------------------------------+-----------+ 

Spark < 2,0

Iniziamo con dati Esempio:

import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0 

val df = Seq(
    ("001", "event1", 10, "2016-05-01 10:50:51"), 
    ("002", "event2", 100, "2016-05-01 10:50:53"), 
    ("001", "event3", 20, "2016-05-01 10:50:55"), 
    ("001", "event1", 15, "2016-05-01 10:51:50"), 
    ("003", "event1", 13, "2016-05-01 10:55:30"), 
    ("001", "event2", 12, "2016-05-01 10:57:00"), 
    ("001", "event3", 11, "2016-05-01 11:00:01") 
).toDF("KEY", "Event_Type", "metric", "Time") 

assumo che Evento identificato da KEY. In caso contrario, puoi modificare le clausole GROUP BY/PARTITION BY in base alle tue esigenze.

Se siete interessati a un'aggregazione con statico finestra indipendente dei dati di convertire timestamp per valori numerici e rotondo

import org.apache.spark.sql.functions.{round, sum} 

// cast string to timestamp 
val ts = $"Time".cast("timestamp").cast("long") 

// Round to 300 seconds interval 
val interval = (round(ts/300L) * 300.0).cast("timestamp").alias("interval") 

df.groupBy($"KEY", interval).sum("metric") 

// +---+---------------------+-----------+ 
// |KEY|interval    |sum(metric)| 
// +---+---------------------+-----------+ 
// |001|2016-05-01 11:00:00.0|11   | 
// |001|2016-05-01 10:55:00.0|12   | 
// |001|2016-05-01 10:50:00.0|45   | 
// |003|2016-05-01 10:55:00.0|13   | 
// |002|2016-05-01 10:50:00.0|100  | 
// +---+---------------------+-----------+ 

Se siete interessati a una finestra relativa alle funzioni della finestra di utilizzo riga corrente:

import org.apache.spark.sql.expressions.Window 

// Partition by KEY 
// Order by timestamp 
// Consider window of -150 seconds to + 150 seconds relative to the current row 
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150) 
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w)) 

// +---+----------+------+-------------------+----------+----------+ 
// |KEY|Event_Type|metric|Time    |ts  |window_sum| 
// +---+----------+------+-------------------+----------+----------+ 
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13  | 
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45  | 
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45  | 
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45  | 
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12  | 
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11  | 
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100  | 
// +---+----------+------+-------------------+----------+----------+ 

Per motivi di prestazioni questo approccio è utile solo se i dati possono essere suddivisi in più gruppi separati. In Spark < 2.0.0 avrai anche bisogno di HiveContext per farlo funzionare.

+0

Ciao sto usando Java Come fare le stesse operazioni in java e spark 2.1.0 – sathiyarajan

+0

@sathiyarajan Dovrebbe essere più o meno lo stesso, escludendo differenze di sintassi minori. – zero323

0

Per confine statico si può fare seguente:

1) Trasformazione (mappa, mapPartitions ecc) Valore temporale per formare AAAA-MM-DD-hh-mm dove mm è rotolato a livello 5 minuti. per esempio. 01, 02, 03, 05 diventa 05; 16,17,18,19,20 diventa 20

2) Eseguire groupBy o reduceBy con event_type e l'ora ed eseguire il tuo aggregazione (Sum) su metriche

3) Eseguire il filtro di trasformazione per filtrare metriche> 5

Puoi scrivere sopra in spark rdd o dataframe (sql) quasi nello stesso modo.

Per altri tipi di confine dove 00-05, 01-06, 02-07 si dovrebbe provare a guardare nel concetto di finestra scorrevole. Se il vostro caso d'uso ingestione dati si inserisce modello di streaming poi Spark Streaming API sarà perfetto altrimenti si possono trovare soluzione personalizzata come questo: Apache Spark - Dealing with Sliding Windows on Temporal RDDs