2015-02-27 2 views
16

Sto eseguendo alcune operazioni in PySpark e recentemente ho aumentato il numero di nodi nella mia configurazione (che è su Amazon EMR). Tuttavia, anche se ho triplicato il numero di nodi (da 4 a 12), le prestazioni non sembrano essere cambiate. Come tale, mi piacerebbe vedere se i nuovi nodi sono visibili a Spark.riceve il numero di nodi visibili in PySpark

sto chiamando la seguente funzione:

sc.defaultParallelism 
>>>> 2 

ma penso che questo mi sta dicendo il numero totale di operazioni distribuite a ciascun nodo, non il numero totale di codici che Spark può vedere.

Come posso vedere la quantità di nodi che PySpark sta utilizzando nel mio cluster?

risposta

13

sc.defaultParallelism è solo un suggerimento. A seconda della configurazione, potrebbe non avere una relazione con il numero di nodi. Questo è il numero di partizioni se si utilizza un'operazione che accetta un argomento di conteggio delle partizioni ma non lo si fornisce. Ad esempio sc.parallelize creerà un nuovo RDD da un elenco. Puoi dire quante partizioni creare nell'RDD con il secondo argomento. Ma il valore predefinito per questo argomento è sc.defaultParallelism.

È possibile ottenere il numero di esecutori con sc.getExecutorMemoryStatus nell'API Scala, ma questo non è esposto nell'API Python.

In generale, la raccomandazione è di avere circa 4 volte il numero di partizioni in un RDD di cui si dispone con gli esecutori. Questo è un buon suggerimento, perché se c'è una variazione in quanto tempo impiegano le attività, questo verrà risolto. Alcuni esecutori elaboreranno 5 compiti più veloci, mentre altri elaboreranno 3 attività più lente, per esempio.

Non è necessario essere molto precisi con questo. Se hai una brutta idea, puoi andare con un preventivo. Come se sapessi di avere meno di 200 CPU, puoi dire che 500 partizioni andranno bene.

Quindi cercate di creare RDDs con questo numero di partizioni:

rdd = sc.parallelize(data, 500)  # If distributing local data. 
rdd = sc.textFile('file.csv', 500) # If loading data from a file. 

o la ripartizione del RDD prima del calcolo se non si controlla la creazione del RDD:

rdd = rdd.repartition(500) 

Puoi controllare il numero di partizioni in un RDD con rdd.getNumPartitions().

+0

Grazie. Tuttavia, quando eseguo 'sc.getExecutorMemoryStatus', viene visualizzato un errore che dice che l'oggetto '' SparkContext '' non ha attributo 'getExecutorMemoryStatus''. Stai usando PySpark? – Bryan

+0

No, sto usando l'API Scala. Ho pensato che sarebbe anche parte dell'API Python, ma sembra che non lo sia. Immagino che le tue opzioni siano da aggiungere all'API Python o passare all'API di Scala. Oppure prendi il numero di macchine come argomento da linea di comando. –

+0

Cosa intendi considerando il numero di macchine come argomento della riga di comando? Puoi richiamarlo quando apri la shell interattiva di PySpark? Sono un tipo Python (quindi non Scala) e non voglio iniziare a fare cambiamenti API. – Bryan

16

Su pyspark si poteva ancora chiamare la Scala getExecutorMemoryStatus API utilizzando ponte py4j di pyspark:

sc._jsc.sc().getExecutorMemoryStatus().size() 
+0

grazie - molto bello – Tagar

1

ho trovato a volte le mie sessioni sono stati uccisi dalla dando a distanza uno strano Java errore

Py4JJavaError: An error occurred while calling o349.defaultMinPartitions. 
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. 

ho evitato questo dal seguente

def check_alive(spark_conn): 
    """Check if connection is alive. ``True`` if alive, ``False`` if not""" 
    try: 
     get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus() 
     return True 
    except Exception: 
     return False 

def get_number_of_executors(spark_conn): 
    if not check_alive(spark_conn): 
     raise Exception('Unexpected Error: Spark Session has been killed') 
    try: 
     return spark_conn._jsc.sc().getExecutorMemoryStatus().size() 
    except: 
     raise Exception('Unknown error') 
+0

Il numero di macchine non corrisponde al numero di esecutori. Una macchina molti hanno uno o più esecutori. –

+1

Grazie è stato un errore di battitura. Il mio scopo era solo quello di evitare che le sessioni venissero uccise dando un errore –

0

Le altre risposte forniscono un modo per ottenere il numero di esecutori. Ecco un modo per ottenere il numero di nodi. Ciò include i nodi head e worker.

s = sc._jsc.sc().getExecutorMemoryStatus().keys() 
l = str(s).replace("Set(","").replace(")","").split(", ") 

d = set() 
for i in l: 
    d.add(i.split(":")[0]) 
len(d) 
0

Dovrebbe essere possibile ottenere il numero di nodi nel cluster utilizzando questo (simile al precedente metodo di Dan, ma più breve e funziona meglio!).

sc._jsc.sc().getExecutorMemoryStatus().keySet().size()