Sto cercando di confrontare diversi modi per aggregare i miei dati.Spark: Come convertire count (distinto (valore)) in Dataframe API
Questo è il mio dati di input con 2 elementi (pagina, visitatori):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Lavorare con un comando SQL in Spark SQL con questo codice:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
ottengo questo output:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Ora, mi piacerebbe ottenere lo stesso risultato utilizzando Dataframes e l'API di thiers, ma non riesco a ottenere lo stesso Uscita:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
In realtà, questo è quello che ottengo come output:
[PAG1,8] // just the simple page count for every page
[PAG2,4]
E 'probabilmente qualcosa di stupido, ma non riesco a vedere in questo momento.
Grazie in anticipo!
FF
ottengo questo errore -> non trovato: il valore CountDistinct –
è un metodo in 'org.apache.spark.sql .functions', importa che :), modifica fatta. –
con intelliJ Devo scrivere il comando agg/countDistinct come questo .agg (org.apache.spark.sql.functions.countDistinct ("visitor")) perché anche se ho importato org.apache.spark.sql. funziona ancora mi dà lo stesso errore ... comunque questo funziona, ma ottengo solo la colonna visitatore e nessuna colonna della pagina ([2], [3]) ... cosa mi manca? –