Sebbene sia possibile utilizzare uno Sink.foreach
per ottenere questo risultato (come menzionato da Ramon), è più sicuro e probabilmente più veloce (eseguendo gli inserimenti in parallelo) per utilizzare mapAsync
Flow
. Il problema che dovrai affrontare usando Sink.foreach
è che non ha un valore di ritorno. Inserimento in un database tramite slicks Il metodo db.run
restituisce un valore Future
che quindi scappa dai vapori restituiti Future[Done]
che termina non appena Sink.foreach
termina.
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
D'altra parte il def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
consente di eseguire gli inserti in parallelo tramite il paramerter parallelismo e accetta una funzione dal valore a monte ad un futuro di qualche tipo. Questo corrisponde alla nostra funzione i => db.run(numbers += i)
. Il bello di questo Flow
è che alimenta quindi il risultato di questi Futures
downstream.
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
Per dimostrare il punto si può anche restituire un risultato reale dal flusso piuttosto che un Future[Done]
(Con Fatto Unità che rappresenta).Questo stream aggiungerà anche un valore di parallelismo più elevato e il batching per prestazioni extra. *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- Nota: Probabilmente non si vedrà una migliore performance per una tale piccolo insieme di dati, ma quando avevo a che fare con un 1.7M inserire ero in grado di ottenere le migliori prestazioni sulla mia macchina con una dimensione batch di 1000 e valore di parallelismo di 8, localmente con postgresql. Questo era circa il doppio di quello che non funzionava in parallelo. Come sempre quando si ha a che fare con le prestazioni, i risultati possono variare e si dovrebbe misurare da soli.
fonte
2017-08-01 14:00:35