10
C'è un modo per contare il numero di parole uniche in un flusso con Flink Streaming? I risultati sarebbero un flusso di numeri che continua ad aumentare.Come contare le parole uniche in un flusso?
C'è un modo per contare il numero di parole uniche in un flusso con Flink Streaming? I risultati sarebbero un flusso di numeri che continua ad aumentare.Come contare le parole uniche in un flusso?
È possibile risolvere il problema memorizzando tutte le parole che hai già visto. Con questa conoscenza puoi filtrare tutte le parole duplicate. Il resto può quindi essere conteggiato da un operatore di mappa con parallelismo 1
. Il seguente frammento di codice fa esattamente questo.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")
// filter words out which we have already seen
val uniqueWords = inputStream.keyBy(x => x).filterWithState{
(word, seenWordsState: Option[Set[String]]) => seenWordsState match {
case None => (true, Some(HashSet(word)))
case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word))
}
}
// count the number of incoming (first seen) words
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{
(word, counterState: Option[Int]) =>
counterState match {
case None => (1, Some(1))
case Some(counter) => (counter + 1, Some(counter + 1))
}
}.setParallelism(1)
numberUniqueWords.print();
env.execute()
Può causare OOM o degrado delle prestazioni se il flusso in entrata è "infinito" e l'insieme di stringhe (in 'filterWithState') diventa troppo grande? –
Non se si sta utilizzando un backend di stato che supporta out-of-core. Il 'RocksDBStateBackend' è un backend di stato. Se si utilizza il backend dello stato della memoria, è necessario cancellare lo stato una volta ogni tanto, altrimenti è possibile eseguire OOM. –
Tuttavia una domanda, come comprendo le operazioni di salvataggio/ripristino da/verso il backend 'RocksDBStateBackend' in questo caso ha complessità O (N) dove N è il conteggio di elementi in set cioè questo backend salva/ripristina sempre tutti gli elementi di Set o solo elementi modificati? –