Voglio configurare Flink in modo che possa trasformare e reindirizzare i flussi di dati da Apache Kafka a MongoDB. A scopo di test sto costruendo in cima all'esempio flink-streaming-connectors.kafka (https://github.com/apache/flink).Kafka -> Flink DataStream -> MongoDB
Gli stream Kafka sono correttamente rossi da Flink, posso mapparli ecc., Ma il problema si verifica quando voglio salvare ciascun messaggio ricevuto e trasformato in MongoDB. L'unico esempio che ho trovato sull'integrazione con MongoDB è flink-mongodb-test da github. Sfortunatamente utilizza l'origine dati statica (database), non il flusso di dati.
Credo che ci dovrebbe essere qualche implementazione DataStream.addSink per MongoDB, ma a quanto pare non c'è.
Quale sarebbe il modo migliore per raggiungerlo? Devo scrivere la funzione di sink personalizzata o forse mi manca qualcosa? Forse dovrebbe essere fatto in modo diverso?
Non sono legato a nessuna soluzione, quindi qualsiasi suggerimento sarebbe apprezzato.
Di seguito è riportato un esempio di cosa esattamente ottengo come input e cosa devo memorizzare come output.
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
Come si può vedere in questo esempio sto usando Flink per lo più per il messaggio flusso buffer di Kafka e un po 'di analisi di base.