Ho provato a utilizzare Spark per lavorare su un semplice problema di grafico. Ho trovato un programma di esempio nella cartella sorgente Spark: transitive_closure.py, che calcola la chiusura transitiva in un grafico con non più di 200 spigoli e vertici. Ma nel mio portatile, funziona più di 10 minuti e non termina. La riga di comando che uso è: spark-submit transitive_closure.py.Il programma di esempio Spark funziona molto lentamente
Mi chiedo perché la scintilla sia così lenta anche quando si calcola un risultato di chiusura transitoria così piccolo? È un caso comune? C'è qualche configurazione che mi manca?
Il programma è mostrato di seguito e può essere trovato nella cartella di installazione di scintilla sul loro sito web.
from __future__ import print_function
import sys
from random import Random
from pyspark import SparkContext
numEdges = 200
numVertices = 100
rand = Random(42)
def generateGraph():
edges = set()
while len(edges) < numEdges:
src = rand.randrange(0, numEdges)
dst = rand.randrange(0, numEdges)
if src != dst:
edges.add((src, dst))
return edges
if __name__ == "__main__":
"""
Usage: transitive_closure [partitions]
"""
sc = SparkContext(appName="PythonTransitiveClosure")
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
tc = sc.parallelize(generateGraph(), partitions).cache()
# Linear transitive closure: each round grows paths by one edge,
# by joining the graph's edges with the already-discovered paths.
# e.g. join the path (y, z) from the TC with the edge (x, y) from
# the graph to obtain the path (x, z).
# Because join() joins on keys, the edges are stored in reversed order.
edges = tc.map(lambda x_y: (x_y[1], x_y[0]))
oldCount = 0
nextCount = tc.count()
while True:
oldCount = nextCount
# Perform the join, obtaining an RDD of (y, (z, x)) pairs,
# then project the result to obtain the new (x, z) paths.
new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
tc = tc.union(new_edges).distinct().cache()
nextCount = tc.count()
if nextCount == oldCount:
break
print("TC has %i edges" % tc.count())
sc.stop()
Grazie. Davvero utile Ho ancora una domanda, se puoi aiutare, te ne sarò molto grato. Supponiamo di avere un programma che utilizza molte operazioni relazionali come join, select, union, update ecc. In un ciclo, fino a quando i fatti nelle relazioni non raggiungono un punto fisso. Anche con tuple totali non più di 50, mi sono bloccato sulla seconda iterazione e sull'eccezione della dimensione heap Java. Ho usato cache() e coalizione (1) su ogni operazione di dataframe. Quale potrebbe essere il problema che pensi? – c21