2013-08-04 9 views
12

A volte mi trovo in una situazione in cui ho un Stream[X] e un function X => Future Y, che mi piacerebbe combinare a un Future[Stream[Y]], e non riesco a trovare un modo per farlo. Ad esempio, homappare un flusso con una funzione che restituisce un futuro

val x = (1 until 10).toStream 
def toFutureString(value : Integer) = Future(value toString) 

val result : Future[Stream[String]] = ??? 

ho cercato

val result = Future.Traverse(x, toFutureString) 

che dà il risultato corretto, ma sembra a consumare l'intero flusso prima di restituire il futuro, che più o meno sconfitte del campione da labo

ho provato

val result = x.flatMap(toFutureString) 

ma che non viene compilato type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString) 

restituisce il po 'strano ed inutile Stream[Future[String]]

Cosa devo fare qui per ottenere le cose fisso?

Edit: Io non sono bloccato su un Stream, sarei altrettanto felice con la stessa operazione su un Iterator, fintanto che non bloccherà sulla valutazione di tutti gli elementi prima di iniziare a elaborare la testa

Edit2: Non sono sicuro al 100% che il costrutto Future.Traverse debba attraversare l'intero stream prima di restituire un Future [Stream], ma penso che lo faccia. In caso contrario, questa è una buona risposta in sé.

Edit3: Inoltre non ho bisogno che il risultato sia in ordine, sto bene con lo stream o l'iteratore restituito essendo qualsiasi ordine.

+0

Nota che ho archiviato [un problema] (https://issues.scala-lang.org/browse/SI-7718) per dare seguito alla mia risposta di seguito. –

+0

ah, ottimo @TravisBrown. Volevo farlo da solo, ma non sono riuscito a trovare un modo per accedere a Jira – Martijn

+0

Un po 'poco chiaro: vuoi evitare di applicare "toFutureString" a tutti gli elementi della raccolta prima ...? Sembra che non ci dovrebbe essere molto overhead per creare semplicemente un futuro. Se gli articoli rimanenti nella "lista" sono thunk, cosa ne scatenerebbe la valutazione? Completamento del futuro precedente nell'elenco? Tutte le operazioni di sequenza/attraversamento che ho trovato in Scala sembravano essere rigide sugli elementi delle singole liste. – pdxleif

risposta

9

Sei sulla strada giusta con traverse, ma sfortunatamente sembra che la definizione della libreria standard sia un po 'rotta in questo caso, non dovrebbe aver bisogno di consumare lo stream prima di tornare.

Future.traverse è una versione specifica di una funzione molto più generale che funziona su qualsiasi funtore applicativa avvolto in un tipo "percorribile" (vedi thesepapers o la mia risposta here per ulteriori informazioni, per esempio).

La libreria Scalaz fornisce questa versione più generale, e funziona come previsto in questo caso (notare che io sto ottenendo l'istanza funtore applicativo per Future da scalaz-contrib, ma non è ancora nelle versioni stabili di Scalaz, che sono ancora cross-costruito contro Scala 2.9.2, che non ha questo Future):

import scala.concurrent._ 
import scalaz._, Scalaz._, scalaz.contrib.std._ 

import ExecutionContext.Implicits.global 

def toFutureString(value: Int) = Future(value.toString) 

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString 

Ciò restituisce immediatamente un flusso infinito, quindi sappiamo per certo che non è essere consumano prima.


Come nota in calce: Se si guarda alla the source per Future.traverse vedrai che è implementato in termini di foldLeft, che è conveniente, ma non è necessario o opportuno, nel caso di corsi d'acqua.

+0

FYI, Scala 2.9.3 contiene scala.concurrent –

+0

@ViktorKlang: Giusto, e queste istanze [stanno arrivando al nucleo di Scalaz] (https://github.com/scalaz/scalaz/wiki/Roadmap#sip-14) presto, ma per quanto ne so non c'è ancora una cronologia concreta. –

+0

Fantastico, mi sembra una buona idea –

2

Dimenticando Stream:

import scala.concurrent.Future 
import ExecutionContext.Implicits.global 

val x = 1 to 10 toList 
def toFutureString(value : Int) = Future { 
    println("starting " + value) 
    Thread.sleep(1000) 
    println("completed " + value) 
    value.toString 
} 

rendimenti (sulla mia macchina a 8 core):

scala> Future.traverse(x)(toFutureString) 
starting 1 
starting 2 
starting 3 
starting 4 
starting 5 
starting 6 
starting 7 
starting 8 
res12: scala.concurrent.Future[List[String]] = [email protected] 

scala> completed 1 
completed 2 
starting 9 
starting 10 
completed 3 
completed 4 
completed 5 
completed 6 
completed 7 
completed 8 
completed 9 
completed 10 

Così 8 di loro si è iniziato subito (uno per ogni core, anche se questo è configurabile tramite il threadpool executor), e poi mentre quelli completi ne vengono avviati altri. The Future [List [String]] ritorna immediatamente, quindi dopo una pausa inizia a stampare quei messaggi "completati x".

Un utilizzo di esempio di questo potrebbe essere quando si ha una lista [Url's], e una funzione di tipo Url => Future [HttpResponseBody]. Puoi chiamare Future.traverse in quella lista con quella funzione e dare il via a quelle richieste http in parallelo, recuperando un singolo futuro che è una lista dei risultati.

Era qualcosa di simile a quello che stavi cercando?

+0

Immagino che "non volendo che gli elementi del flusso siano valutati con entusiasmo" sembra essere in disaccordo con il parallelismo, quando quegli elementi sono l'input per il compito che dà il via ai Futures. Come vuoi che il flusso sia consumato/valutato? – pdxleif