ho questo programma di accensione e cercherò di limitarla ad una delle parti pertinentiprogramma Spark dà risultati strani quando correva sul gruppo autonomo
# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()
# start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids
# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)
convergence = False
ncm = NCM()
while(not convergence):
memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m)))
cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value)))
# Memberships
T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)))
I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
# Components of new centroids
wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
#print "wTm = " + str(wTm.collect())
print "at first reduce"
sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
#print "sumwTm = " + str(sumwTm.collect())
wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
print "adding to cnumerator list"
#print wTmx.collect()
cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
print "collected cnumerator, now printing"
#print "cnumerator = " + str(cnumerator.collect())
#print str(sumwTm.collect())
# Calculate the new centroids
sumwTmCollection = sumwTm.collect()[0][1]
cnumeratorCollection = cnumerator.collect()
#print "sumwTmCollection = " + str(sumwTmCollection)
#cnumeratorCollection =cnumerator.collectAsMap().get(0).items
print "cnumeratorCollection = " + str(cnumeratorCollection)
for i in range(len(newCentroids)):
newCentroids[i] = scalarMult(1/sumwTmCollection[i], [cnumeratorCollection[i]])
centroids = newCentroids
# Test for convergence
convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)
#convergence = True
# Replace our old centroids with the newly found centroids and repeat if convergence not met
# Clear out space for a new set of centroids
newCentroids = [[] for k in range(K)]
Questo programma funziona abbastanza bene sulla mia macchina locale, tuttavia, non si comporta come previsto quando viene eseguito su un cluster autonomo. Non genera necessariamente un errore, ma ciò che fa produce un output diverso da quello che ricevo quando viene eseguito sul mio computer locale. Il cluster e i 3 nodi sembrano funzionare correttamente. Ho la sensazione che il problema è che continuo ad aggiornare centroids
, che è un elenco python, e cambia ogni volta attraverso il while-loop
. È possibile che ciascun nodo non abbia la copia più recente di quella lista? Penso di sì, quindi ho provato a utilizzare uno broadcast variable
ma quelli non possono essere aggiornati (sola lettura). Ho anche provato ad usare un accumulator
ma quelli sono solo per accumuli. Ho anche provato a salvare gli elenchi python come file su hdf per ciascun nodo a cui accedere, ma questo non ha funzionato bene. Pensi che sto capendo il problema correttamente? C'è qualcos'altro che potrebbe succedere qui? Come posso ottenere codice che funzioni bene sul mio computer locale, ma non su un cluster?
scusa, non riesco a capire dove stai aggiornando i centroidi nel tuo codice .. potresti evidenziarlo per favore? Grazie – mgaido
Grazie per aver guardato questo. È verso il fondo. 'centroids = newCentroids'. –
Sarei più motivato a rispondere alla tua domanda se avessi ripulito/ridotto il tuo codice e, cosa ancora più importante, fornito un campione dei dati che il tuo script fornisce con il diverso output sul cluster. –