2014-09-17 3 views
6

OK, inizierò con un caso d'uso elaborato e spiegherà la mia domanda:Spiegare Kinesis Shard Iterator - AWS Java SDK

  1. Io uso una piattaforma di analisi web 3rd party che utilizza AWS Kinesis flussi per passare i dati dal client alla destinazione finale - un flusso Kinesis;
  2. La piattaforma di analisi Web utilizza 2 flussi:
    1. Un flusso di raccolta dati (flusso shard singolo);
    2. Un secondo flusso per arricchire i dati grezzi dal flusso del raccoglitore (singolo flusso di shard); Ancora più importante, questo flusso consuma i dati grezzi dal primo stream utilizzando il tipo di iteratore TRIM_HORIZON;
  3. mi consumano i dati dal flusso utilizzando AWS Java SDK, secifically utilizzando la classe GetShardIteratorRequest;
  4. Attualmente sto sviluppando la classe di estrazione, quindi questo viene fatto in modo sincrono, nel senso che consumo i dati solo quando compilo la mia classe;
  5. La classe funziona sorprendentemente, anche se ci sono alcune cose che non riesco a capire, in particolare rispetto a come i dati vengono consumati dal flusso e il significato di ognuno dei tipi di iteratore;

Il mio problema è che i dati a recuperare è incoerente e non ha alcuna logica cronologica in esso.

  • Quando uso AT_SEQUENCE_NUMBER e fornire il primo numero di sequenza dal frammento con

    .getSequenceNumberRange() getStartingSequenceNumber().;

    ... come ``, non ricevo tutti i record. Allo stesso modo, AFTER_SEQUENCE_NUMBER;

  • Quando uso LATEST, ottengo zero risultati;
  • Quando uso TRIM_HORIZON, che dovrebbe avere senso da usare, non sembra funzionare correttamente. Prima mi forniva i dati, quindi ho aggiunto nuovi "eventi" (record al flusso finale) e ho ricevuto zero record. Mistero.

Le mie domande sono:

  1. Come posso tranquillamente consumare dati dal flusso, senza doversi preoccupare di record perse?
  2. Esiste un'alternativa allo ShardIteratorRequest?
  3. Se c'è, come posso "sfogliare" il flusso e vedere cosa c'è dentro per i riferimenti di debug?
  4. Cosa mi manca con il metodo TRIM_HORIZON?

Grazie in anticipo, mi piacerebbe davvero imparare un po 'di più sul consumo di dati da un flusso Kinesis.

+0

Anche io ho problemi simili, anche se per me ottengo record duplicati su ogni iterazione (utilizzando sia AT_SEQUENCE_NUMBER che FROM_SEQUENCE_NUMBER), nonostante si utilizzi il valore NextShardIterator di ogni risposta. I documenti sono piuttosto criptici su questo argomento .... Mi piacerebbe anche sapere cosa significa "non modificato" (w.r.t TRIM_HORIZON). – Erve1879

+0

Per la cronaca, ho fatto qualcosa di diverso nel frattempo: ho preso un consumatore Scala esistente che ascolta continuamente lo streaming e lo ha semplicemente riportato su Java puro per i miei scopi. Ecco l'app Scala, originariamente sviluppata da SnowPlow https://github.com/snowplow/kinesis-example-scala-consumer – YuvalHerziger

+0

Purtroppo, non sono java-friendly .....! Spero solo che ci siano state regole indipendenti dall'apprendimento linguistico su come garantire l'idempotenza e la "copertura" al 100% dei record, consentendo al contempo il riavvio, il crash, ecc. Dei consumatori. Sembra negare lo scopo di Kinesis se dobbiamo salvare e verificare rispetto al SequenceNumber di tutti i record recuperati in precedenza per garantire la non duplicazione. Sono sicuro che mi manca qualcosa però ....... – Erve1879

risposta

0

Capisco la confusione di cui sopra, e ho avuto gli stessi problemi, ma penso di aver capito ora. Si noti che sto usando lo JSON API direttamente senza KCL.

mi sembra che l'API offre ai clienti 2 scelte fondamentali della iteratori quando cominciano consumando un torrente:

A) TRIM_HORIZON: per la lettura di record passato ritardo tra molti minuti (anche ore) e 24 ore di vita. Non restituisce i record inseriti di recente. L'utilizzo di AFTER_SEQUENCE_NUMBER sull'ultimo record visto da questo iteratore restituisce un array vuoto anche quando i record sono stati recentemente PUT.

B) LATEST: per leggere i record FUTURE in tempo reale (immediatamente dopo che sono PUT). Sono stato ingannato dall'unica frase di documentazione che ho trovato su questo argomento "Inizia a leggere subito dopo il record più recente nel frammento, in modo da leggere sempre i dati più recenti nel frammento." Stavi ottenendo un array vuoto perché nessun record era stato PUT da quando ha ottenuto l'iteratore. Se si ottiene questo tipo di iteratore e quindi si inserisce un record, tale record sarà immediatamente disponibile.

Infine, se si conosce l'ID sequenza di un record inserito di recente, è possibile ottenerlo immediatamente utilizzando AT_SEQUENCE_NUMBER e si possono ottenere record successivi utilizzando AFTER_SEQUENCE_NUMBER anche se non appaiono su un iteratore TRIM_HORIZON.

Quanto sopra significa che se si desidera leggere tutti i record passati e futuri registrati in tempo reale, è necessario utilizzare una combinazione di A e B, con la logica per far fronte ai record tra (il passato recente) . Il KCL potrebbe andare bene per questo.