Invio il mio codice a un cluster spark-alone. Il comando di invio è il seguente:Si tratta di un bug del flusso di scintilla o perdita di memoria?
nohup ./bin/spark-submit \
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2" \
./myCode.py 1>a.log 2>b.log &
Specifico che l'executor utilizza la memoria 4G nel comando precedente. Ma usa il comando top per monitorare il processo dell'esecutore, noto che l'utilizzo della memoria continua a crescere. Ora l'uscita di comando superiore è qui sotto:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12578 root 20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java
La mia memoria totale è di 16G così il 37,3% è già più grande di 4 GB ho specificato. Ed è ancora in crescita.
Utilizzare il comando ps, è possibile sapere che è il processo dell'esecutore.
[[email protected] ~]# ps -awx | grep spark | grep java
10409 ? Sl 1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ? Sl 6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ? Sl 10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ? Sl 21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://[email protected]:52660
Di seguito sono indicati il codice. È molto semplice, quindi non penso ci sia perdita di memoria
if __name__ == "__main__":
dataDirectory = '/stream/raw'
sc = SparkContext(appName="Netflow")
ssc = StreamingContext(sc, 20)
# Read CSV File
lines = ssc.textFileStream(dataDirectory)
lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
Il codice per la funzione di processo è di seguito. Si prega di notare che sto usando HiveContext non SqlContext qui. Perché SqlContext non supporta la funzione finestra
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
return globals()['sqlContextSingletonInstance']
def process(time, rdd):
if rdd.isEmpty():
return sc.emptyRDD()
sqlContext = getSqlContextInstance(rdd.context)
# Convert CSV File to Dataframe
parts = rdd.map(lambda l: l.split(","))
rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
dataframe = sqlContext.createDataFrame(rowRdd)
# Get the top 2 interface of each router
dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
ret.show()
dataframe.show()
In realtà ho trovato sotto il codice causerà il problema:
# Get the top 2 interface of each router
dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
ret.show()
perché se rimuovo questi 5 linea. Il codice può funzionare tutta la notte senza mostrare aumento della memoria. Ma aggiungerli farà sì che l'utilizzo della memoria di executor cresca fino a un numero molto alto.
Fondamentalmente il codice precedente è solo una finestra + grouby in SparkSQL. Quindi questo è un bug?
correlati http://stackoverflow.com/q/37283624/1560062? – zero323
@ zero323 grazie. Ma sto usando spark1.6.1 –