Spark> = 2.0
Tu ca usa window
(da non errato con le funzioni della finestra). Seconda variante assegna timestamp, ad un altro, secchi potenzialmente sovrapposti:
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+
Spark < 2,0
Iniziamo con dati Esempio:
import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0
val df = Seq(
("001", "event1", 10, "2016-05-01 10:50:51"),
("002", "event2", 100, "2016-05-01 10:50:53"),
("001", "event3", 20, "2016-05-01 10:50:55"),
("001", "event1", 15, "2016-05-01 10:51:50"),
("003", "event1", 13, "2016-05-01 10:55:30"),
("001", "event2", 12, "2016-05-01 10:57:00"),
("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")
assumo che Evento identificato da KEY
. In caso contrario, puoi modificare le clausole GROUP BY
/PARTITION BY
in base alle tue esigenze.
Se siete interessati a un'aggregazione con statico finestra indipendente dei dati di convertire timestamp per valori numerici e rotondo
import org.apache.spark.sql.functions.{round, sum}
// cast string to timestamp
val ts = $"Time".cast("timestamp").cast("long")
// Round to 300 seconds interval
val interval = (round(ts/300L) * 300.0).cast("timestamp").alias("interval")
df.groupBy($"KEY", interval).sum("metric")
// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+
Se siete interessati a una finestra relativa alle funzioni della finestra di utilizzo riga corrente:
import org.apache.spark.sql.expressions.Window
// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+
Per motivi di prestazioni questo approccio è utile solo se i dati possono essere suddivisi in più gruppi separati. In Spark < 2.0.0 avrai anche bisogno di HiveContext
per farlo funzionare.
Ciao sto usando Java Come fare le stesse operazioni in java e spark 2.1.0 – sathiyarajan
@sathiyarajan Dovrebbe essere più o meno lo stesso, escludendo differenze di sintassi minori. – zero323