2016-02-02 29 views
7

Voglio configurare Flink in modo che possa trasformare e reindirizzare i flussi di dati da Apache Kafka a MongoDB. A scopo di test sto costruendo in cima all'esempio flink-streaming-connectors.kafka (https://github.com/apache/flink).Kafka -> Flink DataStream -> MongoDB

Gli stream Kafka sono correttamente rossi da Flink, posso mapparli ecc., Ma il problema si verifica quando voglio salvare ciascun messaggio ricevuto e trasformato in MongoDB. L'unico esempio che ho trovato sull'integrazione con MongoDB è flink-mongodb-test da github. Sfortunatamente utilizza l'origine dati statica (database), non il flusso di dati.

Credo che ci dovrebbe essere qualche implementazione DataStream.addSink per MongoDB, ma a quanto pare non c'è.

Quale sarebbe il modo migliore per raggiungerlo? Devo scrivere la funzione di sink personalizzata o forse mi manca qualcosa? Forse dovrebbe essere fatto in modo diverso?

Non sono legato a nessuna soluzione, quindi qualsiasi suggerimento sarebbe apprezzato.

Di seguito è riportato un esempio di cosa esattamente ottengo come input e cosa devo memorizzare come output.

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String) 
Apache Kafka Broker --------------> Flink: DataStream<String> 

Flink: DataStream.map({ 
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD") 
}) 
.rebalance() 
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection 

Come si può vedere in questo esempio sto usando Flink per lo più per il messaggio flusso buffer di Kafka e un po 'di analisi di base.

risposta

3

Attualmente in Flink non è disponibile alcun sink Streaming MongoDB.

Tuttavia, ci sono due modi per la scrittura di dati in MongoDB:

  • Usare il DataStream.write() chiamata di Flink. Ti consente di utilizzare qualsiasi OutputFormat (dall'API Batch) con lo streaming. Usando HadoopOutputFormatWrapper di Flink, puoi utilizzare il connettore ufficiale Hadoop di MongoDB

  • Implementare autonomamente il lavello. Implementare i sink è abbastanza semplice con l'API Streaming, e sono sicuro che MongoDB ha una buona libreria Java Client.

Entrambi gli approcci non forniscono alcuna garanzia di elaborazione sofisticata. Tuttavia, quando si utilizza Flink con Kafka (e il checkpoint abilitato) si avrà almeno una volta semantica: in un caso di errore, i dati vengono nuovamente inviati in streaming al sink MongoDB. Se si stanno eseguendo aggiornamenti idempotenti, la ripetizione di questi aggiornamenti non dovrebbe causare incongruenze.

Se hai davvero bisogno di una sola semantica per MongoDB, dovresti probabilmente creare un file JIRA in Flink e discutere con la comunità su come implementarlo.

2

In alternativa alla risposta di Robert Metzger, è possibile scrivere di nuovo i risultati su Kafka e quindi utilizzare uno dei connettori kafka mantenuti per eliminare il contenuto di un argomento all'interno del database MongoDB.

Kafka -> Flink -> Kafka -> Mongo/Tutto

Con questo approccio è possibile mantenere la behaivour "at-almeno-una volta la semantica".