2015-10-28 7 views
5

Sto costruendo un'API REST che avvia alcuni calcoli in un cluster Spark e risponde con un flusso chunked dei risultati. Dato il flusso di Spark con i risultati dei calcoli, posso usareModo idiomatico per utilizzare Spark DStream come origine per un flusso Akka

dstream.foreachRDD() 

per inviare i dati di Spark. Sto inviando la risposta HTTP Chunked con Akka-http:

val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) => 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source)) 
} 

Per semplicità, sto cercando di ottenere il testo normale di lavoro prima, aggiungerà JSON smistamento tardi.

Ma qual è il modo idiomatico di utilizzare Spark DStream come sorgente per il flusso Akka? Ho pensato che dovrei essere in grado di farlo tramite un socket ma dal momento che il driver Spark e l'endpoint REST sono seduti sulla stessa JVM l'apertura di un socket solo per questo sembra un po 'eccessivo.

risposta

1

Modifica: questa risposta si applica solo alla versione precedente di spark e akka. La risposta di PH88 è il metodo corretto per le versioni recenti.

È possibile utilizzare un intermedio akka.actor.Actor che alimenta una Sorgente (simile a this question). La soluzione di seguito non è "reattiva" perché l'attore sottostante dovrebbe mantenere un buffer di messaggi RDD che potrebbero essere eliminati se il client http a valle non sta consumando blocchi abbastanza rapidamente. Ma questo problema si verifica indipendentemente dai dettagli dell'implementazione poiché non è possibile connettere la "limitazione" della contropressione del flusso di akka al DStream per rallentare i dati. Ciò è dovuto al fatto che DStream non implementa org.reactivestreams.Publisher.

La topologia di base è:

DStream --> Actor with buffer --> Source 

per costruire questo toplogy è necessario creare un attore simile all'implementazione here:

//JobManager definition is provided in the link 
val actorRef = actorSystem actorOf JobManager.props 

creare un flusso Fonte di stringhe di byte (messaggi) sulla base di il JobManager. Inoltre, convertire il ByteString-HttpEntity.ChunkStreamPart, che è ciò che il HttpResponse richiede:

import akka.stream.actor.ActorPublisher 
import akka.stream.scaladsl.Source 
import akka.http.scaladsl.model.HttpEntity 
import akka.util.ByteString 

type Message = ByteString 

val messageToChunkPart = 
    Flow[Message].map(HttpEntity.ChunkStreamPart(_)) 

//Actor with buffer --> Source 
val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
    Source(ActorPublisher[Message](actorRef)) via messageToChunkPart 

link il DSTREAM Spark l'attore in modo che ogni RDD incomining viene convertita in un Iterable di ByteString e poi trasmesso al Attore:

import org.apache.spark.streaming.dstream.Dstream 
import org.apache.spark.rdd.RDD 

val dstream : DStream = ??? 

//This function converts your RDDs to messages being sent 
//via the http response 
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ??? 

def sendMessageToActor(message : Message) = actorRef ! message 

//DStream --> Actor with buffer 
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor} 

fornire la fonte alla HttpResponse:

val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) => 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source)) 
} 

Nota: ci dovrebbe essere molto poco tempo/cod e tra la riga dstream foreachRDD e la risposta Http dal momento che il buffer interno dell'Actor inizierà immediatamente a riempirsi con il messaggio ByteString proveniente dal DStream dopo l'esecuzione della riga foreach.

7

Non sono sicuro della versione di api al momento della domanda. Ma ora, con akka-stream 2.0.3, credo che puoi farlo come:

val source = Source 
    .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead) 
    .mapMaterializedValue[Unit] { actorRef => 
    dstream.foreach(actorRef ! _) 
    }