SUGGERIMENTO: Questa risposta si basa su akka-stream-experimental
versione 2.0-M2
. L'API potrebbe essere leggermente diversa in altre versioni.
Un modo semplice per chiudere la connessione è quello di utilizzare un PushStage
: Ogni elemento
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
che si riceve sul lato client o sul lato server (e, in generale, ogni elemento che passa attraverso a Flow
) passa attraverso tale componente Stage
. In Akka, l'astrazione completa si chiama GraphStage
, ulteriori informazioni possono essere trovate nel official documentation.
Con un PushStage
possiamo osservare gli elementi in entrata concreti per il loro valore e quindi trasformare il contesto di conseguenza. Nell'esempio sopra, una volta ricevuto il messaggio goodbye
, finiamo il contesto altrimenti inoltriamo semplicemente il valore tramite il metodo push
.
Ora, possiamo collegare il componente closeClient
ad un flusso arbitrario attraverso il metodo transform
:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
Il flusso sopra riceve un ByteString
e restituisce una ByteString
, che significa che può essere collegato al connection
attraverso il join
metodo. All'interno del flusso, prima convertiamo i byte in una stringa prima di inviarli a closeClient
. Se PushStage
non termina il flusso, l'elemento viene inoltrato nello stream, dove viene rilasciato e sostituito da un input da stdin, che viene quindi inviato nuovamente sul filo. Nel caso in cui lo stream sia terminato, tutte le ulteriori fasi di elaborazione del flusso dopo il componente stage verranno eliminate: lo stream è ora chiuso.
Grazie mille!Sembra qualcosa che stavo cercando. – Tvaroh
Sfortunatamente questo non chiude la connessione websocket (su akka-http) quando viene applicata al flusso "in". Forse ha anche bisogno che la fonte 'out' sia 'finish'ed. – Tvaroh
Sì, akka-http chiude la connessione quando sono entrambi "finiti": 'Flow' - con' finish'ing come suggerito e 'Source' (dell'output) - fermando il suo attore sottostante. – Tvaroh