Ho un'applicazione spark python che viene uccisa dal filo per il superamento dei limiti di memoria. Ho un passaggio che comporta il caricamento di alcune risorse un po 'pesanti (500+ MB), quindi sto usando mapPartitions. In sostanza:Utilizzo memoria Python Spark/Yarn
def process_and_output(partition):
resources = load_resources()
for record in partition:
yield transform_record(resources, record)
input = sc.textFile(input_location)
processed = input.mapPartitions(process_and_output)
processed.saveAsTextFile(output_location)
Durante l'esecuzione, ho sempre ottengo questo errore:
ERRORE YarnScheduler: Perso esecutore 1 su (indirizzo rimosso): Contenitore ucciso da filato per il superamento dei limiti di memoria. 11,4 GB di memoria fisica da 11,2 GB utilizzati. Prendi in considerazione il potenziamento di spark.yarn.executor.memoryOverhead.
Ho tentato di aumentare la memoria di memoria molto in alto, ma ancora lo stesso problema. Ho eseguito con:
--conf "spark.python.worker.memory=1200m" \
--conf "spark.yarn.executor.memoryOverhead=5300" \
--conf "spark.executor.memory=6g" \
Sicuramente, è sufficiente memoria Overhead?
Immagino più generalmente, sto faticando a capire come viene controllata/contata la memoria del pitone worker nel totale complessivo. C'è qualche documentazione di questo?
Vorrei anche capire se l'utilizzo di una funzione di generatore riduce effettivamente l'utilizzo della memoria. Trasmetterà i dati attraverso il processo Python (come spero) o lo bufferizzerà tutto prima di inviare nuovamente all'infrastruttura JVM/spark?
Amore questa spiegazione. Grazie per averlo mantenuto reale. – deepelement
Mi ha aiutato! grazie – g07kore