2015-09-24 2 views
5

I Sono appena iniziato il flink. Ho scritto codice seguente ed ho ottenuto "uscite di DataSource ha causato un errore: Impossibile leggere il codice wrapper utente" erroreFlink: le uscite di DataSource hanno causato un errore: Impossibile leggere il wrapper del codice utente

C'è qualche cosa che sto facendo male?

versione: Flink v 0.9.1 (Hadoop 1) non si usa Hadoop: esecuzione locale Scocca: Scala

Codice:

val env = ExecutionEnvironment.getExecutionEnvironment 
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv" 
val data_split = text.flatMap{_.split(';')} 
data_split.first(3).print() 

Nota: Il file di input usa ';' come deliminator

Errore:

Scala-Flink> val data_split = text.flatMap{_.split(';')} 
data_split: org.apache.flink.api.scala.DataSet[String] = [email protected] 
Scala-Flink> data_split.first(3).print() 
09/24/2015 09:20:14 Job execution switched to status RUNNING. 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to SCHEDULED 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to DEPLOYING 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to FAILED 
java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 

09/24/2015 09:20:14 Job execution switched to status FAILING. 
09/24/2015 09:20:14 CHAIN GroupReduce (GroupReduce at org.apache.flink.api.scala.DataSet.first(DataSet.scala:707)) -> FlatMap (collect())(1/1) switched to CANCELED 
09/24/2015 09:20:14 DataSink (collect() sink)(1/1) switched to CANCELED 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 
+1

Poche cose per aiutare a rispondere a questo: (1) La terza riga (ottenere il nuovo ambiente di esecuzione) non dovrebbe essere rimossa. Mescolare ambienti diversi probabilmente causerà problemi (e potrebbe essere la ragione del tuo problema qui, in effetti). (2) Puoi pubblicare la traccia completa dello stack di eccezioni. Manca la causa principale, dovrebbe essere sotto "causato da" più in basso nella traccia dello stack. (3) Il tuo esempio di codice sembra avere linee troncate, puoi pubblicare le linee complete? –

+0

la val env avrebbe dovuto essere la prima riga ... Mi dispiace per questo .val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile ("/ home/ashish/Downloads/spark/synop.201501.csv" val data_split = text.flatMap {_. split (';')} data_split.first (3) .print() – ashish

+0

Ho aggiornato il registro errori completo – ashish

risposta

2

Il problema è la dichiarazione "val env = ExecutionEnvironment.getExecutionEnvironment" nella prima riga.

La shell Scala dispone già di un ambiente Execution, associato alla variabile "env", configurata per il corretto caricamento delle classi generate da Shell.

Creando un nuovo ambiente di esecuzione, si ignora l'ambiente preconfigurato con uno non configurato correttamente.

+0

Stephan, potresti specificare un link su come dovrebbe essere eseguito il programma di flink localmente testato in repl? Ho lo stesso problema con il progetto flink 1.2 che si sta sviluppando in ambiente emacs-ensime. 'env.readTextFile (" file:/d: /data/test.csv "). first (5) .print()' funziona bene, comunque 'env.rea dTextFile ("file:/d: /data/test.csv"). map (_. split ('')). first (5) .print() '- not. –

+0

E funziona correttamente in windows cmd nella console sbt. In ensime-inferior-scala 'org.apache.flink.runtime.operators.util.CorruptConfigurationException: Impossibile leggere il wrapper del codice utente: $ anonfun $ 1'. –