7

I'm trying to use the spark-cassandra connector via spark-shell on Dataproc, but I can't connect to my cluster. It appears there is a version mismatch, since the classpath includes a much older Guava from somewhere else, even when I specify the correct version at startup. I suspect this is likely caused by all of the Hadoop dependencies put into the classpath by default.

Is there any way to force the correct version of Guava to be used, without removing all of the Hadoop-related jars that Dataproc includes?

Relevant info:

Starting spark-shell, showing that it resolves the correct version of Guava: $ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml 
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency 
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 
     confs: [default] 
     found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central 
     found org.apache.cassandra#cassandra-clientutil;2.2.2 in central 
     found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central 
     found io.netty#netty-handler;4.0.27.Final in central 
     found io.netty#netty-buffer;4.0.27.Final in central 
     found io.netty#netty-common;4.0.27.Final in central 
     found io.netty#netty-transport;4.0.27.Final in central 
     found io.netty#netty-codec;4.0.27.Final in central 
     found com.codahale.metrics#metrics-core;3.0.2 in central 
     found org.slf4j#slf4j-api;1.7.5 in central 
     found org.apache.commons#commons-lang3;3.3.2 in central 
     found com.google.guava#guava;16.0.1 in central 
     found org.joda#joda-convert;1.2 in central 
     found joda-time#joda-time;2.3 in central 
     found com.twitter#jsr166e;1.1.0 in central 
     found org.scala-lang#scala-reflect;2.10.5 in central 
:: resolution report :: resolve 502ms :: artifacts dl 10ms 
     :: modules in use: 
     com.codahale.metrics#metrics-core;3.0.2 from central in [default] 
     com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 from central in [default] 
     com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 from central in [default] 
     com.google.guava#guava;16.0.1 from central in [default] 
     com.twitter#jsr166e;1.1.0 from central in [default] 
     io.netty#netty-buffer;4.0.27.Final from central in [default] 
     io.netty#netty-codec;4.0.27.Final from central in [default] 
     io.netty#netty-common;4.0.27.Final from central in [default] 
     io.netty#netty-handler;4.0.27.Final from central in [default] 
     io.netty#netty-transport;4.0.27.Final from central in [default] 
     joda-time#joda-time;2.3 from central in [default] 
     org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default] 
     org.apache.commons#commons-lang3;3.3.2 from central in [default] 
     org.joda#joda-convert;1.2 from central in [default] 
     org.scala-lang#scala-reflect;2.10.5 from central in [default] 
     org.slf4j#slf4j-api;1.7.5 from central in [default] 
     --------------------------------------------------------------------- 
     |     |   modules   || artifacts | 
     |  conf  | number| search|dwnlded|evicted|| number|dwnlded| 
     --------------------------------------------------------------------- 
     |  default  | 16 | 0 | 0 | 0 || 16 | 0 | 
     --------------------------------------------------------------------- 
:: retrieving :: org.apache.spark#spark-submit-parent 
     confs: [default] 
     0 artifacts copied, 16 already retrieved (0kB/12ms) 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2 
      /_/ 

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_66-internal) 
Type in expressions to have them evaluated. 
Type :help for more information. 
15/12/10 17:38:46 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 
Spark context available as sc. 

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used 
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used 
15/12/10 17:38:54 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
15/12/10 17:38:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 
SQL context available as sqlContext. 

Stack trace when making the initial connection:

java.io.IOException: Failed to open native connection to Cassandra at {10.240.0.7}:9042 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148) 
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) 
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109) 
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120) 
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249) 
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59) 
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1125) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51) 
    at $iwC$$iwC$$iwC.<init>(<console>:53) 
    at $iwC$$iwC.<init>(<console>:55) 
    at $iwC.<init>(<console>:57) 
    at <init>(<console>:59) 
    at .<init>(<console>:63) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825) 
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345) 
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345) 
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65) 
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65) 
    at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture; 
     at com.datastax.driver.core.Connection.initAsync(Connection.java:178) 
     at com.datastax.driver.core.Connection$Factory.open(Connection.java:742) 
     at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:240) 
     at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:187) 
     at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79) 
     at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393) 
     at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402) 
     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155) 
     ... 70 more 

Answer

6

Unfortunately, Hadoop's dependency on Guava 11 (which doesn't have the Futures.withFallback method mentioned) is a longstanding issue, and indeed Hadoop 2.7.1 still depends on Guava 11.
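To confirm that it really is Hadoop's older Guava winning at runtime, one quick diagnostic (a sketch of mine, not from the original answer) is to ask the JVM which jar the Futures class was actually loaded from, directly inside spark-shell:

// Prints the location of the jar that provided Guava's Futures class;
// with the Hadoop jars on the classpath this typically points at a
// Guava 11 jar rather than the guava-16.0.1 resolved by --packages.
scala> classOf[com.google.common.util.concurrent.Futures].getProtectionDomain.getCodeSource.getLocation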

Spark core uses Guava 14, as can be seen here, but this is worked around by shading Guava inside the Spark assembly:

$ jar tf /usr/lib/spark/lib/spark-assembly.jar | grep concurrent.Futures 
org/spark-project/guava/util/concurrent/Futures$1.class 
org/spark-project/guava/util/concurrent/Futures$2.class 
org/spark-project/guava/util/concurrent/Futures$3.class 
org/spark-project/guava/util/concurrent/Futures$4.class 
org/spark-project/guava/util/concurrent/Futures$5.class 
org/spark-project/guava/util/concurrent/Futures$6.class 
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture$1.class 
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture.class 
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$1.class 
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$2.class 
org/spark-project/guava/util/concurrent/Futures$CombinedFuture.class 
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1$1.class 
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1.class 
org/spark-project/guava/util/concurrent/Futures$FallbackFuture.class 
org/spark-project/guava/util/concurrent/Futures$FutureCombiner.class 
org/spark-project/guava/util/concurrent/Futures$ImmediateCancelledFuture.class 
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedCheckedFuture.class 
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedFuture.class 
org/spark-project/guava/util/concurrent/Futures$ImmediateFuture.class 
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulCheckedFuture.class 
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulFuture.class 
org/spark-project/guava/util/concurrent/Futures$MappingCheckedFuture.class 
org/spark-project/guava/util/concurrent/Futures.class 

$ javap -cp /usr/lib/spark/lib/spark-assembly.jar org.spark-project.guava.util.concurrent.Futures 
Compiled from "Futures.java" 
public final class org.spark-project.guava.util.concurrent.Futures { 
    public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> makeChecked(org.spark-project.guava.util.concurrent.ListenableFuture<V>, com.google.common.base.Function<java.lang.Exception, X>); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFuture(V); 
    public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateCheckedFuture(V); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFailedFuture(java.lang.Throwable); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateCancelledFuture(); 
    public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateFailedCheckedFuture(X); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>, java.util.concurrent.Executor); 
    public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>); 
    public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>, java.util.concurrent.Executor); 
    public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>); 
    public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>, java.util.concurrent.Executor); 
    public static <I, O> java.util.concurrent.Future<O> lazyTransform(java.util.concurrent.Future<I>, com.google.common.base.Function<? super I, ? extends O>); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> dereference(org.spark-project.guava.util.concurrent.ListenableFuture<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...); 
    public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>); 
    public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>); 
    public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>, java.util.concurrent.Executor); 
    public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, java.lang.Class<X>) throws X; 
    public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, long, java.util.concurrent.TimeUnit, java.lang.Class<X>) throws X; 
    public static <V> V getUnchecked(java.util.concurrent.Future<V>); 
    static {}; 
} 

You can follow the instructions here http://arjon.es/2015/10/12/making-hadoop-2-dot-6-plus-spark-cassandra-driver-play-nice-together/ to do the same shading yourself at compile time. With spark-shell you may be able to get away with some tweaks to spark.driver.extraClassPath, as mentioned here (see the sketch below), though collisions may keep popping up in various places.
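For example, a hypothetical invocation that prepends a standalone Guava 16 jar to both the driver and executor classpaths (the jar path is illustrative, not from the original post):

$ spark-shell \ 
    --conf spark.driver.extraClassPath=/path/to/guava-16.0.1.jar \ 
    --conf spark.executor.extraClassPath=/path/to/guava-16.0.1.jar \ 
    --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

Entries in extraClassPath are prepended to the classpath, which is why this can let Guava 16 win over Hadoop's Guava 11; it remains fragile if anything else on the cluster expects the old Guava.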

+0

Can this be done if I build/run everything from IDEA? – szu

0

In case you use Gradle to build a fat jar, you can use relocate in the shadowJar task:

shadowJar { 
    zip64 true 
    mergeServiceFiles() 

    // Rewrite Guava's packages inside the fat jar so they can no longer 
    // collide with the Guava 11 that Hadoop puts on the classpath 
    relocate 'com.google', 'hidden.google' 
}
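Once the fat jar is built with gradle shadowJar, you submit it as usual; the relocated Guava travels inside the jar under the hidden.google package and can't clash with Hadoop's copy. A hypothetical submission (the class name and jar path are illustrative):

$ spark-submit --class com.example.Main build/libs/myapp-all.jar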