2016-05-05 32 views
7

Sto usando Akka 2.4.4 e sto provando a passare da Apache HttpAsyncClient (senza successo).Akka Flow si blocca quando si effettuano richieste HTTP tramite il pool di connessioni

Di seguito è riportata una versione semplificata del codice che utilizzo nel mio progetto.

Il problema è che si blocca se invio più di 1-3 richieste al flusso. Finora dopo 6 ore di debug non sono riuscito a localizzare il problema. Non vedo eccezioni, log degli errori, eventi in Decider. NIENTE :)

Ho provato a ridurre connection-timeout impostando su 1s pensando che forse è in attesa di risposta dal server ma non è stato d'aiuto.

Cosa sto sbagliando?

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.headers.Referer 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream.Supervision.Decider 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.{ActorAttributes, Supervision} 
import com.typesafe.config.ConfigFactory 

import scala.collection.immutable.{Seq => imSeq} 
import scala.concurrent.{Await, Future} 
import scala.concurrent.duration.Duration 
import scala.util.Try 

object Main { 

    implicit val system = ActorSystem("root") 
    implicit val executor = system.dispatcher 
    val config = ConfigFactory.load() 

    private val baseDomain = "www.google.com" 
    private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config)) 

    private val decider: Decider = { 
    case ex => 
     ex.printStackTrace() 
     Supervision.Stop 
    } 

    private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] = 

    Source.fromIterator(() => items.toIterator) 
     .via(poolClientFlow) 
     .log("Logger")(log = myAdapter) 
     .recoverWith { 
     case ex => 
      println(ex) 
      null 
     } 
     .withAttributes(ActorAttributes.supervisionStrategy(decider)) 
     .runWith(Sink.seq) 
     .map { v => 
     println(s"Got ${v.length} responses in Flow") 
     v.asInstanceOf[Seq[(Try[HttpResponse], T)]] 
     } 

    def main(args: Array[String]) { 

    val headers = imSeq(Referer("https://www.google.com/")) 
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID" 
    val requests = List.fill(10)(reqPair) 
    val qwe = sendMultipleRequests(requests).map { case responses => 
     println(s"Got ${responses.length} responses") 

     system.terminate() 
    } 

    Await.ready(system.whenTerminated, Duration.Inf) 
    } 
} 

Anche cosa succede con proxy support? Non sembra funzionare anche per me.

risposta

7

È necessario consumare completamente il corpo della risposta in modo che la connessione sia resa disponibile per le richieste successive. Se non si preoccupano l'entità di risposta a tutti, allora si può solo scolarla a un Sink.ignore, qualcosa di simile:

resp.entity.dataBytes.runWith(Sink.ignore) 

Entro la configurazione di default, quando si utilizza un pool di connessione host, le connessioni Max è impostato su 4. Ogni pool ha la propria coda in cui le richieste attendono fino a quando una delle connessioni aperte diventa disponibile. Se quella coda va oltre 32 (la configurazione di default, può essere cambiata, deve essere una potenza di 2) allora inizierà a vedere i fallimenti. Nel tuo caso, fai solo 10 richieste, quindi non raggiungi quel limite. Ma non consumando l'entità di risposta non si libera la connessione e tutto il resto si limita a fare le code dietro, in attesa che le connessioni si liberino.

+0

In realtà l'ho provato e non è stato d'aiuto. Forse l'ho messo nel posto sbagliato. Potresti per favore dare un'occhiata al progetto autosufficiente che ho creato? https://github.com/cppexpert/akka_flow_freezing – expert

+2

Ya, questo è il problema. Stai cercando di sequenziare i risultati di 10 futures e poi di leggere il corpo. Il problema è che per chiamare la mappa su 'sequenza', tutti e 10 i futures devono essere stati completati e solo i primi 4 saranno e quei primi 4 stanno bloccando l'altro 6. Spingere ulteriormente il codice di lettura della risposta e ciò risolverà il tuo problema – cmbaxter

+0

Puoi dimostrare come spostare il codice di lettura della risposta? Ho provato poche cose ed è ancora in attesa che i futures siano finiti alla rinfusa. Anche nel mio esempio dovremmo chiamare parseResponse in modo asincrono prima di ogni risposta PRIMA che sia passato a Future.sequence? Forse potrei passare a 'toMat' di coda ma poi non sarò in grado di usarlo per analizzare risposte diverse. Imballare lamba con '(Any, Promise [..])' per ogni richiesta mi sembra troppo brutto. – expert