9

Ricevo lo stesso errore di Missing an output location for shuffle quando si uniscono i grandi dataframes in Spark SQL. La raccomandazione è di impostare MEMORY_AND_DISK e/o spark.shuffle.memoryFraction 0. Tuttavia, spark.shuffle.memoryFraction è deprecato in Spark> = 1.6.0 e l'impostazione MEMORY_AND_DISK non dovrebbe essere d'aiuto se non sto memorizzando nella cache nessun RDD o Dataframe, giusto? Inoltre sto ottenendo molti altri log WARN e tentativi di attività che mi portano a pensare che il lavoro non sia stabile.Come unire i grandi dataframes in Spark SQL? (best practice, stabilità, prestazioni)

Pertanto, la mia domanda è:

  • Quali sono le best practice per unirsi enormi dataframes a Spark SQL> = 1.6.0?

domande più specifici sono:

  • Come sintonizzare numero di esecutori e spark.sql.shuffle.partitions per ottenere una migliore stabilità/prestazioni?
  • Come trovare il giusto equilibrio tra il livello di parallelismo (numero di esecutori/core) e il numero di partizioni ? Ho riscontrato che aumentare il numero di esecutori non è sempre la soluzione in quanto può generare timeout di lettura I/O timeout a causa del traffico di rete.
  • C'è qualche altro parametro rilevante da regolare per questo scopo?
  • La mia comprensione è che l'unione di dati memorizzati come ORC o Parquet offre prestazioni migliori rispetto a testo o Avro per operazioni di join. C'è una differenza significativa tra Parquet e ORC?
  • C'è un vantaggio di SQLContext rispetto a HiveContext in merito a stabilità/prestazioni per le operazioni di join?
  • C'è una differenza in termini di prestazioni/stabilità quando i dati coinvolti nel join sono in precedenza registerTempTable() o saveAsTable()?

Finora sto utilizzando this is answer e this chapter come punto di partenza. E ci sono altre pagine StackOverflow correlate a questo argomento. Eppure non ho trovato una risposta esauriente a questo tema popolare.

Grazie in anticipo.

+1

Questa risposta raccomanda di impostare spark.sql.shuffle.partitions oltre i 2000 quando ci sono mischiare problemi di memoria, come Spark utilizza una struttura dati diversa per riordino contabilità quando il numero di partizioni è superiore a tale soglia: http://stackoverflow.com/a/36459198/2482894 – leo9r

+1

Impostazione spark.yarn.executor.memoryOverhead = 1024 è consigliato in questa risposta: http://stackoverflow.com/a/33118489/2482894 – leo9r

risposta

0

Queste sono molte domande. Consentitemi di rispondere uno per uno:

Il numero di esecutori è la maggior parte della variabile temporale in un ambiente di produzione. Questo dipende dalle risorse disponibili. Il numero di partizioni è importante quando si eseguono shuffles. Supponendo che i dati siano ora distorti, è possibile ridurre il carico per attività aumentando il numero di partizioni. Un'attività dovrebbe idealmente richiedere un paio di meno. Se l'attività richiede troppo tempo, è possibile che il contenitore venga preventivamente rimosso e il lavoro venga perso.Se l'attività richiede solo pochi millisecondi, il sovraccarico di avvio dell'attività diventa dominante.

Il livello di parallelismo e messa a punto le dimensioni esecutore, vorrei fare riferimento alla guida eccellente da Cloudera: https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

ORC e ​​parquet codificare solo i dati a riposo. Quando si esegue l'effettivo join, i dati sono nel formato in memoria di Spark. Il parquet sta diventando più popolare da quando Netflix e Facebook lo hanno adottato e ci hanno messo molto impegno. Parquet consente di archiviare i dati in modo più efficiente e ha alcune ottimizzazioni (pushdown dei predicati) utilizzate da Spark.

È necessario utilizzare SQLContext anziché HiveContext, poiché HiveContext è deprecato. SQLContext è più generale e non funziona solo con Hive.

Quando si esegue registerTempTable, i dati vengono memorizzati all'interno di SparkSession. Questo non influisce sull'esecuzione del join. Ciò che memorizza è solo il piano di esecuzione che viene richiamato quando viene eseguita un'azione (ad esempio saveAsTable). Quando si esegue un saveAsTable i dati vengono memorizzati sul file system distribuito.

Spero che questo aiuti. Suggerirei anche di guardare i nostri discorsi al vertice di Spark sulla possibilità di partecipare: https://www.youtube.com/watch?v=6zg7NTw-kTQ. Questo potrebbe fornirti alcuni spunti.

Cheers, Fokko