2014-07-18 12 views
5

Sto scrivendo un programma per unire due file su un parametro comune usando spark-sql. Penso che il mio codice sia a posto, ma quando cerco di salvarlo come file di testo, ricevo degli errori. Sto mettendo il mio codice, come di seguito: -NullPointerException in spark-sql

import java.util.regex.Pattern; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.api.java.JavaSQLContext; 
import org.apache.spark.sql.api.java.JavaSchemaRDD; 



import java.io.Serializable; 


public class JoinCSV { 
    @SuppressWarnings("serial") 
    public static class CompleteSample implements Serializable { 
     private String ASSETNUM; 
     private String ASSETTAG; 
     private String CALNUM; 



     public String getASSETNUM() { 
      return ASSETNUM; 
     } 
     public void setASSETNUM(String aSSETNUM) { 
      ASSETNUM = aSSETNUM; 
     } 
     public String getASSETTAG() { 
      return ASSETTAG; 
     } 
     public void setASSETTAG(String aSSETTAG) { 
      ASSETTAG = aSSETTAG; 
     } 
     public String getCALNUM() { 
      return CALNUM; 
     } 
     public void setCALNUM(String cALNUM) { 
      CALNUM = cALNUM; 
     } 


     } 

    @SuppressWarnings("serial") 
    public static class ExtendedSample implements Serializable { 

     private String ASSETNUM; 
     private String CHANGEBY; 
     private String CHANGEDATE; 


     public String getASSETNUM() { 
      return ASSETNUM; 
     } 
     public void setASSETNUM(String aSSETNUM) { 
      ASSETNUM = aSSETNUM; 
     } 
     public String getCHANGEBY() { 
      return CHANGEBY; 
     } 
     public void setCHANGEBY(String cHANGEBY) { 
      CHANGEBY = cHANGEBY; 
     } 
     public String getCHANGEDATE() { 
      return CHANGEDATE; 
     } 
     public void setCHANGEDATE(String cHANGEDATE) { 
      CHANGEDATE = cHANGEDATE; 
     } 
    } 

    private static final Pattern comma = Pattern.compile(","); 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     String path="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv"; 
     String path1="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv"; 

      JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL"); 
      JavaSQLContext sqlCtx = new JavaSQLContext(ctx); 

      JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
        new Function<String, CompleteSample>() { 
        public CompleteSample call(String line) throws Exception { 
         String[] parts = line.split(","); 

         CompleteSample cs = new CompleteSample(); 
         cs.setASSETNUM(parts[0]); 
         cs.setASSETTAG(parts[1]); 
         cs.setCALNUM(parts[2]); 

         return cs; 
        } 
        }); 

      JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
        new Function<String, ExtendedSample>() { 
        public ExtendedSample call(String line) throws Exception { 
         String[] parts = line.split(","); 

         ExtendedSample es = new ExtendedSample(); 
         es.setASSETNUM(parts[0]); 
         es.setCHANGEBY(parts[1]); 
         es.setCHANGEDATE(parts[2]); 

         return es; 
        } 
        }); 

      JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class); 
      complete.registerAsTable("cs"); 

      JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class); 
      extended.registerAsTable("es"); 

      JavaSchemaRDD fs= sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;"); 
      fs.saveAsTextFile("result");     //Here I am getting error 
    } 

} 

ed i miei errori sono le seguenti: -

14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException 
      java.lang.ProcessBuilder.start(Unknown Source) 
      org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
      org.apache.hadoop.util.Shell.run(Shell.java:379) 
      org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
      org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
------------ 
------------ 

e

14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path 
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 
     at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278) 
     at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300) 
     at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293) 
     at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) 
     at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362) 
     at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546) 
     at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546) 
----------------- 
----------------- 

Questo secondo errore sta arrivando ovunque se sto usando scintilla , spark-sql o spark-streaming. Non ho idea di cosa sia questo errore. Ma sembra che questo secondo errore non abbia alcun impatto sul codice, perché anche dopo questo errore i risultati vengono utilizzati correttamente. Ma è ancora molto irritante vedere un errore sconosciuto ogni volta che si esegue un programma.

Qualcuno può aiutarmi a comprendere la questione per favore? Sono bloccato con questo molto male. Grazie

+0

Hai lo stesso errore su Linux? –

+0

No, qui sto cercando di utilizzare "saveAsTextFile" per salvare il file sul file system locale su Windows OS. Qui per il file system locale nessuna delle opzioni "saveAs" funziona. Tuttavia queste opzioni funzionano perfettamente mentre si salvano i file su hdf. –

+1

Mi sembra un problema specifico di Windows. Non so come risolverlo, ma se si tratta solo di un salvataggio locale, è possibile aggirare il problema. Recupera i dati con 'RDD.collect()', quindi salvali tramite un normale 'FileOutputStream' di Java. –

risposta

7

C'è una soluzione per l'errore rdd.saveAsTextFile() su Windows. Corregge sia gli errori SparkException e IOException che stavo anche sperimentando con Spark v1.1.0 su Windows 8.1 in modalità locale.

http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

qui sono i passi da quel link:

1) download compiled winutils.exe;

2) inserisci da qualche parte come c:\winutil\bin;

3) aggiungere questa riga di codice: System.setProperty("hadoop.home.dir", "c:\\winutil\\")

Spero che questo funziona per voi.

+1

Grazie mille Dylan per aver fornito questa fantastica soluzione. Ho ignorato il messaggio WARN winutils che non è correlato con saveTextAsFile. Ora che ho seguito i tuoi passi, la mia applicazione ha funzionato senza intoppi fino alla fine. – florins

+0

@florins no worries, felice ha funzionato per voi. –