6

Come raccogliere queste metriche su una console (Spark Shell o Spark submit job) subito dopo l'esecuzione dell'attività o del lavoro.Come recuperare le metriche come la dimensione dell'output e i record scritti dall'interfaccia utente di Spark?

Usiamo Spark per caricare dati da Mysql a Cassandra ed è piuttosto grande (ad es .: ~ 200 GB e 600M righe). Quando l'attività è terminata, vogliamo verificare quante righe ha esattamente elaborato la scintilla? Possiamo ottenere il numero da Spark UI, ma come possiamo recuperare quel numero ("Output Records Written") da spark shell o in spark-submit job.

Esempio di comando da caricare da Mysql a Cassandra.

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load() 

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "payment_types", "keyspace" -> "test")) 

voglio recuperare tutte le metriche Spark UI sul compito in senso prevalentemente formato di output e documenti scritti.

Per favore aiuto.

Grazie per il vostro tempo!

risposta

3

Trovato la risposta. Puoi ottenere le statistiche usando SparkListener.

Se il lavoro non ha parametri di input o output, è possibile ottenere None.get eccezioni che è possibile ignorare tranquillamente fornendo se stmt.

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

Si prega di trovare l'esempio di seguito.

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import com.datastax.spark.connector._ 
import org.apache.spark.sql._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

val conf = new SparkConf() 
.set("spark.cassandra.connection.host", "...") 
.set("spark.driver.allowMultipleContexts","true") 
.set("spark.master","spark://....:7077") 
.set("spark.driver.memory","1g") 
.set("spark.executor.memory","10g") 
.set("spark.shuffle.spill","true") 
.set("spark.shuffle.memoryFraction","0.2") 
.setAppName("CassandraTest") 
sc.stop 
val sc = new SparkContext(conf) 
val sqlcontext = new org.apache.spark.sql.SQLContext(sc) 

var outputWritten = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load() 
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "bucks_payments", "keyspace" -> "test")) 

println("outputWritten",outputWritten) 

Risultato:

scala> println("outputWritten",outputWritten) 
(outputWritten,16383)