Sto tentando di creare una funzione di aggregazione definita dall'utente (UDAF) in Java utilizzando Apache Spark SQL che restituisce più array al termine. Ho cercato online e non riesco a trovare esempi o suggerimenti su come farlo.Restituzione di più array dalla funzione di aggregazione definita dall'utente (UDAF) in Apache Spark SQL
Sono in grado di restituire un singolo array, ma non riesco a capire come ottenere i dati nel formato corretto nel metodo evaluate() per restituire più matrici.
L'UDAF funziona come posso stampare gli array nel metodo evaluate(), non riesco a capire come restituire tali matrici al codice chiamante (che viene mostrato di seguito come riferimento).
UserDefinedAggregateFunction customUDAF = new CustomUDAF();
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data");
ho incluso l'intero personalizzato di classe UDAF sotto, ma i metodi principali sono il tipo di dati() e valutare i metodi(), che vengono mostrati prima.
Qualsiasi aiuto o consiglio sarebbe molto apprezzato. Grazie.
public class CustomUDAF extends UserDefinedAggregateFunction {
@Override
public DataType dataType() {
// TODO: Is this the correct way to return 2 arrays?
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public Object evaluate(Row buffer) {
// Data conversion
List<Long> longList = new ArrayList<Long>(buffer.getList(0));
List<Double> dataList = new ArrayList<Double>(buffer.getList(1));
// Processing of data (omitted)
// TODO: How to get data into format needed to return 2 arrays?
return dataList;
}
@Override
public StructType inputSchema() {
return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType);
}
@Override
public StructType bufferSchema() {
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, new ArrayList<Long>());
buffer.update(1, new ArrayList<Double>());
}
@Override
public void update(MutableAggregationBuffer buffer, Row row) {
ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0));
longList.add(row.getLong(0));
ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1));
dataList.add(row.getDouble(1));
buffer.update(0, longList);
buffer.update(1, dataList);
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0));
longList.addAll(buffer2.getList(0));
ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1));
dataList.addAll(buffer2.getList(1));
buffer1.update(0, longList);
buffer1.update(1, dataList);
}
@Override
public boolean deterministic() {
return true;
}
}
Aggiornamento: Sulla base della risposta da zero323 sono stato in grado di tornare due matrici utilizzando:
return new Tuple2<>(longArray, dataArray);
Ottenere i dati da questo è stato un po 'di fatica, ma ha coinvolto decostruire la dataframe alle liste di Java e quindi a ricostruirle in un DataFrame.
Sto restituendo una tupla di (bassa, media, alta) come intereval di confidenza da un UDAF. C'è un modo per far esplodere questa tupla in più colonne, invece di avere '| key | [1.0,1.5,2.0] |' ottengo '| key | 1.0 | 1.5 | 2.0 |' – TomTom101
@ TomTom101 Se è un tupla (struct field) dovrebbe essere sufficiente la semplice selezione. – zero323
Sorprendentemente fa il trucco! Sono tornato ora una case case per una migliore leggibilità (avrei dovuto provarlo prima di postare). grazie! – TomTom101