5

Sto affrontando un problema che non riesco a superare da anni.Spark DataFrame che non rispetta lo schema e considera tutto come String

1) Sono su Spark 1.4 e Scala 2.10. Non posso aggiornare in questo momento (grande infrastruttura distribuita)

2) Ho un file con poche centinaia di colonne, solo 2 delle quali sono stringa e resto tutte lunghe. Voglio convertire questi dati in un dataframe Label/Features.

3) Sono stato in grado di farlo in formato LibSVM.

4) Non riesco a inserirlo in un formato Label/Features.

Il motivo è

a) Io non sono in grado di utilizzare il todf() come mostrato qui https://spark.apache.org/docs/1.5.1/ml-ensembles.html

dati val = MLUtils.loadLibSVMFile (sc, "Dati/mllib/sample_libsvm_data.txt ") .toDF()

in quanto non supportato in 1,4

b) Quindi, prima ho convertito la txtfile in un dataframe in cui ho usato qualcosa di simile

def getColumnDType(columnName:String):StructField = { 

     if((columnName== "strcol1") || (columnName== "strcol2")) 
      return StructField(columnName, StringType, false) 
     else 
      return StructField(columnName, LongType, false) 
    } 
def getDataFrameFromTxtFile(sc: SparkContext,staticfeatures_filepath: String,schemaConf: String) : DataFrame = { 
     val sfRDD = sc.textFile(staticfeatures_filepath)// 
     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     // reads a space delimited string from application.properties file 
     val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("") 

     // Generate the schema based on the string of schema 
     val schema = 
      StructType(
      schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName))) 

     val data = sfRDD 
     .map(line => line.split(",")) 
     .map(p => Row.fromSeq(p.toSeq)) 

     var df = sqlContext.createDataFrame(data, schema) 

     //schemaString.split(" ").drop(4) 
     //.map(s => df = convertColumn(df, s, "int")) 

     return df 
    } 

Quando faccio un df.na.drop() df.printSchema() con questo restituito dataframe ricevo perfetto schema Ti piace questa

root 
|-- rand_entry: long (nullable = false) 
|-- strcol1: string (nullable = false) 
|-- label: long (nullable = false) 
|-- strcol2: string (nullable = false) 
|-- f1: long (nullable = false) 
|-- f2: long (nullable = false) 
|-- f3: long (nullable = false) 
and so on till around f300 

Ma - la parte triste è tutto quello che cerco di fare (vedi sotto) con il DF, sto ottenendo sempre un ClassCastException (java.lang.String non può essere lanciato a java.lang.Long)

val featureColumns = Array("f1","f2",....."f300") 
assertEquals(-99,df.select("f1").head().getLong(0)) 
assertEquals(-99,df.first().get(4)) 
val transformeddf = new VectorAssembler() 
     .setInputCols(featureColumns) 
     .setOutputCol("features") 
     .transform(df) 

Così - il cattivo è - anche se lo schema dice lunga - il df sta ancora valutando internamente tutto come stringa.

Ragazzi, per favore, aiutatemi a superare questo?

saluti

EDIT 1

Aggiunta di un semplice esempio

che ho un file in questo

1, A, 20, P, -99,1,0,0 , 8,1,1,1,1,131153

1, B, 23, P, -99,0,1,0,7,1,1,0,1,65543

1, C, 24, P , -99,0,1,0,9,1,1,1,1,262149

1, D, 7, P, -99,0,0,0,8,1,1,1, 1,458759

E

sf-schema = f0 strCol1 f1 strCol2 F2 F3 F4 f5 f6 f7 f8 f9 f10 f11 (nomi di colonna in realtà non importa in modo da poter ignorare questo dettaglio)

Tutto Sto cercando di creare un Label/Features tipo di dataframe in cui la mia terza colonna diventa un'etichetta e le colonne dalla quinta all'undicesima diventano una colonna [Vector]. Tale che io possa seguire il resto dei passaggi in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.

ho gettato le colonne anche come suggerito da zero323

val types = Map("strCol1" -> "string", "strCol2" -> "string") 
     .withDefault(_ => "bigint") 
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*) 
df = df.drop("f0") 
df = df.drop("strCol1") 
df = df.drop("strCol2") 

Ma l'asserzione e VectorAssembler non riesce ancora. featureColumns = Array ("F2", "F3", ..... "F11") Questa è tutta la sequenza che voglio fare dopo ho il mio df

traccia
var transformeddf = new VectorAssembler() 
    .setInputCols(featureColumns) 
    .setOutputCol("features") 
    .transform(df) 

    transformeddf.show(2) 

    transformeddf = new StringIndexer() 
    .setInputCol("f1") 
    .setOutputCol("indexedF1") 
    .fit(transformeddf) 
    .transform(transformeddf) 

    transformeddf.show(2) 

    transformeddf = new VectorIndexer() 
    .setInputCol("features") 
    .setOutputCol("indexedFeatures") 
    .setMaxCategories(5) 
    .fit(transformeddf) 
    .transform(transformeddf) 

L'eccezione da ScalaIDE - solo quando si colpisce il VectorAssembler è come sotto

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long 
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) 
    at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117) 
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364) 
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364) 
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436) 
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56) 
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72) 
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70) 
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960) 
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

risposta

5

si ottiene ClassCastException perché questo è esattamente ciò che dovrebbe accadere. L'argomento schema non viene utilizzato per il cast automatico (alcuni DataSources possono utilizzare lo schema in questo modo, ma non metodi come createDataFrame). Dichiara solo quali sono i tipi di valori che sono memorizzati nelle righe. È tua responsabilità passare dati che corrispondono allo schema, non viceversa.

Mentre DataFrame mostra lo schema che hai dichiarato è convalidato solo in runtime, da cui l'eccezione di runtime.Se desideri trasformare i dati in specifici hai dati cast esplicitamente.

  1. Prima leggi tutte le colonne come StringType:

    val rows = sc.textFile(staticfeatures_filepath) 
        .map(line => Row.fromSeq(line.split(","))) 
    
    val schema = StructType(
        schemaString.split(" ").map(
        columnName => StructField(columnName, StringType, false) 
    ) 
    ) 
    
    val df = sqlContext.createDataFrame(rows, schema) 
    
  2. successiva gettata colonne selezionate tipo desiderato:

    import org.apache.spark.sql.types.{LongType, StringType} 
    
    val types = Map("strcol1" -> StringType, "strcol2" -> StringType) 
        .withDefault(_ => LongType) 
    
    val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*) 
    
  3. Uso assemblatore:

    val transformeddf = new VectorAssembler() 
        .setInputCols(featureColumns) 
        .setOutputCol("features") 
        .transform(casted) 
    

Si può semplicemente i passaggi 1 e 2 utilizzando spark-csv:

// As originally 
val schema = StructType(
    schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName))) 


val df = sqlContext 
    .read.schema(schema) 
    .format("com.databricks.spark.csv") 
    .option("header", "false") 
    .load(staticfeatures_filepath) 

Vedi anche Correctly reading the types from file in PySpark

+0

Hmm. Anche l'asserzione non funziona. Diciamo che ho un semplice file come questo 1, A, 20, P, -99,1,0,0,8,1,1,1,1111153 \ n 1, B, 23, P, -99, 0,1,0,7,1,1,0,1,65543 \ n 1, C, 24, P, -99,0,1,0,9,1,1,1,1,262149 \ n 1, D, 7, P, -99,0,0,0,8,1,1,1,1,458759 \ n. (4 righe fornite ad es.) Tutto quello che sto cercando di fare è ottenere la 3a colonna come etichetta e la 5a e l'11a colonna come caratteristiche in un tipo di etichetta/caratteristiche di dataframe in modo da poter seguire i passaggi menzionati in https: // spark. apache.org/docs/latest/ml-classification-regression.html#tree-ensembles e sembra così difficile !!! . C'è una via d'uscita? Testato l'esempio precedente - stesso errore. – Run2

+0

zero323 Ho aggiunto le informazioni nella domanda. L'errore è sempre lo stesso anche dopo aver usato i passaggi 1 e 2 che hai citato. Spero di sentirti presto. Tornerà a controllare dopo un po 'di tempo. – Run2

+0

Ho appena testato questo live su 1.4.0 e i tuoi dati e funziona perfettamente. Si prega di ricontrollare il codice/le impostazioni. – zero323