Ho un flusso Akka Source che voglio suddividere in due fonti in base a un predicato.Divisione sorgente streaming Akka in due

E.g. avendo una sorgente (tipi sono semplificate intenzionalmente):

val source: Source[Either[Throwable, String], NotUsed] = ??? 

E due metodi:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ??? 
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ??? 

desidero poter dividere il source secondo _.isRight predicato e passare la parte destra di handleSuccess metodo e lasciato parte al metodo handleFailure.

Ho provato a utilizzare lo splitter Broadcast ma alla fine richiede Sink s.


Non credo sia possibile dividere una fonte in due fonti in tal modo perché queste fonti di divisione possono essere materializzati separatamente ed è assolutamente non è chiaro come dovrebbe funzionare anche allora. –


Sì, capisco le implicazioni di questo. Mi interessa un modello alternativo per ricostruire il codice in.Per esempio, potrei rendere i miei metodi restituire 'Sink's invece di accettare un' Source'. – Tvaroh



Sebbene sia possibile scegliere da quale lato dello Source si desidera recuperare gli articoli, non è possibile creare uno Source che produca due output, che è ciò che sembra che alla fine si vorrebbe.

Data la GraphStage di sotto del quale divide essenzialmente i valori di destra e di sinistra in due uscite ...

    * Fans out left and right values of an either 
    * @tparam L left value type 
    * @tparam R right value type 
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] { 
    import akka.stream.{Attributes, Outlet} 
    import akka.stream.stage.GraphStageLogic 

    override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut") 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 

    var out0demand = false 
    var out1demand = false 

    setHandler(shape.in, new InHandler { 
     override def onPush(): Unit = { 

     if (out0demand && out1demand) { 
      grab(shape.in) match { 
      case Left(l) => 
       out0demand = false 
       push(shape.out0, l) 
      case Right(r) => 
       out1demand = false 
       push(shape.out1, r) 

    setHandler(shape.out0, new OutHandler { 
     override def onPull(): Unit = { 
     if (!out0demand) { 
      out0demand = true 

     if (out0demand && out1demand) { 

    setHandler(shape.out1, new OutHandler { 
     override def onPull(): Unit = { 
     if (!out1demand) { 
      out1demand = true 

     if (out0demand && out1demand) { 

.. è possibile instradare loro di ricevere un solo lato:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s => 
    import GraphDSL.Implicits._ 

    val eitherFanOut = b.add(new EitherFanOut[Throwable, String]) 

    s ~> eitherFanOut.in 
    eitherFanOut.out0 ~> Sink.ignore 


Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf) 

... o probabilmente più desiderabile, indirizzarli a due Sink s separati:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) 
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) 

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) => 

    import GraphDSL.Implicits._ 

    val eitherFanOut = b.add(new EitherFanOut[Throwable, String]) 

    s ~> eitherFanOut.in 
    eitherFanOut.out0 ~> l.in 
    eitherFanOut.out1 ~> r.in 


val r = flow.run() 
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf) 

(importazioni e la configurazione iniziale)

import akka.NotUsed 
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source} 
import akka.stream.stage.{GraphStage, InHandler, OutHandler} 
import akka.stream._ 
import akka.actor.ActorSystem 
import com.typesafe.config.ConfigFactory 

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 

import scala.concurrent.Await 
import scala.concurrent.duration.Duration 

val classLoader = getClass.getClassLoader 
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader) 
implicit val materializer = ActorMaterializer() 

val values: List[Either[Throwable, String]] = List(
    Left(new Throwable), 
    Left(new RuntimeException), 

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator) 

Per questo si può utilizzare una trasmissione, quindi filtrare e mappare i flussi all'interno del GraphDSL:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) 
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) 

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) => 

     import GraphDSL.Implicits._ 

     val broadcast = b.add(Broadcast[Either[Throwable,String]](2)) 

     s ~> broadcast.in 
     broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in 
     broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in 


val r = flow.run() 
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf) 

mi aspetto che sarà in grado di eseguire il funzioni che desideri all'interno della mappa.


Una variante più sicura di '.filter (_. IsLeft) .map (_. Left.get)' è '.collect {case Left (l) => l}' E così per il ramo _right_ – Alexander


Questo è implementato in akka-stream-contrib come PartitionWith. Aggiungi questa dipendenza a SBT di tirarlo al tuo progetto:

// latest version available on https://github.com/akka/akka-stream-contrib/releases libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.8"

PartitionWith è a forma di Broadcast(2), ma con potenzialmente diversi tipi per ciascuna delle due uscite. Fornisci un predicato da applicare a ciascun elemento e, in base al risultato, viene indirizzato allo sbocco applicabile. È quindi possibile allegare un Sink o Flow a ciascuna di queste prese in modo appropriato. Sulla cessationoftime's example, con la Broadcast sostituito con un PartitionWith:

val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty 
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) 
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) 

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink) 
            ((_, _, _)) { implicit b => (s, l, r) => 

    import GraphDSL.Implicits._ 

    val pw = b.add(
    PartitionWith.apply[Either[Throwable, String], Throwable, String](identity) 

    eitherSource ~> pw.in 
    pw.out0 ~> leftSink 
    pw.out1 ~> rightSink 


val r = flow.run() 
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)