OK, inizierò con un caso d'uso elaborato e spiegherà la mia domanda:Spiegare Kinesis Shard Iterator - AWS Java SDK
- Io uso una piattaforma di analisi web 3rd party che utilizza AWS Kinesis flussi per passare i dati dal client alla destinazione finale - un flusso Kinesis;
- La piattaforma di analisi Web utilizza 2 flussi:
- Un flusso di raccolta dati (flusso shard singolo);
- Un secondo flusso per arricchire i dati grezzi dal flusso del raccoglitore (singolo flusso di shard); Ancora più importante, questo flusso consuma i dati grezzi dal primo stream utilizzando il tipo di iteratore
TRIM_HORIZON
;
- mi consumano i dati dal flusso utilizzando AWS Java SDK, secifically utilizzando la classe
GetShardIteratorRequest
; - Attualmente sto sviluppando la classe di estrazione, quindi questo viene fatto in modo sincrono, nel senso che consumo i dati solo quando compilo la mia classe;
- La classe funziona sorprendentemente, anche se ci sono alcune cose che non riesco a capire, in particolare rispetto a come i dati vengono consumati dal flusso e il significato di ognuno dei tipi di iteratore;
Il mio problema è che i dati a recuperare è incoerente e non ha alcuna logica cronologica in esso.
Quando uso
AT_SEQUENCE_NUMBER
e fornire il primo numero di sequenza dal frammento con.getSequenceNumberRange() getStartingSequenceNumber().;
... come ``, non ricevo tutti i record. Allo stesso modo,
AFTER_SEQUENCE_NUMBER
;- Quando uso
LATEST
, ottengo zero risultati; - Quando uso
TRIM_HORIZON
, che dovrebbe avere senso da usare, non sembra funzionare correttamente. Prima mi forniva i dati, quindi ho aggiunto nuovi "eventi" (record al flusso finale) e ho ricevuto zero record. Mistero.
Le mie domande sono:
- Come posso tranquillamente consumare dati dal flusso, senza doversi preoccupare di record perse?
- Esiste un'alternativa allo
ShardIteratorRequest
? - Se c'è, come posso "sfogliare" il flusso e vedere cosa c'è dentro per i riferimenti di debug?
- Cosa mi manca con il metodo
TRIM_HORIZON
?
Grazie in anticipo, mi piacerebbe davvero imparare un po 'di più sul consumo di dati da un flusso Kinesis.
Anche io ho problemi simili, anche se per me ottengo record duplicati su ogni iterazione (utilizzando sia AT_SEQUENCE_NUMBER che FROM_SEQUENCE_NUMBER), nonostante si utilizzi il valore NextShardIterator di ogni risposta. I documenti sono piuttosto criptici su questo argomento .... Mi piacerebbe anche sapere cosa significa "non modificato" (w.r.t TRIM_HORIZON). – Erve1879
Per la cronaca, ho fatto qualcosa di diverso nel frattempo: ho preso un consumatore Scala esistente che ascolta continuamente lo streaming e lo ha semplicemente riportato su Java puro per i miei scopi. Ecco l'app Scala, originariamente sviluppata da SnowPlow https://github.com/snowplow/kinesis-example-scala-consumer – YuvalHerziger
Purtroppo, non sono java-friendly .....! Spero solo che ci siano state regole indipendenti dall'apprendimento linguistico su come garantire l'idempotenza e la "copertura" al 100% dei record, consentendo al contempo il riavvio, il crash, ecc. Dei consumatori. Sembra negare lo scopo di Kinesis se dobbiamo salvare e verificare rispetto al SequenceNumber di tutti i record recuperati in precedenza per garantire la non duplicazione. Sono sicuro che mi manca qualcosa però ....... – Erve1879