Spark 1.0.2 (also 1.1.0) hangs on a partition

I have a strange problem in Apache Spark and would appreciate some help. After reading data from HDFS (and converting it from JSON to objects), the next stage (processing those objects) fails after 2 partitions have been processed (out of 512 in total). This happens on large data sets (the smallest I've noticed is around 700 megs, but it could be lower; I haven't narrowed it down yet).

EDIT: 700 megs is the size of the tgz file; uncompressed it is 6 gigs.
EDIT 2: the same thing happens on Spark 1.1.0.
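
For context, the shape of the job is roughly the following. This is only a hedged sketch: the Record class, the Jackson ObjectMapper, and the groupByKey step are stand-ins for whatever the real conversion and processing do; only the HDFS path and the presence of mapToPair/flatMapToPair stages come from the log below.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PipelineSketch {
    // Stand-in for whatever the real JSON lines deserialize to.
    public static class Record implements java.io.Serializable {
        public String key;
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[32]").setAppName("spew-sketch"));

        // Read JSON lines from HDFS, one Record per line. A real job would
        // reuse the ObjectMapper instead of building one per line.
        JavaRDD<Record> records = sc
                .textFile("hdfs://localhost:9000/spew/data/json.lines")
                .map(line -> new ObjectMapper().readValue(line, Record.class));

        // A shuffle stage mirroring the mapToPair seen in the log, followed
        // by per-object processing; this is where the job hangs on big inputs.
        JavaPairRDD<String, Iterable<Record>> grouped = records
                .mapToPair(r -> new Tuple2<>(r.key, r))
                .groupByKey();
        grouped.foreach(t -> System.out.println(t._1()));

        sc.stop();
    }
}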

I run Spark with a local master, on a 32-core, 60 gig machine, with the following settings:

spark.akka.timeout = 200 
spark.shuffle.consolidateFiles = true 
spark.kryoserializer.buffer.mb = 128 
spark.reducer.maxMbInFlight = 128 

with a 16 gig executor heap size. Memory is not maxed out, CPU load is negligible. Spark just hangs, forever.
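
For reference, a minimal sketch of how those settings translate into a Spark 1.x SparkConf in Java; the master URL and app name are placeholders, not taken from the question:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ContextSetup {
    public static void main(String[] args) {
        // The settings quoted above, plus the 16 gig executor heap.
        // Note: with a local master the executor runs inside the driver JVM.
        SparkConf conf = new SparkConf()
                .setMaster("local[32]")                       // placeholder
                .setAppName("spew")                           // placeholder
                .set("spark.akka.timeout", "200")
                .set("spark.shuffle.consolidateFiles", "true")
                .set("spark.kryoserializer.buffer.mb", "128")
                .set("spark.reducer.maxMbInFlight", "128")
                .set("spark.executor.memory", "16g");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.stop();
    }
}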

Below is the Spark log:

14/09/11 10:19:52 INFO HadoopRDD: Input split: hdfs://localhost:9000/spew/data/json.lines:6351070299+12428842 
14/09/11 10:19:53 INFO Executor: Serialized size of result for 511 is 1263 
14/09/11 10:19:53 INFO Executor: Sending result for 511 directly to driver 
14/09/11 10:19:53 INFO Executor: Finished task ID 511 
14/09/11 10:19:53 INFO TaskSetManager: Finished TID 511 in 868 ms on localhost (progress: 512/512) 
14/09/11 10:19:53 INFO DAGScheduler: Completed ShuffleMapTask(3, 511) 
14/09/11 10:19:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/09/11 10:19:53 INFO DAGScheduler: Stage 3 (mapToPair at Main.java:205) finished in 535.874 s 
14/09/11 10:19:53 INFO DAGScheduler: looking for newly runnable stages 
14/09/11 10:19:53 INFO DAGScheduler: running: Set() 
14/09/11 10:19:53 INFO DAGScheduler: waiting: Set(Stage 0, Stage 1, Stage 2) 
14/09/11 10:19:53 INFO DAGScheduler: failed: Set() 
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 0: List(Stage 1) 
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 1: List(Stage 2) 
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 2: List() 
14/09/11 10:19:53 INFO DAGScheduler: Submitting Stage 2 (FlatMappedRDD[10] at flatMapToPair at Driver.java:145), which is now runnable 
14/09/11 10:19:53 INFO DAGScheduler: Submitting 512 missing tasks from Stage 2 (FlatMappedRDD[10] at flatMapToPair at Driver.java:145) 
14/09/11 10:19:53 INFO TaskSchedulerImpl: Adding task set 2.0 with 512 tasks 
14/09/11 10:19:53 INFO TaskSetManager: Starting task 2.0:0 as TID 512 on executor localhost: localhost (PROCESS_LOCAL) 
14/09/11 10:19:53 INFO TaskSetManager: Serialized task 2.0:0 as 3469 bytes in 0 ms 
14/09/11 10:19:53 INFO Executor: Running task ID 512 
14/09/11 10:19:53 INFO BlockManager: Found block broadcast_0 locally 
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545 
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks 
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms 
14/09/11 10:20:07 INFO Executor: Serialized size of result for 512 is 1479 
14/09/11 10:20:07 INFO Executor: Sending result for 512 directly to driver 
14/09/11 10:20:07 INFO Executor: Finished task ID 512 
14/09/11 10:20:07 INFO TaskSetManager: Starting task 2.0:1 as TID 513 on executor localhost: localhost (PROCESS_LOCAL) 
14/09/11 10:20:07 INFO TaskSetManager: Serialized task 2.0:1 as 3469 bytes in 0 ms 
14/09/11 10:20:07 INFO Executor: Running task ID 513 
14/09/11 10:20:07 INFO TaskSetManager: Finished TID 512 in 13996 ms on localhost (progress: 1/512) 
14/09/11 10:20:07 INFO DAGScheduler: Completed ShuffleMapTask(2, 0) 
14/09/11 10:20:07 INFO BlockManager: Found block broadcast_0 locally 
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545 
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks 
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms 
14/09/11 10:20:15 INFO Executor: Serialized size of result for 513 is 1479 
14/09/11 10:20:15 INFO Executor: Sending result for 513 directly to driver 
14/09/11 10:20:15 INFO Executor: Finished task ID 513 
14/09/11 10:20:15 INFO TaskSetManager: Starting task 2.0:2 as TID 514 on executor localhost: localhost (PROCESS_LOCAL) 
14/09/11 10:20:15 INFO TaskSetManager: Serialized task 2.0:2 as 3469 bytes in 0 ms 
14/09/11 10:20:15 INFO Executor: Running task ID 514 
14/09/11 10:20:15 INFO TaskSetManager: Finished TID 513 in 7768 ms on localhost (progress: 2/512) 
14/09/11 10:20:15 INFO DAGScheduler: Completed ShuffleMapTask(2, 1) 
14/09/11 10:20:15 INFO BlockManager: Found block broadcast_0 locally 
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545 
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks 
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms 

1) What does DAGScheduler: failed: Set() mean? I assume it's not critical, since it is at INFO level, but you never know.

2) What does Missing parents mean? Again, these are INFO.

This is the jstack output:

"Service Thread" #20 daemon prio=9 os_prio=0 tid=0x00007f39400ff000 nid=0x10560 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x00007f39400fa000 nid=0x1055f waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread13" #18 daemon prio=9 os_prio=0 tid=0x00007f39400f8000 nid=0x1055e waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread12" #17 daemon prio=9 os_prio=0 tid=0x00007f39400f6000 nid=0x1055d waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread11" #16 daemon prio=9 os_prio=0 tid=0x00007f39400f4000 nid=0x1055c waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread10" #15 daemon prio=9 os_prio=0 tid=0x00007f39400f1800 nid=0x1055b waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread9" #14 daemon prio=9 os_prio=0 tid=0x00007f39400ef800 nid=0x1055a waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread8" #13 daemon prio=9 os_prio=0 tid=0x00007f39400ed800 nid=0x10559 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread7" #12 daemon prio=9 os_prio=0 tid=0x00007f39400eb800 nid=0x10558 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread6" #11 daemon prio=9 os_prio=0 tid=0x00007f39400e9800 nid=0x10557 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread5" #10 daemon prio=9 os_prio=0 tid=0x00007f39400e7800 nid=0x10556 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread4" #9 daemon prio=9 os_prio=0 tid=0x00007f39400dd000 nid=0x10555 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007f39400db000 nid=0x10554 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f39400d8800 nid=0x10553 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f39400d7000 nid=0x10552 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f39400d4000 nid=0x10551 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f39400d2000 nid=0x10550 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f39400a2800 nid=0x1054f in Object.wait() [0x00007f38d39f8000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142) 
    - locked <0x00000000e0038180> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:158) 
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) 

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f39400a0800 nid=0x1054e in Object.wait() [0x00007f38d3af9000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    at java.lang.Object.wait(Object.java:502) 
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157) 
    - locked <0x00000000e00161b8> (a java.lang.ref.Reference$Lock) 

"main" #1 prio=5 os_prio=0 tid=0x00007f394000a000 nid=0x10535 in Object.wait() [0x00007f3945ee1000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0x00000000e03df000> (a org.apache.spark.scheduler.JobWaiter) 
    at java.lang.Object.wait(Object.java:502) 
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73) 
    - locked <0x00000000e03df000> (a org.apache.spark.scheduler.JobWaiter) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:452) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1051) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1069) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1083) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097) 
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:716) 
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:294) 
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:44) 
    at spew.Driver.run(Driver.java:88) 
    at spew.Main.main(Main.java:92) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

"VM Thread" os_prio=0 tid=0x00007f3940099800 nid=0x1054d runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f394001f800 nid=0x10536 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f3940021000 nid=0x10537 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f3940023000 nid=0x10538 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f3940024800 nid=0x10539 runnable 

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f3940026800 nid=0x1053a runnable 

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f3940028000 nid=0x1053b runnable 

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f394002a000 nid=0x1053c runnable 

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f394002b800 nid=0x1053d runnable 

"GC task thread#8 (ParallelGC)" os_prio=0 tid=0x00007f394002d000 nid=0x1053e runnable 

"GC task thread#9 (ParallelGC)" os_prio=0 tid=0x00007f394002f000 nid=0x1053f runnable 

"GC task thread#10 (ParallelGC)" os_prio=0 tid=0x00007f3940030800 nid=0x10540 runnable 

"GC task thread#11 (ParallelGC)" os_prio=0 tid=0x00007f3940032800 nid=0x10541 runnable 

"GC task thread#12 (ParallelGC)" os_prio=0 tid=0x00007f3940034000 nid=0x10542 runnable 

"GC task thread#13 (ParallelGC)" os_prio=0 tid=0x00007f3940036000 nid=0x10543 runnable 

"GC task thread#14 (ParallelGC)" os_prio=0 tid=0x00007f3940037800 nid=0x10544 runnable 

"GC task thread#15 (ParallelGC)" os_prio=0 tid=0x00007f3940039800 nid=0x10545 runnable 

"GC task thread#16 (ParallelGC)" os_prio=0 tid=0x00007f394003b000 nid=0x10546 runnable 

"GC task thread#17 (ParallelGC)" os_prio=0 tid=0x00007f394003d000 nid=0x10547 runnable 

"GC task thread#18 (ParallelGC)" os_prio=0 tid=0x00007f394003e800 nid=0x10548 runnable 

"GC task thread#19 (ParallelGC)" os_prio=0 tid=0x00007f3940040800 nid=0x10549 runnable 

"GC task thread#20 (ParallelGC)" os_prio=0 tid=0x00007f3940042000 nid=0x1054a runnable 

"GC task thread#21 (ParallelGC)" os_prio=0 tid=0x00007f3940044000 nid=0x1054b runnable 

"GC task thread#22 (ParallelGC)" os_prio=0 tid=0x00007f3940045800 nid=0x1054c runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f3940102000 nid=0x10561 waiting on condition 

JNI global references: 422 

Has anyone had problems like this with Spark? It's strange, because it works for small (tiny) data sets (test fixtures, etc.).


Probably none of the INFO lines can help you figure out why it's hanging. 1) 'DAGScheduler: failed: Set()' indicates that the set of failed stages is empty (i.e. nothing has failed yet). 2) 'Missing parents' is the list of stages whose results are needed to compute the requested results and which are not already stored in memory. – Karl Higley


@KarlHigley couldn't agree more; I posted an answer and used your comment. If you're not OK with that, let me know. mikejohnharry, nice question! – gsamaras

Answer


Not to answer a question about such an old version, but I know that when an application hangs like this, it is probably because resources are dying (e.g. due to …).

I had a similar problem in Is Spark's KMeans unable to handle bigdata? The best thing you can do is tune the application, since there is no information in your question that would suggest a fix.

You could also tune the number of partitions, following the general rule of thumb (see the sketch below).
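
As a hedged illustration of that rule of thumb (the commonly quoted guideline is roughly 2-4 partitions per core; the 3x multiplier below is my assumption, and sc is the JavaSparkContext from the sketches above):

// Rule-of-thumb partitioning: a small multiple of the available cores.
int cores = sc.defaultParallelism();   // 32 on the machine in the question
int numPartitions = cores * 3;         // assumption: 3x multiplier
JavaRDD<String> lines = sc.textFile(
        "hdfs://localhost:9000/spew/data/json.lines", numPartitions);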

For another job that I had to tune in order to scale to 15T of data, I described my approach in memoryOverhead issue in Spark, but I don't know whether that is related.
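
For completeness, the kind of setting that post discusses looks like this in Spark 1.x. Hedged: spark.yarn.executor.memoryOverhead only applies to YARN deployments, not to the local master used in this question, and the value below is a placeholder rather than a recommendation:

// Off-heap overhead headroom for YARN executors (in MB). Placeholder value;
// irrelevant when running with a local master as in the question.
SparkConf tuned = new SparkConf()
        .set("spark.executor.memory", "16g")
        .set("spark.yarn.executor.memoryOverhead", "2048");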


As Karl Higley suggested, and I agree:

The failed: Set() from the Directed Acyclic Graph (DAG) scheduler means that the set of failed stages is empty (i.e. nothing has failed yet).

Missing parents is the list of stages whose results are needed to compute the requested results and which are not already stored in memory.