2016-04-04 31 views
7

Negli esempi Slick's documentation per l'utilizzo dei flussi reattivi vengono presentati solo per la lettura dei dati come mezzo di un DatabasePublisher. Ma cosa succede quando si desidera utilizzare il database come sink e backpreasure in base alla frequenza di inserimento?Come vengono utilizzati i flussi reattivi in ​​Slick per l'inserimento dei dati

Ho cercato l'equivalente DatabaseSubscriber ma non esiste. Quindi la domanda è, se ho una Fonte, dire:

val source = Source(0 to 100)

come posso crete un lavandino con chiazza di petrolio che scrive quei valori in una tabella con schema:

create table NumberTable (value INT)

risposta

7

Inserti seriali

Il modo più semplice sarebbe quello di fare inserts all'interno di un Sink.foreach.

Supponendo che hai utilizzato il schema code generation e ulteriormente supponendo che il tavolo si chiama "NumberTable"

//Tables file was auto-generated by the schema code generation 
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig" 

possiamo scrivere una funzione che fa l'inserimento

def insertIntoDb(num : Int) = 
    numberTableDB run (Numbertable += NumbertableRow(num)) 

e che la funzione può essere collocato nel lavandino

val insertSink = Sink[Int] foreach insertIntoDb 

Source(0 to 100) runWith insertSink 

Ba tched Inserti

Si potrebbe estendere ulteriormente la metodologia Sink da inserti dosaggio N alla volta:

def batchInsertIntoDb(nums : Seq[Int]) = 
    numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply)) 

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb 

Questo batch Sink può essere alimentato da un Flow che fa il raggruppamento batch:

val batchSize = 10 

Source(0 to 100).via(Flow[Int].grouped(batchSize)) 
       .runWith(batchInsertSink) 
2

Sebbene sia possibile utilizzare uno Sink.foreach per ottenere questo risultato (come menzionato da Ramon), è più sicuro e probabilmente più veloce (eseguendo gli inserimenti in parallelo) per utilizzare mapAsyncFlow. Il problema che dovrai affrontare usando Sink.foreach è che non ha un valore di ritorno. Inserimento in un database tramite slicks Il metodo db.run restituisce un valore Future che quindi scappa dai vapori restituiti Future[Done] che termina non appena Sink.foreach termina.

implicit val system = ActorSystem("system") 
implicit val materializer = ActorMaterializer() 

class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") { 
    def value = column[Int]("value") 
    def * = value 
} 

val numbers = TableQuery[Numbers] 

val db = Database.forConfig("postgres") 
Await.result(db.run(numbers.schema.create), Duration.Inf) 

val streamFuture: Future[Done] = Source(0 to 100) 
    .runWith(Sink.foreach[Int] { (i: Int) => 
    db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done")) 
    }) 
Await.result(streamFuture, Duration.Inf) 
println("stream 1 done") 

//// sample 1 output: //// 
// stream 1 insert 1 done 
// ... 
// stream 1 insert 99 done 
// stream 1 done <-- stream Future[Done] returned before inserts finished 
// stream 1 insert 100 done 

D'altra parte il def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])Flow consente di eseguire gli inserti in parallelo tramite il paramerter parallelismo e accetta una funzione dal valore a monte ad un futuro di qualche tipo. Questo corrisponde alla nostra funzione i => db.run(numbers += i). Il bello di questo Flow è che alimenta quindi il risultato di questi Futures downstream.

val streamFuture2: Future[Done] = Source(0 to 100) 
    .mapAsync(1) { (i: Int) => 
    db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r } 
    } 
    .runWith(Sink.ignore) 
Await.result(streamFuture2, Duration.Inf) 
println("stream 2 done") 

//// sample 2 output: //// 
// stream 2 insert 1 done 
// ... 
// stream 2 insert 100 done 
// stream 1 done <-- stream Future[Done] returned after inserts finished 

Per dimostrare il punto si può anche restituire un risultato reale dal flusso piuttosto che un Future[Done] (Con Fatto Unità che rappresenta).Questo stream aggiungerà anche un valore di parallelismo più elevato e il batching per prestazioni extra. *

val streamFuture3: Future[Int] = Source(0 to 100) 
    .via(Flow[Int].grouped(10)) // Batch in size 10 
    .mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count 
    .runWith(Sink.fold(0)(_+_)) // count all inserts and return total 
val rowsInserted = Await.result(streamFuture3, Duration.Inf) 
println(s"stream 3 done, inserted $rowsInserted rows") 

// sample 3 output: 
// stream 3 done, inserted 101 rows 
  • Nota: Probabilmente non si vedrà una migliore performance per una tale piccolo insieme di dati, ma quando avevo a che fare con un 1.7M inserire ero in grado di ottenere le migliori prestazioni sulla mia macchina con una dimensione batch di 1000 e valore di parallelismo di 8, localmente con postgresql. Questo era circa il doppio di quello che non funzionava in parallelo. Come sempre quando si ha a che fare con le prestazioni, i risultati possono variare e si dovrebbe misurare da soli.