2016-01-11 8 views
15

sto usando scintilla 1.6 ed eseguire la questione di cui sopra quando si esegue il seguente codice:Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id è già impostato

// Imports 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SaveMode 
import scala.concurrent.ExecutionContext.Implicits.global 
import java.util.Properties 
import scala.concurrent.Future 

// Set up spark on local with 2 threads 
val conf = new SparkConf().setMaster("local[2]").setAppName("app") 
val sc = new SparkContext(conf) 
val sqlCtx = new HiveContext(sc) 

// Create fake dataframe 
import sqlCtx.implicits._ 
var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2) 
// Write it as a parquet file 
df.write.parquet("/tmp/parquet1") 
df = sqlCtx.read.parquet("/tmp/parquet1") 

// JDBC connection 
val url = s"jdbc:postgresql://localhost:5432/tempdb" 
val prop = new Properties() 
prop.setProperty("user", "admin") 
prop.setProperty("password", "") 

// 4 futures - at least one of them has been consistently failing for 
val x1 = Future { df.write.jdbc(url, "temp1", prop) } 
val x2 = Future { df.write.jdbc(url, "temp2", prop) } 
val x3 = Future { df.write.jdbc(url, "temp3", prop) } 
val x4 = Future { df.write.jdbc(url, "temp4", prop) } 

Ecco il github gist: https://gist.github.com/karanveerm/27d852bf311e39f05491

L'errore che ottengo è: a

org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na] 

questo è un bug scintilla o sto facendo qualcosa di sbagliato/alcuna soluzione alternativa?

+0

Posso chiedere su quale macchina è stato eseguito questo codice? Sono particolarmente interessato alla CPU (quanti core)? –

+0

OSX El Capitan 10.11.1 | MacBook Air (13 pollici, inizio 2014) | Intel Core i7 da 1,7 GHz | 8 GB 1600 MHz DDR3 (credo che i7 sia 4 core) – sparknoob

+0

interessante, non riesco a riprodurlo su un setup simile (da spark shell). Questo potrebbe essere un bug sgradevole, hanno avuto problemi con la generazione di ID prima. Potresti voler creare una JIRA per questo. –

risposta

0

Test 1: È utile eseguire ognuna delle operazioni df.write in modo seriale anziché in parallelo?

Test 2: Può essere d'aiuto se si mantiene il dataframe e quindi si esegue l'operazione df.write in parallelo e si esegue la seralizzazione su unpersist dopo che tutti sono stati completati per vedere se questo aiuta?

1

Dopo aver provato diverse cose, ho trovato che uno dei thread creati dal globale ForkJoinPool ottiene la sua proprietà spark.sql.execution.id impostata su un valore casuale. Non sono riuscito a identificare il processo che effettivamente l'ha fatto, ma ho potuto aggirare il problema utilizzando il mio ExecutionContext.

import java.util.concurrent.Executors 
import concurrent.ExecutionContext 
val executorService = Executors.newFixedThreadPool(4) 
implicit val ec = ExecutionContext.fromExecutorService(executorService) 

Ho utilizzato il codice da http://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.html. Forse gli attributi dei thread dei cloni ForkJoinPool durante la creazione di nuovi e se ciò accade durante il contesto di un'esecuzione SQL otterrebbe il suo valore non nullo mentre uno FixedThreadPool creerà i thread all'istanza.

+0

Ho incontrato lo stesso problema. Ma questa soluzione non sembra aiutare. Vedo ancora l'errore 'spark.sql.execution.id già impostato '. – mottosan

+0

@Knshiro, non dovrebbe essere Executors.newFixedThreadPool (1)? – smas

+0

@smas il problema non è nel numero di thread ma nell'inizializzazione di quei thread. Il pool di fork join inizializzerà i thread in momenti casuali e per inizializzare nuovi thread clona tutti gli attributi. Quindi, se al momento dell'inizializzazione di un nuovo thread il thread esistente ha un ID di esecuzione SQL impostato, lo copierà su quello nuovo invece di lasciarne generare uno nuovo. – Knshiro

1

Vi preghiamo di controllare SPARK-13747

prendere in considerazione di utilizzare Spark versione 2.2.0 o superiore, se applicabile nel proprio ambiente.