2016-05-11 42 views
8

Invio il mio codice a un cluster spark-alone. Il comando di invio è il seguente:Si tratta di un bug del flusso di scintilla o perdita di memoria?

nohup ./bin/spark-submit \ 
--master spark://ES01:7077 \ 
--executor-memory 4G \ 
--num-executors 1 \ 
--total-executor-cores 1 \ 
--conf "spark.storage.memoryFraction=0.2" \ 
./myCode.py 1>a.log 2>b.log & 

Specifico che l'executor utilizza la memoria 4G nel comando precedente. Ma usa il comando top per monitorare il processo dell'esecutore, noto che l'utilizzo della memoria continua a crescere. Ora l'uscita di comando superiore è qui sotto:

PID USER  PR NI VIRT RES SHR S %CPU %MEM  TIME+ COMMAND                                      
12578 root  20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java  

La mia memoria totale è di 16G così il 37,3% è già più grande di 4 GB ho specificato. Ed è ancora in crescita.

Utilizzare il comando ps, è possibile sapere che è il processo dell'esecutore.

[[email protected] ~]# ps -awx | grep spark | grep java 
10409 ?  Sl  1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080 
10603 ?  Sl  6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077 
12420 ?  Sl 10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py 
12578 ?  Sl 21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://[email protected]:52660 

Di seguito sono indicati il ​​codice. È molto semplice, quindi non penso ci sia perdita di memoria

if __name__ == "__main__": 

    dataDirectory = '/stream/raw' 

    sc = SparkContext(appName="Netflow") 
    ssc = StreamingContext(sc, 20) 

    # Read CSV File 
    lines = ssc.textFileStream(dataDirectory) 

    lines.foreachRDD(process) 

    ssc.start() 
    ssc.awaitTermination() 

Il codice per la funzione di processo è di seguito. Si prega di notare che sto usando HiveContext non SqlContext qui. Perché SqlContext non supporta la funzione finestra

def getSqlContextInstance(sparkContext): 
    if ('sqlContextSingletonInstance' not in globals()): 
     globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext) 
    return globals()['sqlContextSingletonInstance'] 

def process(time, rdd): 

    if rdd.isEmpty(): 
     return sc.emptyRDD() 

    sqlContext = getSqlContextInstance(rdd.context) 

    # Convert CSV File to Dataframe 
    parts = rdd.map(lambda l: l.split(",")) 
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11]))) 
    dataframe = sqlContext.createDataFrame(rowRdd) 

    # Get the top 2 interface of each router 
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits')) 
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc()) 
    rank = func.dense_rank().over(windowSpec) 
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2") 

    ret.show() 
    dataframe.show() 

In realtà ho trovato sotto il codice causerà il problema:

# Get the top 2 interface of each router 
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits')) 
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc()) 
    rank = func.dense_rank().over(windowSpec) 
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2") 
    ret.show() 

perché se rimuovo questi 5 linea. Il codice può funzionare tutta la notte senza mostrare aumento della memoria. Ma aggiungerli farà sì che l'utilizzo della memoria di executor cresca fino a un numero molto alto.

Fondamentalmente il codice precedente è solo una finestra + grouby in SparkSQL. Quindi questo è un bug?

+0

correlati http://stackoverflow.com/q/37283624/1560062? – zero323

+0

@ zero323 grazie. Ma sto usando spark1.6.1 –

risposta

0

Come posso vedere nelle 5 righe, forse il groupBy è il problema, proverei con reduceBy, e vedere come si comporta.

Vedere here e here.

+0

grazie per le informazioni. Ma mi aspetto di sapere se è un bug o non lo sto usando nel modo giusto. –

+0

@Tristan Non è lo stesso gruppo. Come su RDD, vedere http://stackoverflow.com/q/32902982/1560062 – zero323

+0

Presumo che questo file csv sia memorizzato su HDFS. qual è la sua dimensione? quanto cresce/cambia e in quale frequenza. quello che sto cercando di capire è quanti dati hai bisogno di elaborare ad ogni intervallo di batch e che cos'è quell'intervallo (1 sec di default)? –

3

responsabilità: questa risposta non si basa su debug, ma più su osservazioni e la documentazione di Apache Spark offre

Non credo che questo sia un bug per cominciare!

Osservando le configurazioni, possiamo vedere che ci si sta concentrando principalmente sull'ottimizzazione dell'esecutore, il che non è sbagliato, ma si sta dimenticando la parte driver dell'equazione.

Guardando la panoramica grappolo scintilla Apache Spark documentaion

enter image description here

Come si può vedere, ogni lavoratore ha un esecutore, però, nel tuo caso, il nodo dei lavoratori è lo stesso che il nodo conducente! Che francamente è il caso quando si esegue localmente o su un cluster autonomo in un singolo nodo.

Inoltre, il driver utilizza 1G di memoria per impostazione predefinita a meno che non sia ottimizzato utilizzando il flag spark.driver.memory.Inoltre, non bisogna dimenticare l'utilizzo dell'heap dalla JVM stessa e l'interfaccia utente Web che è stata gestita dal driver anche AFAIK!

Quando si elimina le righe di codice che lei ha citato, il codice è rimasto senza azioni come map funzione è solo una trasformazione, di conseguenza, non ci sarà l'esecuzione, e quindi, non si vede aumentare la memoria a tutti !

Lo stesso vale per groupBy in quanto è solo una trasformazione che non verrà eseguita a meno che non venga chiamata un'azione che nel tuo caso è agg e show più in basso nel flusso!

Detto questo, cercare di ridurre al minimo la memoria del driver e il numero complessivo di core in scintilla definito da spark.cores.max se si desidera controllare il numero di core in questo processo, quindi eseguire il collegamento in cascata agli esecutori. Inoltre, aggiungerei il numero spark.python.profile.dump all'elenco di configurazione in modo da poter visualizzare un profilo per l'esecuzione del lavoro spark, che può aiutarti a comprendere meglio il caso e ad adattare il cluster alle tue esigenze.

+0

Ciao, grazie per la risposta. Ma 1. Ho ancora un dataframe.show() dopo aver rimosso quelle linee. Quindi c'è ancora un'azione. 2. Nel mio caso, il calcolo del flusso può essere eseguito per alcune ore. Il che significa migliaia di cicli (l'intervallo è di 20 secondi). Durante questo periodo l'utilizzo della memoria di executor continua a crescere. Quindi non so quale sia la tua soluzione suggerita. Riduci la memoria del mio driver? Perché? –

+0

Non mi sono reso conto che lo spettacolo non fa parte del codice cancellato. Per quanto riguarda la memoria, sto costruendo basandomi sul tentativo di minimizzarlo e vedere se traboccherà! –