Sebbene sia possibile scegliere da quale lato dello Source
si desidera recuperare gli articoli, non è possibile creare uno Source
che produca due output, che è ciò che sembra che alla fine si vorrebbe.
Data la GraphStage
di sotto del quale divide essenzialmente i valori di destra e di sinistra in due uscite ...
/**
* Fans out left and right values of an either
* @tparam L left value type
* @tparam R right value type
*/
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
import akka.stream.{Attributes, Outlet}
import akka.stream.stage.GraphStageLogic
override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var out0demand = false
var out1demand = false
setHandler(shape.in, new InHandler {
override def onPush(): Unit = {
if (out0demand && out1demand) {
grab(shape.in) match {
case Left(l) =>
out0demand = false
push(shape.out0, l)
case Right(r) =>
out1demand = false
push(shape.out1, r)
}
}
}
})
setHandler(shape.out0, new OutHandler {
@scala.throws[Exception](classOf[Exception])
override def onPull(): Unit = {
if (!out0demand) {
out0demand = true
}
if (out0demand && out1demand) {
pull(shape.in)
}
}
})
setHandler(shape.out1, new OutHandler {
@scala.throws[Exception](classOf[Exception])
override def onPull(): Unit = {
if (!out1demand) {
out1demand = true
}
if (out0demand && out1demand) {
pull(shape.in)
}
}
})
}
}
.. è possibile instradare loro di ricevere un solo lato:
val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
import GraphDSL.Implicits._
val eitherFanOut = b.add(new EitherFanOut[Throwable, String])
s ~> eitherFanOut.in
eitherFanOut.out0 ~> Sink.ignore
SourceShape(eitherFanOut.out1)
})
Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)
... o probabilmente più desiderabile, indirizzarli a due Sink
s separati:
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))
val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>
import GraphDSL.Implicits._
val eitherFanOut = b.add(new EitherFanOut[Throwable, String])
s ~> eitherFanOut.in
eitherFanOut.out0 ~> l.in
eitherFanOut.out1 ~> r.in
ClosedShape
})
val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
(importazioni e la configurazione iniziale)
import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration.Duration
val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()
val values: List[Either[Throwable, String]] = List(
Right("B"),
Left(new Throwable),
Left(new RuntimeException),
Right("B"),
Right("C"),
Right("G"),
Right("I"),
Right("F"),
Right("T"),
Right("A")
)
val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)
Non credo sia possibile dividere una fonte in due fonti in tal modo perché queste fonti di divisione possono essere materializzati separatamente ed è assolutamente non è chiaro come dovrebbe funzionare anche allora. –
Sì, capisco le implicazioni di questo. Mi interessa un modello alternativo per ricostruire il codice in.Per esempio, potrei rendere i miei metodi restituire 'Sink's invece di accettare un' Source'. – Tvaroh