2016-02-17 19 views
14

Sto utilizzando akka stream graphDSL per creare un grafico eseguibile. Non ci sono errori in fase di compilazione dell'ingresso/uscita dei componenti del flusso. Il runtime genera il seguente errore:Errore nel creare il grafico: requisito non riuscito: gli ingressi [] e le uscite [] devono corrispondere agli ingressi [in] e alle uscite [out]

Qualche idea che cosa dovrei verificare per farlo funzionare?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out] 
at scala.Predef$.require(Predef.scala:219) 
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168) 
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390) 
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18) 
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813) 
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109) 
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65) 
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39) 
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13) 

La struttura grafico:

source ~> flowRate ~> render ~> platformPartition.in 
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0) 
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1) 
merger.out ~> evtCreator ~> Sink.ignore 
+0

Puoi pubblicare il tuo progetto grafico? – manub

+1

Ho aggiornato la domanda con la struttura del grafico. I parametri del tipo source/flow/sink corrispondono tutti in ingresso/uscita. – phantomastray

+0

Anche i tipi di 'render',' platformPartition', 'merger' e' evtCreator' potrebbero essere utili. – manub

risposta

14

Hai un ingresso inutilizzato o uscita (uno dei flussi o qualcosa non è collegata su tutti i lati). Ecco alcuni esempi:

funziona:

val workingFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val intFlow = b.add(Flow[Int]) 
    FlowShape(intFlow.in, intFlow.out) 
    }) 

Il seguente codice produce un errore simile al tuo, perché ha un intero flusso inutilizzato:

val buggyFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val intFlow = b.add(Flow[Int]) 
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow is unused 
    FlowShape(intFlow.in, intFlow.out) 
    }) 

Un esempio leggermente più complesso: Qui non c'è un intero flusso inutilizzato, c'è solo una presa inutilizzata. Produce lo stesso errore:

val buggyFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[Int](2)) 
    val intFlow = b.add(Flow[Int]) 
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow's outlet isn't used 

    broadcast ~> intFlow 
    broadcast ~> unusedFlow 

    FlowShape(broadcast.in, intFlow.out) 
    }) 
+0

sì, l'abbiamo capito da molto tempo. Il messaggio di errore non è intuitivo. Avrei dovuto aggiornare la domanda :) Grazie, anche se per averlo esaminato. – phantomastray