2015-10-29 20 views
13

In Apache Flink ho un flusso di tuple. Supponiamo che sia davvero semplice Tuple1<String>. La tupla può avere un valore arbitrario nel suo campo valore (ad esempio "P1", "P2", ecc.). L'insieme di valori possibili è finito ma non conosco il set completo in anticipo (quindi potrebbe esserci un 'P362'). Voglio scrivere quella tupla su una certa posizione di uscita a seconda del valore all'interno della tupla. Quindi ad es. Mi piacerebbe avere la seguente struttura di file:Flink Streaming: come generare un flusso di dati su diverse uscite a seconda dei dati?

  • /output/P1
  • /output/P2

Nella documentazione ho trovato solo le possibilità di scrivere in posizioni che conosco in anticipo (ad esempio stream.writeCsv("/output/somewhere")), ma nessun modo di lasciare che il contenuto dei dati decida dove finiscono effettivamente i dati.

Ho letto sulla divisione di output nella documentazione ma questo non sembra fornire un modo per reindirizzare l'output verso destinazioni diverse nel modo in cui mi piacerebbe averlo (o semplicemente non capisco come funzionerebbe) .

Questo può essere fatto con l'API Flink, se sì, come? Se no, c'è forse una libreria di terze parti che può farlo o dovrei costruire una cosa del genere da sola?

Aggiornamento

Seguendo il suggerimento Matthias' mi si avvicinò con una funzione di dissipatore di vagliatura che determina il percorso di uscita e quindi scrive la tupla al rispettivo file dopo la serializzazione di esso. L'ho messo qui per riferimento, forse è utile per qualcun altro:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> { 

    private final OutputSelector<IT> outputSelector; 
    private final MapFunction<IT, String> serializationFunction; 
    private final String basePath; 
    Map<String, TextOutputFormat<String>> formats = new HashMap<>(); 

    /** 
    * @param outputSelector  the selector which determines into which output(s) a record is written. 
    * @param serializationFunction a function which serializes the record to a string. 
    * @param basePath    the base path for writing the records. It will be appended with the output selector. 
    */ 
    public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) { 
     this.outputSelector = outputSelector; 
     this.serializationFunction = serializationFunction; 
     this.basePath = basePath; 
    } 


    @Override 
    public void invoke(IT value) throws Exception { 
     // find out where to write. 
     Iterable<String> selection = outputSelector.select(value); 
     for (String s : selection) { 
      // ensure we have a format for this. 
      TextOutputFormat<String> destination = ensureDestinationExists(s); 
      // then serialize and write. 
      destination.writeRecord(serializationFunction.map(value)); 
     } 
    } 

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException { 
     // if we know the destination, we just return the format. 
     if (formats.containsKey(selection)) { 
      return formats.get(selection); 
     } 

     // create a new output format and initialize it from the context. 
     TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection)); 
     StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); 
     format.configure(context.getTaskStubParameters()); 
     format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); 

     // put it into our map. 
     formats.put(selection, format); 
     return format; 
    } 

    @Override 
    public void close() throws IOException { 
     Exception lastException = null; 
     try { 
      for (TextOutputFormat<String> format : formats.values()) { 
       try { 
        format.close(); 
       } catch (Exception e) { 
        lastException = e; 
        format.tryCleanupOnError(); 
       } 
      } 
     } finally { 
      formats.clear(); 
     } 

     if (lastException != null) { 
      throw new IOException("Close failed.", lastException); 
     } 
    } 
} 

risposta

6

È possibile implementare un sink personalizzato. Ereditare da uno dei due:

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

Nel vostro programma di utilizzo:

stream.addSink(SinkFunction<T> sinkFunction); 

invece di stream.writeCsv("/output/somewhere").

+4

Grazie! Ho controllato l'implementazione di 'FileSinkFunction' e ho trovato qualcosa di simile per conto mio. Ho aggiunto l'implementazione alla mia domanda come riferimento. –