2016-04-23 41 views
13

ho questo programma di accensione e cercherò di limitarla ad una delle parti pertinentiprogramma Spark dà risultati strani quando correva sul gruppo autonomo

# Split by delimiter , 
# If the file is in unicode, we need to convert each value to a float in order to be able to 
# treat it as a number 
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist() 

# start with K randomly selected points from the dataset 
# A centroid cannot be an actual data point or else the distance measure between a point and 
# that centroid will be zero. This leads to an undefined membership value into that centroid. 
centroids = points.takeSample(False, K, 34) 
#print centroids 
# Initialize our new centroids 
newCentroids = [[] for k in range(K)] 
tempCentroids = [] 
for centroid in centroids: 
    tempCentroids.append([centroid[N] + 0.5]) 
#centroids = sc.broadcast(tempCentroids) 

convergence = False 

ncm = NCM() 

while(not convergence): 
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m))) 
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value))) 
    # Memberships 
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c))) 
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0])) 
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0])) 
    # Components of new centroids 
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t)))) 
    #print "wTm = " + str(wTm.collect()) 
    print "at first reduce" 
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2)) 
    #print "sumwTm = " + str(sumwTm.collect()) 
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t)))) 
    print "adding to cnumerator list" 
    #print wTmx.collect() 
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values() 
    print "collected cnumerator, now printing"  
    #print "cnumerator = " + str(cnumerator.collect()) 
    #print str(sumwTm.collect()) 
    # Calculate the new centroids 
    sumwTmCollection = sumwTm.collect()[0][1] 
    cnumeratorCollection = cnumerator.collect() 
    #print "sumwTmCollection = " + str(sumwTmCollection) 
    #cnumeratorCollection =cnumerator.collectAsMap().get(0).items 
    print "cnumeratorCollection = " + str(cnumeratorCollection) 
    for i in range(len(newCentroids)): 
     newCentroids[i] = scalarMult(1/sumwTmCollection[i], [cnumeratorCollection[i]]) 
    centroids = newCentroids 
    # Test for convergence 
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon) 

    #convergence = True 
    # Replace our old centroids with the newly found centroids and repeat if convergence not met 
    # Clear out space for a new set of centroids 
    newCentroids = [[] for k in range(K)] 

Questo programma funziona abbastanza bene sulla mia macchina locale, tuttavia, non si comporta come previsto quando viene eseguito su un cluster autonomo. Non genera necessariamente un errore, ma ciò che fa produce un output diverso da quello che ricevo quando viene eseguito sul mio computer locale. Il cluster e i 3 nodi sembrano funzionare correttamente. Ho la sensazione che il problema è che continuo ad aggiornare centroids, che è un elenco python, e cambia ogni volta attraverso il while-loop. È possibile che ciascun nodo non abbia la copia più recente di quella lista? Penso di sì, quindi ho provato a utilizzare uno broadcast variable ma quelli non possono essere aggiornati (sola lettura). Ho anche provato ad usare un accumulator ma quelli sono solo per accumuli. Ho anche provato a salvare gli elenchi python come file su hdf per ciascun nodo a cui accedere, ma questo non ha funzionato bene. Pensi che sto capendo il problema correttamente? C'è qualcos'altro che potrebbe succedere qui? Come posso ottenere codice che funzioni bene sul mio computer locale, ma non su un cluster?

+0

scusa, non riesco a capire dove stai aggiornando i centroidi nel tuo codice .. potresti evidenziarlo per favore? Grazie – mgaido

+0

Grazie per aver guardato questo. È verso il fondo. 'centroids = newCentroids'. –

+0

Sarei più motivato a rispondere alla tua domanda se avessi ripulito/ridotto il tuo codice e, cosa ancora più importante, fornito un campione dei dati che il tuo script fornisce con il diverso output sul cluster. –

risposta

4

Grazie per tutto il tempo e l'attenzione a questo problema, soprattutto perché sembra che avrei potuto pubblicare più informazioni per rendere più facile il lavoro. Il problema qui è in

centroids = points.takeSample(False, K, 34) 

non mi rendo conto, ma dopo un breve esperimento, questa funzione restituisce ogni volta la stessa uscita, pur essendo quello che pensavo fosse un campione casuale. Finché utilizzi lo stesso seme (34 in questo caso), riceverai lo stesso RDD in cambio. Il RDD sul mio cluster era diverso per qualche motivo rispetto a quello restituito al mio computer locale. In ogni caso, dal momento che era lo stesso RDD ogni volta, il mio output non è mai cambiato. Il problema con i centroidi "casuali" che mi sono stati restituiti è che questi particolari hanno dato origine a qualcosa di simile a un punto di sella in matematica, dove non sarebbe stata trovata alcuna convergenza dei centroidi. Questa parte della risposta è matematica e di programmazione, quindi non ne parlerò più oltre. La mia vera speranza a questo punto è che gli altri sono aiutati dalla nozione che se si vuole

centroids = points.takeSample(False, K, 34) 

per la produzione di diversi campioni ogni volta che viene chiamato, che si cambia il seme ogni volta per un po 'di numeri casuali.

Spero che tutto questo sia d'aiuto. Non ho mai passato così tanto tempo in una soluzione alla mia memoria.

Grazie ancora.