2015-05-06 6 views
5

Sto cercando di costruire e gestire un flusso di flusso Akka (in Java DSL) con 2 attori come fonti, poi un bivio unione e poi 1 lavello:Come collegare più attori come sorgenti ad un flusso Akka?

Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.backpressure()); 
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.backpressure()); 
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println)); 

    RunnableFlow<BoxedUnit> closed = FlowGraph.factory().closed(sink, (b, out) -> { 
     UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2)); 
     b.from(src1).via(merge).to(out); 
     b.from(src2).to(merge); 
    }); 

    closed.run(mat); 

mia domanda è come posso ottenere ActorRef riferimenti a gli attori di origine per inviare loro messaggi? Nel caso di 1 attore, non utilizzerei il generatore di grafici e quindi il metodo .run() o runWith() restituirebbe l'oggetto ActorRef. Ma cosa fare in caso di molti attori di origine? È persino possibile materializzare un tale flusso?

+0

È necessario passare gli elementi per i quali è necessario l'accesso il valore materializzato a 'closed' e quindi fornire una funzione che combina i valori materializzati. Qualcosa del genere: 'closed (src1, src2, (actorRef1, actorRef2) -> SomethingContainingBothActorRefs, (b, s1, s2) -> ...)' – jrudolph

+0

Grazie, jrudolph. – dev4ever44

risposta

6

Rispondere alla mia domanda nel caso qualcuno ne abbia bisogno.

Utilizzando il consiglio di jrudolph, sono stato in grado di utilizzare attori come questo (in codice vero che ho fatto qualcosa di più che un elenco di 2 ActorRefs):

Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.fail()); 
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.fail()); 
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println)); 

    RunnableFlow<List<ActorRef>> closed = FlowGraph.factory().closed(src1, src2, (a1, a2) -> Arrays.asList(a1, a2), (b, s1, s2) -> { 
     UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2)); 
     b.from(s1).via(merge).to(sink); 
     b.from(s2).to(merge); 
    }); 

    List<ActorRef> stream = closed.run(mat); 
    ActorRef a1 = stream.get(0); 
    ActorRef a2 = stream.get(1);