2016-01-20 8 views
6

Negli ultimi giorni ho cercato di capire il modo migliore per scaricare una risorsa HTTP in un file utilizzando Akka Streams e HTTP.Come scaricare una risorsa HTTP in un file con Akka Streams e HTTP?

Inizialmente ho iniziato con la Future-Based Variant e che sembrava qualcosa di simile:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

Quella era una sorta di bene, ma una volta che ho imparato di più su puri Akka Streams ho voluto provare e utilizzare il Flow-Based Variant per creare un flusso a partire da un Source[HttpRequest]. All'inizio questo mi ha completamente bloccato finché non sono incappato nella trasformazione del flusso flatMapConcat. Questo ha finito per un po 'più dettagliato:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match { 
    case (responseTry, context) => (responseTry.get, context) 
} 

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match { 
    case (response, _) => response.entity.dataBytes 
} 

def downloadViaFlow(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSource). 
    runWith(FileIO.toFile(file)) 
} 

Poi ho voluto ottenere un po' complicato e utilizzare l'intestazione Content-Disposition.

Tornando alla variante Future-Based:

def destinationFile(downloadDir: File, response: HttpResponse): File = { 
    val fileName = response.header[ContentDisposition].get.value 
    val file = new File(downloadDir, fileName) 
    file.createNewFile() 
    file 
} 

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val file = destinationFile(downloadDir, response) 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

Ma ora non ho idea di come fare questo con la variante Future-Based. Questo è quanto ho ottenuto:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match { 
    case (response, _) => 
    val source = responseToByteSource(in) 
    val file = destinationFile(downloadDir, response) 
    source.map((_, file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    val sourceWithDest: Source[(ByteString, File), Unit] = source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir)) 
    sourceWithDest.runWith(???) 
} 

Così ora ho un Source che emetterà uno o più (ByteString, File) elementi per ogni File (dico ogni File poiché non v'è alcuna ragione per l'originale Source deve essere un singolo HttpRequest).

Esiste comunque la possibilità di prenderli e indirizzarli a un Sink dinamico?

Sto pensando qualcosa del tipo flatMapConcat, come ad esempio:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ??? 

così che ho potuto completare downloadViaFlow2 con:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = { 
    val sink = FileIO.toFile(destination, true) 
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right) 
} 
sourceWithDest.runWithMap { 
    case (_, file) => destToSink(file) 
} 

risposta

5

La soluzione non richiede una flatMapConcat. Se non avete bisogno di alcun valore di ritorno dalla scrittura di file è possibile utilizzare Sink.foreach:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = { 
    val file = destinationFile(downloadDir, httpResponse) 
    httpResponse.entity.dataBytes.runWith(FileIO.toFile(file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = { 
    val request = HttpRequest(uri=uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 

    source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .runWith(Sink.foreach(writeFile(downloadDir))) 
} 

Si noti che il Sink.foreach crea Futures dalla funzione writeFile. Quindi non c'è molta contropressione. Il writefile potrebbe essere rallentato dal disco rigido, ma lo stream continuerebbe a generare Futures. Per controllare questo è possibile utilizzare Flow.mapAsyncUnordered (o Flow.mapAsync):

val parallelism = 10 

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.ignore) 

Se si desidera accumulare i valori lungo per un conteggio totale è necessario combinare con un Sink.fold:

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.fold(0L)(_ + _)) 

La piega manterrà una somma parziale ed emette il valore finale quando la fonte delle richieste si è esaurita.

+0

Hmm speravo che ci fosse un modo migliore di questo. Non sono neanche sicuro che funzionerà correttamente. 'writeFile' tornerà non appena il flusso FileIO sarà stato materializzato. Se la risposta è suddivisa in blocchi, deve essere scritta nel file in ordine.Problema simile con l'uso di 'mapAsync'. Dovrebbe essere impostato anche il parametro 'append'. Inoltre sembra che qualsiasi errore nella scrittura del file non provochi un errore nel flusso esterno. – Steiny

+1

@Steiny Rompendo la mia risposta ai tuoi commenti multipli: (a) correggi, scrivi i ritorni di file con un Futuro immediatamente ma mapAsync gestisce questo (b) non c'è soluzione che possa correggere il chunkedsource né questa parte della domanda/dei requisiti originali (c) append è necessario solo se scrivere sullo stesso file (d) forzando lo stream esterno a fallire su un errore di scrittura di un file non faceva parte della domanda originale. Hai chiesto "Esiste comunque la possibilità di prenderli e indirizzarli verso un dissipatore dinamico?", La mia risposta risponde ** a quella domanda. Ho scritto la mia risposta nel contesto del tuo codice di esempio ... –