Sto usando Akka 2.4.4 e sto provando a passare da Apache HttpAsyncClient (senza successo).Akka Flow si blocca quando si effettuano richieste HTTP tramite il pool di connessioni
Di seguito è riportata una versione semplificata del codice che utilizzo nel mio progetto.
Il problema è che si blocca se invio più di 1-3 richieste al flusso. Finora dopo 6 ore di debug non sono riuscito a localizzare il problema. Non vedo eccezioni, log degli errori, eventi in Decider
. NIENTE :)
Ho provato a ridurre connection-timeout
impostando su 1s pensando che forse è in attesa di risposta dal server ma non è stato d'aiuto.
Cosa sto sbagliando?
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
val config = ConfigFactory.load()
private val baseDomain = "www.google.com"
private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))
private val decider: Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =
Source.fromIterator(() => items.toIterator)
.via(poolClientFlow)
.log("Logger")(log = myAdapter)
.recoverWith {
case ex =>
println(ex)
null
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
.map { v =>
println(s"Got ${v.length} responses in Flow")
v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
}
def main(args: Array[String]) {
val headers = imSeq(Referer("https://www.google.com/"))
val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
val requests = List.fill(10)(reqPair)
val qwe = sendMultipleRequests(requests).map { case responses =>
println(s"Got ${responses.length} responses")
system.terminate()
}
Await.ready(system.whenTerminated, Duration.Inf)
}
}
Anche cosa succede con proxy support? Non sembra funzionare anche per me.
In realtà l'ho provato e non è stato d'aiuto. Forse l'ho messo nel posto sbagliato. Potresti per favore dare un'occhiata al progetto autosufficiente che ho creato? https://github.com/cppexpert/akka_flow_freezing – expert
Ya, questo è il problema. Stai cercando di sequenziare i risultati di 10 futures e poi di leggere il corpo. Il problema è che per chiamare la mappa su 'sequenza', tutti e 10 i futures devono essere stati completati e solo i primi 4 saranno e quei primi 4 stanno bloccando l'altro 6. Spingere ulteriormente il codice di lettura della risposta e ciò risolverà il tuo problema – cmbaxter
Puoi dimostrare come spostare il codice di lettura della risposta? Ho provato poche cose ed è ancora in attesa che i futures siano finiti alla rinfusa. Anche nel mio esempio dovremmo chiamare parseResponse in modo asincrono prima di ogni risposta PRIMA che sia passato a Future.sequence? Forse potrei passare a 'toMat' di coda ma poi non sarò in grado di usarlo per analizzare risposte diverse. Imballare lamba con '(Any, Promise [..])' per ogni richiesta mi sembra troppo brutto. – expert