2016-02-11 14 views
5

Per cominciare sto usando scala 2.10.4 e l'esempio sopra è eseguito in Spark 1.6 (anche se dubito che Spark abbia qualcosa a che fare con questo, è solo un problema di serializzazione).Riflesso Scala con Serializzazione (su Spark) - Simboli non serializzabili

Quindi, ecco il mio problema: supponiamo di avere un tratto Base implementato da due classi B1 e B2. Ora ho un carattere generico che viene esteso da una serie di classi, una delle quali è su sottotipi di Base, ad es. (Qui tengo nozione di Spark di RDD, ma potrebbe essere qualcosa di diverso in realtà, non appena è serializzato; Qualcosa è solo il risultato, non importa che cosa effettivamente):

trait Foo[T] { def function(rdd: RDD[T]): Something } 
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... } 
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... } 
... 

Ora ho bisogno di un oggetto che avrà un RDD[T] (non assumere alcuna ambiguità, è solo una versione semplificata) e restituisce Something corrispondente al risultato della funzione corrispondente al tipo T. Ma dovrebbe funzionare anche per Array[T] con una strategia di fusione. Finora sembra che:

object Obj { 
    def compute[T: TypeTag](input: RDD[T]): Something = { 
     typeOf[T] match { 
     case t if t <:< typeOf[A] => 
      val foo = new Foo[T] 
      foo.function(rdd) 
     case t if t <:< typeOf[Array[A]] => 
      val foo = new Foo[A] 
      foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]]))) 
     case t if t <:< typeOf[Base] => 
      val foo = new Foo[T] 
      foo.function(rdd) 
     // here it gets ugly... 
     case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why? 
      val tt = getSubInfo[T](0) 
      val tpe = tt.tpe 
      val foo = new Foo[tpe.type] 
      foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]])) 
     } 
    } 

    // strategy to transform arrays of T into a T object when possible 
    private def mergeArray[T: TypeTag](a: Array[T]): T = ... 

    // extract the subtype, e.g. if Array[Int] then at position 0 extracts a type tag for Int, I can provide the code but not fondamental for the comprehension of the problem though 
    private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ... 
} 

Purtroppo, sembra funzionare bene su un computer locale, ma quando viene inviato a Spark (serializzato), ottengo un org.apache.spark.SparkException: Task not serializable con:

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol 
Serialization stack: 
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types) 
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol) 

I avere una soluzione (abbastanza ovvia, elencare le possibilità), ma per la mia curiosità, c'è un modo per risolvere questo problema? E perché il simbolo non è serializzabile mentre i loro equivalenti nei manifesti erano?

Grazie per l'aiuto.

risposta

1

Tipo I tag generalmente sono serializzabili in scala ma, stranamente, non i tipi direttamente (questo è strano perché i typetag contengono simboli che non sono: - /).

Questo potrebbe fare quello che vuoi

// implicit constructor TypeTag parameter is serialized. 
abstract class TypeAware[T:TypeTag] extends Serializable { 
    def typ:Type = _typeCached 

    @transient 
    lazy val _typeCached:Type = typeOf[T] 
} 

trait Foo[T] extends Serializable { 
    def function(rdd: RDD[T]): Something {... impl here?...} 
    def typ:Type 
} 

class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{ 
    def function(rdd: RDD[T]): Something {... impl here?...} 
} 
+0

Credo TypeApi contiene anche scala.reflect.internal.Symbols $ PackageClassSymbol quindi questo non funziona altrettanto bene – tribbloid

+0

Certamente funziona. (In scala 2.11) ecco la richiesta di fusione https://github.com/scala/scala/pull/3817 – user48956