Sono un'applicazione spark con diversi punti in cui vorrei mantenere lo stato corrente. Questo di solito avviene dopo un grande passo, o memorizzando nella cache uno stato che vorrei usare più volte. Sembra che quando chiamo la cache sul mio dataframe una seconda volta, una nuova copia viene memorizzata nella cache. Nella mia applicazione, questo porta a problemi di memoria durante il ridimensionamento. Anche se un determinato dataframe è un massimo di circa 100 MB nei miei test correnti, la dimensione cumulativa dei risultati intermedi aumenta oltre la memoria allocata sull'esecutore. Vedi sotto per un piccolo esempio che mostra questo comportamento.Un-persisting tutti i dataframes in (py) spark
cache_test.py:
from pyspark import SparkContext, HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
simple_data.csv:
1,2,3
4,5,6
7,8,9
Guardando l'interfaccia utente dell'applicazione, v'è una copia del dataframe originale, adition a quello con la nuova colonna . Posso rimuovere la copia originale chiamando df.unpersist()
prima della riga withColumn. È questo il metodo consigliato per rimuovere i risultati intermedi memorizzati nella cache (ad esempio, chi non effettua la chiamata prima di ogni cache()
).
Inoltre, è possibile eliminare tutti gli oggetti memorizzati nella cache. Nella mia applicazione, ci sono punti di rottura naturali in cui posso semplicemente eliminare tutta la memoria e passare al file successivo. Mi piacerebbe farlo senza creare una nuova applicazione spark per ogni file di input.
Grazie in anticipo!
Questa è una buona soluzione per la società in quanto mi permette di cancellare la cache piena a punti di rottura ragionevoli. Lo incorporerò, ma sono preoccupato quando scalerò e comincio a lavorare con dataset più grandi, i miei vecchi cache inizieranno a perdere il controllo. Se voglio cancellare le vecchie cache mentre vado, è la raccomandazione di creare una nuova variabile (o variabili temporanee), e di smistare esplicitamente i vecchi oggetti. Qualcosa come: 'df.cache()'; 'df_new = df.withColumn ('C1 + C2', df ['C1'] + df ['C2'])'; 'df_new.cache()'; 'Df.unpersist()'. Questo sembra un po 'ingombrante se è l'unico modo ... – bjack3
In genere non è necessario cancellare esplicitamente la cache. Viene pulito automaticamente quando necessario. – zero323
Sono preoccupato che stia facendo qualcosa di sbagliato allora. Nella mia intera applicazione, i miei lavori finiranno per bloccarsi a causa di errori di memoria insufficiente.Ogni singola copia di un dataframe è ragionevolmente piccola (meno di 100 MB), ma le cache sembrano vivere per sempre; anche dopo aver scritto l'output su un file e passando ai passaggi successivi. Vedrò se riesco a generare un esempio di lavoro più piccolo per mostrarlo in azione. – bjack3