2015-11-26 18 views
8

Sto tentando di creare una funzione di aggregazione definita dall'utente (UDAF) in Java utilizzando Apache Spark SQL che restituisce più array al termine. Ho cercato online e non riesco a trovare esempi o suggerimenti su come farlo.Restituzione di più array dalla funzione di aggregazione definita dall'utente (UDAF) in Apache Spark SQL

Sono in grado di restituire un singolo array, ma non riesco a capire come ottenere i dati nel formato corretto nel metodo evaluate() per restituire più matrici.

L'UDAF funziona come posso stampare gli array nel metodo evaluate(), non riesco a capire come restituire tali matrici al codice chiamante (che viene mostrato di seguito come riferimento).

UserDefinedAggregateFunction customUDAF = new CustomUDAF(); 
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data"); 

ho incluso l'intero personalizzato di classe UDAF sotto, ma i metodi principali sono il tipo di dati() e valutare i metodi(), che vengono mostrati prima.

Qualsiasi aiuto o consiglio sarebbe molto apprezzato. Grazie.

public class CustomUDAF extends UserDefinedAggregateFunction { 

    @Override 
    public DataType dataType() { 
     // TODO: Is this the correct way to return 2 arrays? 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public Object evaluate(Row buffer) { 
     // Data conversion 
     List<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     List<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 

     // Processing of data (omitted) 

     // TODO: How to get data into format needed to return 2 arrays? 
     return dataList; 
    } 

    @Override 
    public StructType inputSchema() { 
     return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType); 
    } 

    @Override 
    public StructType bufferSchema() { 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public void initialize(MutableAggregationBuffer buffer) { 
     buffer.update(0, new ArrayList<Long>()); 
     buffer.update(1, new ArrayList<Double>()); 
    } 

    @Override 
    public void update(MutableAggregationBuffer buffer, Row row) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     longList.add(row.getLong(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 
     dataList.add(row.getDouble(1)); 

     buffer.update(0, longList); 
     buffer.update(1, dataList); 
    } 

    @Override 
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0)); 
     longList.addAll(buffer2.getList(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1)); 
     dataList.addAll(buffer2.getList(1)); 

     buffer1.update(0, longList); 
     buffer1.update(1, dataList); 
    } 

    @Override 
    public boolean deterministic() { 
     return true; 
    } 
} 

Aggiornamento: Sulla base della risposta da zero323 sono stato in grado di tornare due matrici utilizzando:

return new Tuple2<>(longArray, dataArray); 

Ottenere i dati da questo è stato un po 'di fatica, ma ha coinvolto decostruire la dataframe alle liste di Java e quindi a ricostruirle in un DataFrame.

risposta

5

Per quanto posso dire, la restituzione di una tupla dovrebbe essere sufficiente. In Scala:

import org.apache.spark.sql.expressions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.{Row, Column} 

object DummyUDAF extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType() 
    .add("buff", ArrayType(LongType)) 
    .add("buff2", ArrayType(DoubleType)) 
    def dataType = new StructType() 
    .add("xs", ArrayType(LongType)) 
    .add("ys", ArrayType(DoubleType)) 
    def deterministic = true 
    def initialize(buffer: MutableAggregationBuffer) = {} 
    def update(buffer: MutableAggregationBuffer, input: Row) = {} 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {} 
    def evaluate(buffer: Row) = (Array(1L, 2L, 3L), Array(1.0, 2.0, 3.0)) 
} 

val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("k", "v") 
df.select(DummyUDAF($"k")).show(1, false) 

// +---------------------------------------------------+ 
// |(DummyUDAF$(k),mode=Complete,isDistinct=false)  | 
// +---------------------------------------------------+ 
// |[WrappedArray(1, 2, 3),WrappedArray(1.0, 2.0, 3.0)]| 
// +---------------------------------------------------+ 
+0

Sto restituendo una tupla di (bassa, media, alta) come intereval di confidenza da un UDAF. C'è un modo per far esplodere questa tupla in più colonne, invece di avere '| key | [1.0,1.5,2.0] |' ottengo '| key | 1.0 | 1.5 | 2.0 |' – TomTom101

+0

@ TomTom101 Se è un tupla (struct field) dovrebbe essere sufficiente la semplice selezione. – zero323

+0

Sorprendentemente fa il trucco! Sono tornato ora una case case per una migliore leggibilità (avrei dovuto provarlo prima di postare). grazie! – TomTom101