2014-12-30 20 views
7

Uso la scintilla con cassandra e ho un JavaRDD<String> di client. E per ogni cliente, voglio scegliere tra Cassandra sue interazioni in questo modo:JavaSparkContext non serializzabile

avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() { 
     @Override 
     public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {    
      List<InteractionByMonthAndCustomer> b = javaFunctions(sc) 
        .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer") 
        .where("ctid =?", s) 
        .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() { 
         @Override 
         public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception { 
          return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"), 
            cassandraRow.getString("motif"), 
            cassandraRow.getDate("start"), 
            cassandraRow.getDate("end"), 
            cassandraRow.getString("ctid"), 
            cassandraRow.getString("month") 
          ); 
         } 
        }).collect(); 
      return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b); 
     } 
    }); 

Per questo sto utilizzando uno JavaSparkContext sc. Ma ho questo errore:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99) 
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
... 14 more 

Penso che JavaSparkContext deve essere serializzabile. Ma come posso renderlo serializzabile per favore?

Grazie.

risposta

12

No, JavaSparkContext non è serializzabile e non dovrebbe essere. Non può essere utilizzato in una funzione inviata ai lavoratori remoti. Qui non stai facendo esplicitamente riferimento ma un riferimento viene serializzato in ogni caso perché la tua funzione anonima di classe interna non è static e quindi ha un riferimento alla classe che lo include.

Provare a riscrivere il codice con questa funzione come static, oggetto autonomo.

0

Non è possibile utilizzare SparkContext e creare altri RDD da un esecutore (funzione mappa di un RDD).

È necessario creare il Cassandra RDD (sc.cassandraTable) nel driver e quindi eseguire un collegamento tra questi due RDD (client RDD e cassandra table RDD).

+0

È vero, il codice non dovrebbe funzionare in qualsiasi modo (Spark vieta la trasformazione all'interno trasformazione, ecc ..) –

0

dichiareremo con transient parola chiave:

private transient JavaSparkContext sparkContext;