2016-02-09 17 views
10

Perché la corrispondenza del modello in Spark non funziona come in Scala? Vedi esempio sotto ... la funzione f() tenta di eseguire lo schema di corrispondenza sulla classe, che funziona nel REPL di Scala, ma fallisce in Spark e restituisce tutti "???". f2() è una soluzione alternativa che ottiene il risultato desiderato in Spark utilizzando .isInstanceOf(), ma capisco che sia una cattiva forma in Scala.uguaglianza della classe caso in Apache Spark

Qualsiasi aiuto sul modello che corrisponda al modo corretto in questo scenario in Spark sarebbe molto apprezzato.

abstract class a extends Serializable {val a: Int} 
case class b(a: Int) extends a 
case class bNull(a: Int=0) extends a 

val x: List[a] = List(b(0), b(1), bNull()) 
val xRdd = sc.parallelize(x) 

tentativo di pattern matching che lavora a Scala REPL, ma non riesce a Spark

def f(x: a) = x match { 
    case b(n) => "b" 
    case bNull(n) => "bnull" 
    case _ => "???" 
} 

soluzione che funzioni in Spark, ma è di cattivo gusto (credo)

def f2(x: a) = { 
    if (x.isInstanceOf[b]) { 
     "b" 
    } else if (x.isInstanceOf[bNull]) { 
     "bnull" 
    } else { 
     "???" 
    } 
} 

Visualizza risultati

xRdd.map(f).collect     //does not work in Spark 
             // result: Array("???", "???", "???") 
xRdd.map(f2).collect     // works in Spark 
             // resut: Array("b", "b", "bnull") 
x.map(f(_))       // works in Scala REPL  
             // result: List("b", "b", "bnull") 

Versioni utilizzati ... Risultati scintilla eseguiti in scintilla-shell (Spark 1.6 su AWS EMR-4.3) Scala REPL in SBT 0.13.9 (Scala 2.10.5)

risposta

15

Questo è un problema noto con Spark REPL. Puoi trovare maggiori dettagli in SPARK-2620. Influisce su più operazioni in Spark REPL che include la maggior parte delle trasformazioni su PairwiseRDDs. Per esempio:

case class Foo(x: Int) 

val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2)) 
foos.distinct.size 
// Int = 2 

val foosRdd = sc.parallelize(foos, 4) 
foosRdd.distinct.count 
// Long = 4 

foosRdd.map((_, 1)).reduceByKey(_ + _).collect 
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1)) 

foosRdd.first == foos.head 
// Boolean = false 

Foo.unapply(foosRdd.first) == Foo.unapply(foos.head) 
// Boolean = true 

Ciò che rende ancora peggio è che i risultati dipendono dalla distribuzione dei dati:

sc.parallelize(foos, 1).distinct.count 
// Long = 2 

sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect 
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2)) 

La cosa più semplice che si può fare è quello di definire e pacchetto di classi case richieste al di fuori REPL. Qualsiasi codice inviato direttamente utilizzando spark-submit dovrebbe funzionare anche.

In Scala 2.11+ è possibile creare un pacchetto direttamente in REPL con paste -raw.

scala> :paste -raw 
// Entering paste mode (ctrl-D to finish) 

package bar 

case class Bar(x: Int) 


// Exiting paste mode, now interpreting. 

scala> import bar.Bar 
import bar.Bar 

scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect 
res1: Array[bar.Bar] = Array(Bar(1), Bar(2)) 
+0

Grazie zero323! Vedo menzionare il pattern matching che non funziona nello spark shell, ma non ci sono dettagli ... stai dicendo che se definisco le mie case class in un jar, sarò in grado di eseguire il pattern match su di loro nel REPL? Grazie ancora! – kmh

+1

Esattamente. Definisci all'esterno, crea jar, includi in 'CLASSPATH' e importa. – zero323

+0

Perfetto! Grazie ancora! – kmh