2015-11-20 9 views
9

Sto usando i flussi akka e ho un segmento del mio grafico che ho bisogno di saltare condizionatamente perché il flusso non può gestire certi valori. Nello specifico, ho un flusso che accetta una stringa e fa richieste http, ma il server non può gestire il caso quando la stringa è vuota. Ma ho solo bisogno di restituire una stringa vuota. C'è un modo per farlo senza dover passare attraverso la richiesta http sapendo che fallirà? Io fondamentalmente ho questo:Salta saltuariamente il flusso usando i flussi akka

val source = Source("1", "2", "", "3", "4") 
val httpRequest: Flow[String, HttpRequest, _] 
val httpResponse: Flow[HttpResponse, String, _] 
val flow = source.via(httpRequest).via(httpResponse) 

L'unica cosa che posso pensare di fare è catturare l'errore 400 nel mio flusso HttpResponse e restituendo un valore predefinito. Ma mi piacerebbe essere in grado di evitare il sovraccarico di colpire il server per una richiesta che so fallire in anticipo.

+0

L'esempio non viene compilato. L'output di httpRequest è di tipo HttpRequest e l'input di httpResponse è di tipo HttpResponse quindi non possono essere concatenati insieme a 'via'. –

risposta

8

Viktor Klang è conciso e elegant. Volevo solo dimostrare un'alternativa usando i grafici.

È possibile dividere la sorgente di stringhe in due flussi e filtrare un flusso per stringhe valide e l'altro flusso per stringhe non valide. Quindi unire i risultati ("cross the streams").

Basato sul documentation:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] => 
    import FlowGraph.Implicits._ 

    val source = Source(List("1", "2", "", "3", "4")) 
    val sink : Sink[String,_] = ??? 

    val bcast = builder.add(Broadcast[String](2)) 
    val merge = builder.add(Merge[String](2)) 

    val validReq = Flow[String].filter(_.size > 0) 
    val invalidReq = Flow[String].filter(_.size == 0) 

    val httpRequest: Flow[String, HttpRequest, _] = ??? 
    val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ??? 
    val httpResponse: Flow[HttpResponse, String, _] = ??? 
    val someHttpTransformation = httpRequest via makeHttpCall via httpResponse 

    source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink 
      bcast ~>  invalidReq     ~> merge 
    ClosedShape 
}) 

Nota: questa soluzione divide il flusso, quindi il dispersore può elaborare stringa valore risulta in un ordine diverso rispetto a quello previsto in base agli input.

+2

Sì! Questa è un'altra opzione quando sei d'accordo con il riordino delle operazioni (poiché il flusso http e il non-http funzionano in parallelo) –

+0

@ViktorKlang Notato, grazie. –

+0

Questo è perfetto. non pensavo ai filtri paralleli. Buon punto sull'ordinazione, ma il flusso di richieste di akka-http è comunque non ordinato (almeno il pool di https) – Falmarri

12

Potreste usare flatMapConcat:

(Attenzione: non è mai stato compilato, ma si otterrà l'essenza di esso) la soluzione di

val source = Source("1", "2", "", "3", "4") 
val httpRequest: Flow[String, HttpRequest, _] 
val httpResponse: Flow[HttpResponse, String, _] 
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] 
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse 
val emptyStringSource = Source.single("") 
val cleanerSource = source.flatMapConcat({ 
    case "" => emptyStringSource 
    case other => Source.single(other) via someHttpTransformation 
})