2016-03-31 17 views

risposta

7

È possibile risolvere il problema memorizzando tutte le parole che hai già visto. Con questa conoscenza puoi filtrare tutte le parole duplicate. Il resto può quindi essere conteggiato da un operatore di mappa con parallelismo 1. Il seguente frammento di codice fa esattamente questo.

val env = StreamExecutionEnvironment.getExecutionEnvironment 

val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo") 

// filter words out which we have already seen 
val uniqueWords = inputStream.keyBy(x => x).filterWithState{ 
    (word, seenWordsState: Option[Set[String]]) => seenWordsState match { 
    case None => (true, Some(HashSet(word))) 
    case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word)) 
    } 
} 

// count the number of incoming (first seen) words 
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{ 
    (word, counterState: Option[Int]) => 
    counterState match { 
     case None => (1, Some(1)) 
     case Some(counter) => (counter + 1, Some(counter + 1)) 
    } 
}.setParallelism(1) 

numberUniqueWords.print(); 

env.execute() 
+0

Può causare OOM o degrado delle prestazioni se il flusso in entrata è "infinito" e l'insieme di stringhe (in 'filterWithState') diventa troppo grande? –

+1

Non se si sta utilizzando un backend di stato che supporta out-of-core. Il 'RocksDBStateBackend' è un backend di stato. Se si utilizza il backend dello stato della memoria, è necessario cancellare lo stato una volta ogni tanto, altrimenti è possibile eseguire OOM. –

+0

Tuttavia una domanda, come comprendo le operazioni di salvataggio/ripristino da/verso il backend 'RocksDBStateBackend' in questo caso ha complessità O (N) dove N è il conteggio di elementi in set cioè questo backend salva/ripristina sempre tutti gli elementi di Set o solo elementi modificati? –