2016-06-18 41 views
12

Voglio leggere più file di grandi dimensioni utilizzando gli stream di akka per elaborare ogni riga. Immagina che ogni chiave sia costituita da un ("identificatore" -> "valore"). Se viene trovato un nuovo "identificatore", voglio salvarlo nel database e salvarne il "valore", altrimenti se l'identificatore è già stato trovato durante l'elaborazione del flusso di linee, voglio salvare solo il "valore". Per questo, penso di aver bisogno di una sorta di flusso di stato ricorsivo al fine di mantenere gli identificatori che sono già stati trovati in una mappa. Penso che riceverei in questo flusso un paio di (newLine, contextWithIdentifiers).Akka Streams. Stato di stato in un flusso

Ho appena iniziato a esaminare i flussi di akka. Credo di riuscire a gestirmi da solo, ma non ho idea di come mantenere "contextWithIdentifiers". Apprezzerei se qualcuno non potesse indicarmi la buona direzione.

Sto usando Scala.

+2

Apprezzo che tu chieda questo. È una richiesta così semplice, ma trovare una risposta significativa con un codice di esempio sembra elaborato. Questo è l'unico che ho trovato! – akauppi

risposta

17

Ehi forse qualcosa come lo statefulMapConcat può esservi d'aiuto.

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import scala.util.Random._ 
import scala.math.abs 
import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

//encapsulating your input 
case class IdentValue(id: Int, value: String) 
//some random generated input 
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere")) 

val stateFlow = Flow[IdentValue].statefulMapConcat{() => 
    //state with already processed ids 
    var ids = Set.empty[Int] 
    identValue => if (ids.contains(identValue.id)) { 
    //save value to DB 
    println(identValue.value) 
    List(identValue) 
    } else { 
    //save both to database 
    println(identValue) 
    ids = ids + identValue.id 
    List(identValue) 
    } 
} 

Source(identValues) 
    .via(stateFlow) 
    .runWith(Sink.seq) 
    .onSuccess { case identValue => println(identValue) } 
+0

Grazie per il codice. Gradirei un po 'più tipi nel mezzo, dato che c'è una() => ... fabbrica coinvolta. Sapresti perché non esiste un metodo '.statefulMap'? – akauppi