2014-11-21 11 views
10

Sto cercando di utilizzare Spark streaming con Kafka (versione 1.1.0), ma il lavoro Spark continua a bloccarsi a causa di questo errore:Spark Streaming: Impossibile calcolare spaccatura, il blocco non trovato

14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 

L'unica rilevante le informazioni che ricevo da registri è questo:

14/11/21 12:34:18 INFO MemoryStore: Block input-0-1416573258200 stored as bytes to memory (size 85.8 KB, free 2.3 GB) 
14/11/21 12:34:18 INFO BlockManagerMaster: Updated info of block input-0-1416573258200 
14/11/21 12:34:18 INFO BlockGenerator: Pushed block input-0-1416573258200 
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)] 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
14/11/21 12:37:35 INFO BlockManagerInfo: Added input-0-1416573258200 in memory on ********:43117 (size: 85.8 KB, free: 2.3 GB) 
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)] 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 

codice di esempio:

SparkConf conf = new SparkConf(); 
JavaSparkContext sc = new JavaSparkContext(conf); 
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000)); 
jssc.checkpoint(checkpointDir); 

HashMap<String, Integer> topics = new HashMap<String, Integer>(); 
topics.put(KAFKA_TOPIC, 1); 

HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
kafkaParams.put("group.id", "spark-streaming-test"); 
kafkaParams.put("zookeeper.connect", ZOOKEEPER_QUORUM); 
kafkaParams.put("zookeeper.connection.timeout.ms", "1000"); 
kafkaParams.put("auto.offset.reset", "smallest"); 

JavaPairReceiverInputDStream<String, String> kafkaStream = 
    KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER); 

JavaPairDStream<String, String> streamPair = kafkaStream.flatMapToPair(...).reduceByKey(...); 

Non sono sicuro quale sia la causa di questo problema.

+0

come sono le prestazioni del lavoro? È in ritardo? – maasg

+0

No, non è in ritardo. – Bobby

+0

Hai già trovato una soluzione? Ho lo stesso problema con Kafka/Spark Streaming 1.2 – bibac

risposta

1
+0

Benvenuti in questo sito. Mentre il link aiuta, la risorsa può essere spostata o cancellata. Quindi è buona norma non fornire un collegamento senza spiegare che cosa il link porta alla soluzione. Vedi [risposta] – mins

+0

Sì, ho provato questo e non aiuta. – Bobby

+0

Ho risolto questo problema per ridurre i dati di input nel ricevitore. Penso che uno dei possibili motivi per cui i dati di input superino la capacità di elaborazione. –

0

Verificare quanto segue.

1) Hai creato il contesto in streaming correttamente come in

def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

// Do additional setup on context that needs to be done, 
// irrespective of whether it is being started or restarted 
context. ... 

// Start the context 
context.start() 
context.awaitTermination() 

tuo inizializzazione non è corretto.

Date un'occhiata al di sotto

Esempio: il codice a recoverableNetworkCount App

2) Hai attivato la scrittura di proprietà avanti log "spark.streaming.receiver.writeAheadLog.enable"

3) Verifica la stabilità dello streaming nell'interfaccia utente di streaming. tempo di elaborazione intervallo < batch.