2016-02-22 10 views
6

Ho provato a utilizzare Spark per lavorare su un semplice problema di grafico. Ho trovato un programma di esempio nella cartella sorgente Spark: transitive_closure.py, che calcola la chiusura transitiva in un grafico con non più di 200 spigoli e vertici. Ma nel mio portatile, funziona più di 10 minuti e non termina. La riga di comando che uso è: spark-submit transitive_closure.py.Il programma di esempio Spark funziona molto lentamente

Mi chiedo perché la scintilla sia così lenta anche quando si calcola un risultato di chiusura transitoria così piccolo? È un caso comune? C'è qualche configurazione che mi manca?

Il programma è mostrato di seguito e può essere trovato nella cartella di installazione di scintilla sul loro sito web.

from __future__ import print_function 

import sys 
from random import Random 

from pyspark import SparkContext 

numEdges = 200 
numVertices = 100 
rand = Random(42) 


def generateGraph(): 
    edges = set() 
    while len(edges) < numEdges: 
     src = rand.randrange(0, numEdges) 
     dst = rand.randrange(0, numEdges) 
     if src != dst: 
      edges.add((src, dst)) 
    return edges 


if __name__ == "__main__": 
    """ 
    Usage: transitive_closure [partitions] 
    """ 
    sc = SparkContext(appName="PythonTransitiveClosure") 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 
    tc = sc.parallelize(generateGraph(), partitions).cache() 

    # Linear transitive closure: each round grows paths by one edge, 
    # by joining the graph's edges with the already-discovered paths. 
    # e.g. join the path (y, z) from the TC with the edge (x, y) from 
    # the graph to obtain the path (x, z). 

    # Because join() joins on keys, the edges are stored in reversed order. 
    edges = tc.map(lambda x_y: (x_y[1], x_y[0])) 

    oldCount = 0 
    nextCount = tc.count() 
    while True: 
     oldCount = nextCount 
     # Perform the join, obtaining an RDD of (y, (z, x)) pairs, 
     # then project the result to obtain the new (x, z) paths. 
     new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0])) 
     tc = tc.union(new_edges).distinct().cache() 
     nextCount = tc.count() 
     if nextCount == oldCount: 
      break 

    print("TC has %i edges" % tc.count()) 

    sc.stop() 

risposta

4

ci possono molte ragioni per cui questo codice non eseguire particolarmente bene sulla vostra macchina ma molto probabilmente questo è solo un altro variante del problema descritto in Spark iteration time increasing exponentially when using join. Il modo più semplice per verificare se è davvero il caso è quello di fornire spark.default.parallelism parametro presentare:

bin/spark-submit --conf spark.default.parallelism=2 \ 
    examples/src/main/python/transitive_closure.py 

Se non limitato altrimenti, SparkContext.union, RDD.join e RDD.union impostare un numero di partizioni del bambino per il numero totale di partizioni nei genitori. Di solito è un comportamento desiderato ma può diventare estremamente inefficiente se applicato in modo iterativo.

+1

Grazie. Davvero utile Ho ancora una domanda, se puoi aiutare, te ne sarò molto grato. Supponiamo di avere un programma che utilizza molte operazioni relazionali come join, select, union, update ecc. In un ciclo, fino a quando i fatti nelle relazioni non raggiungono un punto fisso. Anche con tuple totali non più di 50, mi sono bloccato sulla seconda iterazione e sull'eccezione della dimensione heap Java. Ho usato cache() e coalizione (1) su ogni operazione di dataframe. Quale potrebbe essere il problema che pensi? – c21

0

L'useage dice il linea di comando è

transitive_closure [partitions] 

impostazione predefinita parallelismo solo aiuterà con i join in ogni partizione, non la distribuzione iniziale che del lavoro.

Ho intenzione di sostenere che si dovrebbero usare le partizioni MORE. L'impostazione del parallelismo predefinito può ancora essere d'aiuto, ma il codice che hai postato imposta esplicitamente il numero (l'argomento passato o 2, a seconda di quale è maggiore). Il minimo assoluto dovrebbe essere il core disponibile per Spark, altrimenti si lavora sempre a meno del 100%.

+0

Qui non c'è alcun valore nell'aumentare il parallelismo. La quantità di dati effettivamente fornita consente di ottenere di più riducendola a 1 :) Per non parlare del fatto di far cadere Spark. – zero323