2016-04-24 21 views
14

Il mio programma basato su Hazelcast può funzionare in due modalità: submitter e worker.Weird Hazelcat IMap # put behavior()

Inviato pone alcuni POJO alla mappa distribuito da qualche chiave, es .: hazelcastInstance.getMap(MAP_NAME).put(key, value);

lavoratore ha un ciclo infinito (con Thread.sleep(1000L); all'interno per timeout) che deve processare entità da carta. Per ora sto solo stampando le dimensioni della mappa in questo ciclo.

Ora ecco il problema. Avvio app worker Quindi avvio simultaneamente quattro submitter (ognuno aggiunge una voce alla mappa e termina il suo lavoro). Ma dopo che tutte le app di submitter sono state eseguite, l'app worker stampa delle dimensioni arbitrarie: a volte rileva che è stata aggiunta una sola voce, a volte due, a volte tre (in realtà non ha mai visto tutte e quattro le voci).

Qual è il problema con questo flusso semplice? Ho letto in documenti Hazelcast che il metodo put() è sincrono, quindi garantisce che dopo il suo ritorno, la voce viene posizionata sulla mappa distribuita e viene replicata. Ma non sembra così nel mio esperimento.

UPD (codice)

Inviato da:

public void submit(String key) { 
    Object mySerializableObject = ... 
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME); 
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS); 
} 

Worker:

public void process() { 
    while (true) { 
     IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME); 
     System.out.println(map.size()); 

     // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess(); 
     // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess)); 
     try { 
      Thread.sleep(PAUSE); 
     } catch (InterruptedException e) { 
      LOGGER.error(e.getMessage(), e); 
     } 
    } 
} 

ho commentato fuori "trattamento" parte se stesso, perché ora sto solo cercando di ottenere stato coerente della mappa. Il codice sopra stampa ogni volta risultati diversi, ad esempio: "4, 3, 1, 1, 1, 1 ..." (quindi può anche vedere 4 attività inviate per un momento, ma poi ... scompaiono) .

UPD (log)

Worker:

... 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 1 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
... 

Inviato 1:

Before: tasksMap.size() = 0 
After: tasksMap.size() = 1 

Inviato 2:

Before: tasksMap.size() = 1 
After: tasksMap.size() = 4 

Submi tter 3:

Before: tasksMap.size() = 1 
After: tasksMap.size() = 2 

Inviato 4:

Before: tasksMap.size() = 3 
After: tasksMap.size() = 4 
+0

Il metodo IMap :: size è una stima, comunque dovrebbe stabilizzarsi alla fine. Puoi condividere qualche altro codice? – noctarius

+0

@noctarius, ho aggiornato la domanda. –

+0

Il tuo codice utilizza membri incorporati e si fermano effettivamente dopo aver inviato il valore? Potevo immaginare che i membri lasciassero rapidamente il cluster per soddisfare i requisiti di riserva in caso di ferie. – noctarius

risposta

7

Beh, credo, non ho capito il problema. Per quanto ho capito, distribuito IMap restituito da hazelcastInstance.getMap non garantisce che i dati vengano replicati su tutti i nodi esistenti nel cluster: alcune porzioni di dati possono essere replicate su alcuni nodi, un'altra porzione - su altri nodi. Ecco perché nel mio esempio alcune delle attività inviate sono state replicate non al nodo worker (che funziona perennemente), ma ad alcuni altri submitter, che terminano la loro esecuzione dopo l'invio. Quindi tali voci sono state perse all'uscita dei submitter.

Ho risolto questo problema sostituendo hazelcastInstance.getMap in hazelcastInstance.getReplicatedMap.Questo metodo restituisce ReplicatedMap, che, AFAIK, garantisce che le voci inserite in esso verranno replicate in tutti i nodi del cluster. Quindi ora tutto funziona bene nel mio sistema.