2016-07-07 51 views
7

Sto cercando di implementare il lato di lettura nella mia architettura ES-CQRS. Diciamo che ho un attore persistente come questo:Stream eventi query persistenza Akka e CQRS

object UserWrite { 

    sealed trait UserEvent 
    sealed trait State 
    case object Uninitialized extends State 
    case class User(username: String, password: String) extends State 
    case class AddUser(user: User) 
    case class UserAdded(user: User) extends UserEvent 
    case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed]) 
    case class UsersStream(fromSeqNo: Long) 
    case object GetCurrentUser 

    def props = Props(new UserWrite) 
} 

class UserWrite extends PersistentActor { 

    import UserWrite._ 

    private var currentUser: State = Uninitialized 

    override def persistenceId: String = "user-write" 

    override def receiveRecover: Receive = { 
    case UserAdded(user) => currentUser = user 
    } 

    override def receiveCommand: Receive = { 
    case AddUser(user: User) => persist(UserAdded(user)) { 
     case UserAdded(`user`) => currentUser = user 
    } 
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) 
    case GetCurrentUser => sender() ! currentUser 
    } 

    def publishUserEvents(fromSeqNo: Long) = { 
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    val userEvents = readJournal 
     .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue) 
     .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event } 
    sender() ! UserEvents(userEvents) 
    } 
} 

Per quanto ho capito, ogni volta evento viene persisteva, possiamo pubblicarlo tramite Akka Persistence Query. Ora, non sono sicuro di quale sarebbe il modo corretto di iscriversi su questi eventi, così posso mantenerlo nel mio database di lettura? Una delle idee è di inviare inizialmente un messaggio UsersStream dal mio lettore secondario di lettura a UserWriteattore e gli eventi "sink" in quell'attore letto.

EDIT

Seguendo il suggerimento di @cmbaxter, ho implementato Leggi Side in questo modo:

object UserRead { 

    case object GetUsers 
    case class GetUserByUsername(username: String) 
    case class LastProcessedEventOffset(seqNo: Long) 
    case object StreamCompleted 

    def props = Props(new UserRead) 
} 

class UserRead extends PersistentActor { 
    import UserRead._ 

    var inMemoryUsers = Set.empty[User] 
    var offset  = 0L 

    override val persistenceId: String = "user-read" 

    override def receiveRecover: Receive = { 
    // Recovery from snapshot will always give us last sequence number 
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo 
    case RecoveryCompleted         => recoveryCompleted() 
    } 

    // After recovery is being completed, events will be projected to UserRead actor 
    def recoveryCompleted(): Unit = { 
    implicit val materializer = ActorMaterializer() 
    PersistenceQuery(context.system) 
     .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
     .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue) 
     .map { 
     case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event 
     } 
     .runWith(Sink.actorRef(self, StreamCompleted)) 
    } 

    override def receiveCommand: Receive = { 
    case GetUsers     => sender() ! inMemoryUsers 
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) 
    // Match projected event and update offset 
    case (seqNo: Long, UserAdded(user)) => 
     saveSnapshot(LastProcessedEventOffset(seqNo)) 
     inMemoryUsers += user 
    } 
} 

Ci sono alcuni problemi come: flusso di eventi sembra essere lento. Cioè L'attore UserRead può rispondere con un gruppo di utenti prima che l'utente appena aggiunto venga salvato.

EDIT 2

ho aumentato intervallo di aggiornamento di cassandra ufficiale domanda che più meno risolto problema con flusso evento lento. Sembra che il diario degli eventi di Cassandra sia per impostazione predefinita, essendo interrogato ogni 3 secondi. Nel mio application.conf ho aggiunto:

cassandra-query-journal { 
    refresh-interval = 20ms 
} 

EDIT 3

In realtà, non diminuisce l'intervallo di aggiornamento. Ciò aumenterà l'utilizzo della memoria, ma non è pericoloso, né un punto. In generale, il concetto di CQRS è che i lati di scrittura e lettura sono asincroni. Pertanto, dopo aver scritto i dati non saranno mai disponibili immediatamente per la lettura. Trattare con l'interfaccia utente? Io apro il flusso e spingo i dati tramite eventi inviati dal server dopo che il lato di lettura li ha riconosciuti.

+2

vorrei solo spostare il codice di base di lettura rivista nel vostro attore risalto laterale lettura invece di inviare un messaggio con un 'fonte fissa su di esso. Quindi elaborare quel flusso in quell'attore di proiezione lato lettura e proiettare tali informazioni in Elasticsearch. – cmbaxter

+0

@cmbaxter L'ho fatto. Sembra essere un'ottima idea. Ho aggiornato la mia domanda e continuo ad accettare suggerimenti poiché ho ancora dei dubbi. –

risposta

4

Ci sono alcuni modi per farlo. Ad esempio, nella mia app ho un attore nel mio lato di query che ha PersistenceQuery che è costantemente alla ricerca di modifiche, ma puoi anche avere un thread con la stessa query. La cosa è di mantenere la corrente aperto per essere in grado di leggere l'evento persistente non appena accade

val readJournal = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier) 

// issue query to journal 
val source: Source[EventEnvelope, NotUsed] = 
    readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue) 

// materialize stream, consuming events 
implicit val mat = ActorMaterializer() 
source.map(_.event).runForeach{ 
    case userEvent: UserEvent => { 
    doSomething(userEvent) 
    } 
} 

Invece di questo, si può avere un timer che solleva una PersistenceQuery e memorizza i nuovi eventi, ma penso che avendo un flusso aperto è il modo migliore

2

Anche se la soluzione con PersistenceQuery solo è stato approvato, contiene i seguenti problemi:

  1. si è parziale, c'è un solo modo di leggere EventEnvelopes presentati.
  2. Non può funzionare con le istantanee di stato e, come risultato, la parte Lettore CQRS deve superare tutti gli eventi persistenti persistenti.

La prima soluzione è migliore, ma ha i seguenti problemi:

  1. è troppo complicato. Causa all'utente non necessario gestire i numeri di sequenza.
  2. Il codice si occupa dello stato (query/aggiornamento) troppo accoppiato con l'implementazione degli attori.

C'è esiste più semplice uno:

import akka.NotUsed 
import akka.actor.{Actor, ActorLogging} 
import akka.persistence.query.{EventEnvelope, PersistenceQuery} 
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal} 
import akka.persistence._ 
import akka.stream.ActorMaterializer 
import akka.stream.javadsl.Source 

/** 
    * Created by alexv on 4/26/2017. 
    */ 
class CQRSTest { 

    // User Command, will be transformed to User Event 
    sealed trait UserCommand 
    // User Event 
    // let's assume some conversion from Command to event here 
    case class PersistedEvent(command: UserCommand) extends Serializable 
    // User State, for simplicity assumed that all State will be snapshotted 
    sealed trait State extends Serializable{ 
    def clear(): Unit 
    def updateState(event: PersistedEvent): Unit 
    def validateCommand(command:UserCommand): Boolean 
    def applyShapshot(newState: State): Unit 
    def getShapshot() : State 
    } 
    case class SaveSnapshot() 

    /** 
    * Common code for Both reader and writer 
    * @param state - State 
    */ 
    abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging { 
    override def persistenceId: String = "CQRSPersistenceId" 

    override def preStart(): Unit = { 
     // Since the state is external and not depends to Actor's failure or restarts it should be cleared. 
     state.clear() 
    } 

    override def receiveRecover: Receive = { 
     case event : PersistedEvent => state.updateState(event) 
     case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot) 
     case RecoveryCompleted => onRecoveryCompleted(super.lastSequenceNr) 
    } 

    abstract def onRecoveryCompleted(lastSequenceNr:Long) 
    } 

    class CQRSWriter(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSWriter Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed") 
    } 

    override def receiveCommand: Receive = { 
     case command: UserCommand => 
     if(state.validateCommand(command)) { 
      // Persist events and call state.updateState with each persisted event 
      persistAll(List(PersistedEvent(command)))(state.updateState) 
     } 
     else { 
      log.error("Validation Failed for Command: {}", command) 
     } 
     case SaveSnapshot => saveSnapshot(state.getShapshot()) 
     case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata) 
     case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason) 
    } 
    } 

    class CQRSReader(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSReader Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed, Starting QueryStream") 

     // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests) 
     val readJournal = PersistenceQuery(context.system).readJournalFor(
     context.system.settings.config.getString("akka.persistence.query.my-read-journal")) 
     .asInstanceOf[ReadJournal 
     with EventsByPersistenceIdQuery] 
     val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
     OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue) 
     source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer()) 

    } 

    // Nothing received since it is Reader only 
    override def receiveCommand: Receive = Actor.emptyBehavior 
    } 
} 
+0

La parte CQRSRead presupposta deve essere interrogata direttamente dal suo stato. CQRSReader si assicura che lo stato sia simile a quello di CQRSWriter. Non ho implementato lo stato Concrete qui, ma può essere qualsiasi cosa, dalla semplice Hash Map fino al DB grafico in memoria –