Sto affrontando un problema che non riesco a superare da anni.Spark DataFrame che non rispetta lo schema e considera tutto come String
1) Sono su Spark 1.4 e Scala 2.10. Non posso aggiornare in questo momento (grande infrastruttura distribuita)
2) Ho un file con poche centinaia di colonne, solo 2 delle quali sono stringa e resto tutte lunghe. Voglio convertire questi dati in un dataframe Label/Features.
3) Sono stato in grado di farlo in formato LibSVM.
4) Non riesco a inserirlo in un formato Label/Features.
Il motivo è
a) Io non sono in grado di utilizzare il todf() come mostrato qui https://spark.apache.org/docs/1.5.1/ml-ensembles.html
dati val = MLUtils.loadLibSVMFile (sc, "Dati/mllib/sample_libsvm_data.txt ") .toDF()
in quanto non supportato in 1,4
b) Quindi, prima ho convertito la txtfile in un dataframe in cui ho usato qualcosa di simile
def getColumnDType(columnName:String):StructField = {
if((columnName== "strcol1") || (columnName== "strcol2"))
return StructField(columnName, StringType, false)
else
return StructField(columnName, LongType, false)
}
def getDataFrameFromTxtFile(sc: SparkContext,staticfeatures_filepath: String,schemaConf: String) : DataFrame = {
val sfRDD = sc.textFile(staticfeatures_filepath)//
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// reads a space delimited string from application.properties file
val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
val data = sfRDD
.map(line => line.split(","))
.map(p => Row.fromSeq(p.toSeq))
var df = sqlContext.createDataFrame(data, schema)
//schemaString.split(" ").drop(4)
//.map(s => df = convertColumn(df, s, "int"))
return df
}
Quando faccio un df.na.drop() df.printSchema()
con questo restituito dataframe ricevo perfetto schema Ti piace questa
root
|-- rand_entry: long (nullable = false)
|-- strcol1: string (nullable = false)
|-- label: long (nullable = false)
|-- strcol2: string (nullable = false)
|-- f1: long (nullable = false)
|-- f2: long (nullable = false)
|-- f3: long (nullable = false)
and so on till around f300
Ma - la parte triste è tutto quello che cerco di fare (vedi sotto) con il DF, sto ottenendo sempre un ClassCastException (java.lang.String non può essere lanciato a java.lang.Long)
val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(df)
Così - il cattivo è - anche se lo schema dice lunga - il df sta ancora valutando internamente tutto come stringa.
Ragazzi, per favore, aiutatemi a superare questo?
saluti
EDIT 1
Aggiunta di un semplice esempio
che ho un file in questo
1, A, 20, P, -99,1,0,0 , 8,1,1,1,1,131153
1, B, 23, P, -99,0,1,0,7,1,1,0,1,65543
1, C, 24, P , -99,0,1,0,9,1,1,1,1,262149
1, D, 7, P, -99,0,0,0,8,1,1,1, 1,458759
E
sf-schema = f0 strCol1 f1 strCol2 F2 F3 F4 f5 f6 f7 f8 f9 f10 f11 (nomi di colonna in realtà non importa in modo da poter ignorare questo dettaglio)
Tutto Sto cercando di creare un Label/Features tipo di dataframe in cui la mia terza colonna diventa un'etichetta e le colonne dalla quinta all'undicesima diventano una colonna [Vector]. Tale che io possa seguire il resto dei passaggi in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.
ho gettato le colonne anche come suggerito da zero323
val types = Map("strCol1" -> "string", "strCol2" -> "string")
.withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")
Ma l'asserzione e VectorAssembler non riesce ancora. featureColumns = Array ("F2", "F3", ..... "F11") Questa è tutta la sequenza che voglio fare dopo ho il mio df
tracciavar transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(df)
transformeddf.show(2)
transformeddf = new StringIndexer()
.setInputCol("f1")
.setOutputCol("indexedF1")
.fit(transformeddf)
.transform(transformeddf)
transformeddf.show(2)
transformeddf = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(5)
.fit(transformeddf)
.transform(transformeddf)
L'eccezione da ScalaIDE - solo quando si colpisce il VectorAssembler è come sotto
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Hmm. Anche l'asserzione non funziona. Diciamo che ho un semplice file come questo 1, A, 20, P, -99,1,0,0,8,1,1,1,1111153 \ n 1, B, 23, P, -99, 0,1,0,7,1,1,0,1,65543 \ n 1, C, 24, P, -99,0,1,0,9,1,1,1,1,262149 \ n 1, D, 7, P, -99,0,0,0,8,1,1,1,1,458759 \ n. (4 righe fornite ad es.) Tutto quello che sto cercando di fare è ottenere la 3a colonna come etichetta e la 5a e l'11a colonna come caratteristiche in un tipo di etichetta/caratteristiche di dataframe in modo da poter seguire i passaggi menzionati in https: // spark. apache.org/docs/latest/ml-classification-regression.html#tree-ensembles e sembra così difficile !!! . C'è una via d'uscita? Testato l'esempio precedente - stesso errore. – Run2
zero323 Ho aggiunto le informazioni nella domanda. L'errore è sempre lo stesso anche dopo aver usato i passaggi 1 e 2 che hai citato. Spero di sentirti presto. Tornerà a controllare dopo un po 'di tempo. – Run2
Ho appena testato questo live su 1.4.0 e i tuoi dati e funziona perfettamente. Si prega di ricontrollare il codice/le impostazioni. – zero323