Modifica: questa risposta si applica solo alla versione precedente di spark e akka. La risposta di PH88 è il metodo corretto per le versioni recenti.
È possibile utilizzare un intermedio akka.actor.Actor
che alimenta una Sorgente (simile a this question). La soluzione di seguito non è "reattiva" perché l'attore sottostante dovrebbe mantenere un buffer di messaggi RDD che potrebbero essere eliminati se il client http a valle non sta consumando blocchi abbastanza rapidamente. Ma questo problema si verifica indipendentemente dai dettagli dell'implementazione poiché non è possibile connettere la "limitazione" della contropressione del flusso di akka al DStream per rallentare i dati. Ciò è dovuto al fatto che DStream non implementa org.reactivestreams.Publisher
.
La topologia di base è:
DStream --> Actor with buffer --> Source
per costruire questo toplogy è necessario creare un attore simile all'implementazione here:
//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props
creare un flusso Fonte di stringhe di byte (messaggi) sulla base di il JobManager. Inoltre, convertire il ByteString
-HttpEntity.ChunkStreamPart
, che è ciò che il HttpResponse richiede:
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString
type Message = ByteString
val messageToChunkPart =
Flow[Message].map(HttpEntity.ChunkStreamPart(_))
//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
link il DSTREAM Spark l'attore in modo che ogni RDD incomining viene convertita in un Iterable di ByteString e poi trasmesso al Attore:
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD
val dstream : DStream = ???
//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
def sendMessageToActor(message : Message) = actorRef ! message
//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
fornire la fonte alla HttpResponse:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Nota: ci dovrebbe essere molto poco tempo/cod e tra la riga dstream foreachRDD
e la risposta Http dal momento che il buffer interno dell'Actor inizierà immediatamente a riempirsi con il messaggio ByteString proveniente dal DStream dopo l'esecuzione della riga foreach
.