Sto cercando di implementare il lato di lettura nella mia architettura ES-CQRS. Diciamo che ho un attore persistente come questo:Stream eventi query persistenza Akka e CQRS
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
Per quanto ho capito, ogni volta evento viene persisteva, possiamo pubblicarlo tramite Akka Persistence Query
. Ora, non sono sicuro di quale sarebbe il modo corretto di iscriversi su questi eventi, così posso mantenerlo nel mio database di lettura? Una delle idee è di inviare inizialmente un messaggio UsersStream
dal mio lettore secondario di lettura a UserWrite
attore e gli eventi "sink" in quell'attore letto.
EDIT
Seguendo il suggerimento di @cmbaxter, ho implementato Leggi Side in questo modo:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
Ci sono alcuni problemi come: flusso di eventi sembra essere lento. Cioè L'attore UserRead
può rispondere con un gruppo di utenti prima che l'utente appena aggiunto venga salvato.
EDIT 2
ho aumentato intervallo di aggiornamento di cassandra ufficiale domanda che più meno risolto problema con flusso evento lento. Sembra che il diario degli eventi di Cassandra sia per impostazione predefinita, essendo interrogato ogni 3 secondi. Nel mio application.conf
ho aggiunto:
cassandra-query-journal {
refresh-interval = 20ms
}
EDIT 3
In realtà, non diminuisce l'intervallo di aggiornamento. Ciò aumenterà l'utilizzo della memoria, ma non è pericoloso, né un punto. In generale, il concetto di CQRS è che i lati di scrittura e lettura sono asincroni. Pertanto, dopo aver scritto i dati non saranno mai disponibili immediatamente per la lettura. Trattare con l'interfaccia utente? Io apro il flusso e spingo i dati tramite eventi inviati dal server dopo che il lato di lettura li ha riconosciuti.
vorrei solo spostare il codice di base di lettura rivista nel vostro attore risalto laterale lettura invece di inviare un messaggio con un 'fonte fissa su di esso. Quindi elaborare quel flusso in quell'attore di proiezione lato lettura e proiettare tali informazioni in Elasticsearch. – cmbaxter
@cmbaxter L'ho fatto. Sembra essere un'ottima idea. Ho aggiornato la mia domanda e continuo ad accettare suggerimenti poiché ho ancora dei dubbi. –