2015-12-15 11 views
6

Nel mio scenario, un client invia un messaggio di websocket "addio" e ho bisogno di chiudere la connessione precedentemente stabilita sul lato server.Chiudere la connessione webkite-ka http dal server

Da Akka-http docs:

connessioni di chiusura è possibile cancellando Flow connessione in ingresso e la logica di server (es collegando sua valle ad un Sink.cancelled e il suo monte di Source.empty) . È anche possibile spegnere il socket del server annullando le connessioni di origine IncomingConnection.

ma non è chiaro per me come fare tenendo conto che Sink e Source sono impostati una volta quando la negoziazione di un nuovo collegamento:

(get & path("ws")) { 
    optionalHeaderValueByType[UpgradeToWebsocket]() { 
    case Some(upgrade) ⇒ 
     val connectionId = UUID() 
     complete(upgrade.handleMessagesWithSinkSource(sink, source)) 
    case None ⇒ 
     reject(ExpectedWebsocketRequestRejection) 
    } 
} 

risposta

3

SUGGERIMENTO: Questa risposta si basa su akka-stream-experimental versione 2.0-M2. L'API potrebbe essere leggermente diversa in altre versioni.


Un modo semplice per chiudere la connessione è quello di utilizzare un PushStage: Ogni elemento

import akka.stream.stage._ 

val closeClient = new PushStage[String, String] { 
    override def onPush(elem: String, ctx: Context[String]) = elem match { 
    case "goodbye" ⇒ 
     // println("Connection closed") 
     ctx.finish() 
    case msg ⇒ 
     ctx.push(msg) 
    } 
} 

che si riceve sul lato client o sul lato server (e, in generale, ogni elemento che passa attraverso a Flow) passa attraverso tale componente Stage. In Akka, l'astrazione completa si chiama GraphStage, ulteriori informazioni possono essere trovate nel official documentation.

Con un PushStage possiamo osservare gli elementi in entrata concreti per il loro valore e quindi trasformare il contesto di conseguenza. Nell'esempio sopra, una volta ricevuto il messaggio goodbye, finiamo il contesto altrimenti inoltriamo semplicemente il valore tramite il metodo push.

Ora, possiamo collegare il componente closeClient ad un flusso arbitrario attraverso il metodo transform:

val connection = Tcp().outgoingConnection(address, port) 

val flow = Flow[ByteString] 
    .via(Framing.delimiter(
     ByteString("\n"), 
     maximumFrameLength = 256, 
     allowTruncation = true)) 
    .map(_.utf8String) 
    .transform(() ⇒ closeClient) 
    .map(_ ⇒ StdIn.readLine("> ")) 
    .map(_ + "\n") 
    .map(ByteString(_)) 

connection.join(flow).run() 

Il flusso sopra riceve un ByteString e restituisce una ByteString, che significa che può essere collegato al connection attraverso il join metodo. All'interno del flusso, prima convertiamo i byte in una stringa prima di inviarli a closeClient. Se PushStage non termina il flusso, l'elemento viene inoltrato nello stream, dove viene rilasciato e sostituito da un input da stdin, che viene quindi inviato nuovamente sul filo. Nel caso in cui lo stream sia terminato, tutte le ulteriori fasi di elaborazione del flusso dopo il componente stage verranno eliminate: lo stream è ora chiuso.

+0

Grazie mille!Sembra qualcosa che stavo cercando. – Tvaroh

+0

Sfortunatamente questo non chiude la connessione websocket (su akka-http) quando viene applicata al flusso "in". Forse ha anche bisogno che la fonte 'out' sia 'finish'ed. – Tvaroh

+0

Sì, akka-http chiude la connessione quando sono entrambi "finiti": 'Flow' - con' finish'ing come suggerito e 'Source' (dell'output) - fermando il suo attore sottostante. – Tvaroh

2

Ciò può essere ottenuto dalla seguente nella (2.4.14) versione corrente di Akka-stream

package com.trackabus.misc 

import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 

// terminates the flow based on a predicate for a message of type T 
// if forwardTerminatingMessage is set the message is passed along the flow 
// before termination 
// if terminate is true the stage is failed, if it is false the stage is completed 
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true) 
    extends GraphStage[FlowShape[T, T]] 
{ 
    val in = Inlet[T]("TerminateFlowStage.in") 
    val out = Outlet[T]("TerminateFlowStage.out") 
    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     setHandlers(in, out, new InHandler with OutHandler { 
     override def onPull(): Unit = { pull(in) } 

     override def onPush(): Unit = { 
      val chunk = grab(in) 

      if (pred(chunk)) { 
      if (forwardTerminatingMessage) 
       push(out, chunk) 
      if (terminate) 
       failStage(new RuntimeException("Flow terminated by TerminateFlowStage")) 
      else 
       completeStage() 
      } 
      else 
      push(out, chunk) 
     } 
     }) 
    } 
} 

di usarlo definire la fase

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe]) 

e poi includerlo come parte del flusso

.via(termOnKillMe)