2016-04-06 11 views
7

Ho scritto un metodo che deve considerare un numero casuale per simulare una distribuzione di Bernoulli. Sto usando random.nextDouble per generare un numero compreso tra 0 e 1, quindi prendere la mia decisione in base a quel valore dato il mio parametro di probabilità.Spark - Generazione di numeri casuali

Il mio problema è che Spark sta generando gli stessi numeri casuali all'interno di ogni iterazione della mia funzione di mappatura del ciclo. Sto usando l'API DataFrame. Il mio codice segue questo formato:

val myClass = new MyClass() 
val M = 3 
val myAppSeed = 91234 
val rand = new scala.util.Random(myAppSeed) 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 

Qui è la classe:

class myClass extends Serializable { 
    val q = qProb 

    def myMethod(s: String, rand: Double) = { 
    if (rand <= q) // do something 
    else // do something else 
    } 
} 

ho bisogno di un nuovo numero casuale ogni volta che viene chiamato myMethod. Ho provato anche la generazione del numero dentro il mio metodo con java.util.Random (scala.util.Random V10 non estendere Serializable) come qui di seguito, ma sto ancora ricevendo gli stessi numeri all'interno di ogni ciclo for

val r = new java.util.Random(s.hashCode.toLong) 
val rand = r.nextDouble() 

Ho fatto qualche ricerca, e sembra che questo abbia a che fare con la natura deterministica di Sparks.

risposta

2

Il motivo per cui la stessa sequenza viene ripetuta è che il generatore casuale viene creata e inizializzata con un seme prima che i dati è partizionato. Ogni partizione quindi inizia dallo stesso seme casuale. Forse non è il modo più efficace per farlo, ma il seguente dovrebbe funzionare:

val myClass = new MyClass() 
val M = 3 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{ 
     val rand = scala.util.Random 
     row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 
+0

ho modificato questo un po 'per risolvere il mio problema . Ho passato la val di Random nel mio metodo e ho generato numeri casuali da lì dentro. Questo ha risolto il mio problema, ma ho dovuto usare 'java.util.Random' per motivi di serializzabilità. –

4

usa la funzione SQL rand:

import org.apache.spark.sql.functions._ 

//df: org.apache.spark.sql.DataFrame = [key: int] 

df.select($"key", rand() as "rand").show 
+---+-------------------+ 
|key|    rand| 
+---+-------------------+ 
| 1| 0.8635073400704648| 
| 2| 0.6870153659986652| 
| 3|0.18998048357873532| 
+---+-------------------+ 


df.select($"key", rand() as "rand").show 
+---+------------------+ 
|key|    rand| 
+---+------------------+ 
| 1|0.3422484248879837| 
| 2|0.2301384925817671| 
| 3|0.6959421970071372| 
+---+------------------+ 
+0

questo non riusciva a risolvere il mio problema, ma la sua una soluzione elegante che io probabilmente utilizzerò in futuro, in modo da +1 –

2

Secondo this post, la soluzione migliore non è quello di mettere il new scala.util.Random all'interno della mappa, né completamente al di fuori (cioè nel codice del driver.), ma in un intermedio mapPartitionsWithIndex:

import scala.util.Random 
val myAppSeed = 91234 
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) => 
    val rand = new scala.util.Random(indx+myAppSeed) 
    iter.map(x => (x, Array.fill(10)(rand.nextDouble))) 
}