Se la condizione di arresto è "sulla parte esterna del torrente"
C'è un avanzato building-block chiamato KillSwitch
che si potrebbe usare per fare questo: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html Il flusso sarebbe venga chiuso una volta che il kill switch è notificato.
Ha metodi come abort(reason)
/shutdown
ecc, vedi qui per la sua API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html
Documentazione di riferimento è qui: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala
esempio d'uso potrebbe essere:
val countingSrc = Source(Stream.from(1)).delay(1.second,
DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both)
.run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
Se la condizione di arresto è all'interno dello stream
È possibile utilizzare lo stato takeWhile
per esprimere qualsiasi condizione, anche se a volte lo take
o lo limit
può essere anche sufficiente "prendere 10 lenze".
Se la logica è molto avanzata, si potrebbe costruire una prova speciale che gestisce la logica speciale utilizzando statefulMapConcat
che permette di esprimere letteralmente qualsiasi cosa - così si potrebbe completare il flusso ogni volta che si vuole "dal di dentro".
fonte
2016-07-12 10:20:34
Se la condizione è basata sul contenuto del flusso, 'Source.takewhile' (http://doc.akka.io/api/akka/2.4.8/index.html#[email protected] (p: Out => Boolean): FlowOps.this.Repr [Out]) dovrebbe funzionare. – devkat