5

Sto progettando un sistema che dovrebbe analizzare un numero elevato di transazioni utente e produrre misure aggregate (come le tendenze e così via). Il sistema dovrebbe funzionare velocemente, essere robusto e scalabile. Il sistema è basato su Java (su Linux).Progettazione del sistema di elaborazione analitica in tempo reale

I dati provengono da un sistema che genera file di registro (CSV based) di transazioni utente. Il sistema genera un file ogni minuto e ogni file contiene le transazioni di diversi utenti (ordinati per ora), ogni file può contenere migliaia di utenti.

Una struttura di dati di esempio per un file CSV:

10: 30: 01, 1 utente, ...
10: 30: 01, 1 utente, ...
10:30:02 , l'utente 78, ...
10: 30: 02, l'utente 2, ...
10: 30: 03, 1 utente, ...
10: 30: 04, l'utente 2, ...
. . .

Il sistema che sto pianificando dovrebbe elaborare i file ed eseguire alcune analisi in tempo reale. Deve raccogliere l'input, inviarlo a diversi algoritmi e altri sistemi e memorizzare i risultati calcolati in un database. Il database non contiene i record di input effettivi ma solo un'analisi aggregata di alto livello sulle transazioni. Ad esempio tendenze ed ecc.

Il primo algoritmo che sto pianificando di utilizzare richiede il funzionamento ottimale di almeno 10 record utente, se non riesce a trovare 10 record dopo 5 minuti, dovrebbe utilizzare i dati sempre disponibili.

Vorrei utilizzare Storm per l'implementazione, ma preferirei lasciare la discussione a livello di progettazione il più possibile.

un elenco dei componenti del sistema:

  1. Un compito che controlla i file in ingresso ogni minuto.

  2. Un'attività che legge il file, lo analizza e lo rende disponibile per altri componenti di sistema e algoritmi.

  3. Un componente per memorizzare 10 record per un utente (non più di 5 minuti), quando vengono raccolti 10 record o trascorsi 5 minuti, è il momento di inviare i dati all'algoritmo per l'ulteriore elaborazione. Poiché il requisito è quello di fornire almeno 10 record per l'algoritmo, ho pensato di utilizzare Storm Field Grouping (che significa che la stessa attività viene chiamata per lo stesso utente) e tracciare la raccolta di 10 record dell'utente all'interno dell'attività, ovviamente io piano di avere molte di queste attività, ciascuna gestisce una parte degli utenti.

  4. Ci sono altri componenti che funzionano su una singola transazione, per loro ho intenzione di creare altre attività che ricevono ogni transazione mentre viene analizzata (in parallelo ad altre attività).

Ho bisogno del vostro aiuto con # 3.

Quali sono le migliori pratiche per la progettazione di tale componente? È ovvio che è necessario mantenere i dati per 10 record per utente. Una mappa dei valori chiave può essere d'aiuto, è meglio avere la mappa gestita nell'attività stessa o utilizzando una cache distribuita? Ad esempio Redis un key value store (non l'ho mai usato prima).

Grazie per il vostro aiuto

risposta

5

Avevo lavorato con i rossi un po '. Quindi, io commento sul tuo pensiero di usare Redis

# 3 ha 3 requisiti

  1. Buffer per utente

  2. buffer per 10 Compiti

  3. deve scadere ogni 5 minuti

1. Buffer P er Utente: Redis è solo un negozio di valore chiave. Sebbene supporti un'ampia varietà di datatypes, sono sempre valori associati a un tasto STRING. Pertanto, è necessario decidere come identificare un utente in modo univoco in caso sia necessario disporre di un buffer per utente. Perché in redis non otterrai mai un errore quando sostituisci un nuovo valore chiave. Una soluzione potrebbe essere controllare l'esistenza prima di scrivere.

2. Buffer per 10 Compiti: È ovviamente possibile implementare uno queue in redis. Ma restringere le sue dimensioni è lasciato a te. Ad esempio: utilizzando LPUSH e LTRIM o utilizzando LLEN per verificare la lunghezza e decidere se attivare il processo. La chiave associata a questa coda dovrebbe essere quella che hai deciso nella parte 1.

3. Il buffer scade tra 5 minuti: Questo è un compito più difficile. In rosso, ogni chiave indipendentemente dal tipo di dati sottostante ha un valore, può avere un expiry. Ma il processo di scadenza è silenzioso. Non riceverai una notifica alla scadenza di una chiave. Quindi, perderai silenziosamente il tuo buffer se usi questa proprietà. Uno si preoccupa di questo, avendo un indice. Significa che l'indice eseguirà il mapping di un timestamp alle chiavi che devono essere tutte scadute con quel valore di timestamp. Quindi in background è possibile leggere l'indice ogni minuto e cancellare manualmente il tasto [dopo averlo letto] dai numeri rossi e chiamare il processo desiderato con i dati del buffer. Per avere un tale indice puoi guardare a Sorted Sets. Laddove il timestamp sarà il tuo score e impostato member saranno le chiavi [chiave univoca per utente decisa nella parte 1 che esegue il mapping a una coda] che desideri eliminare a quel timestamp. Si può fare zrangebyscore per leggere tutti i membri del set con data e ora specificato

Globale:

Usa elenco Redis per implementare una coda.

Utilizzare LLEN per assicurarsi di non superare il limite di 10.

Ogni volta che si crea un nuovo elenco, immettere una voce nell'indice [Set ordinato] con Punteggio come Current Timestamp + 5 min e Valore come chiave dell'elenco.

Quando LLEN raggiunge 10, ricordarsi di leggere, quindi rimuovere la chiave dall'indice [set ordinato] e dal db [eliminare l'elenco tasti->]. Quindi attiva il tuo processo con i dati.

Per ogni minuto, generare la data/ora corrente, leggere l'indice e per ogni chiave, leggere i dati, quindi rimuovere la chiave da db e attivare il processo.

Questo potrebbe essere il mio modo di implementarlo. Ci potrebbe essere qualche altro modo migliore per modellare i dati in Redis

0

Per le tue esigenze 1 & 2: [Apache Flume o Kafka]

Per il vostro requisito # 3: [Esper Bolt all'interno Storm. In Redis per realizzare ciò dovrai riscrivere la logica Esper.]