2011-09-12 12 views
6

Ho bisogno di un generatore di byte che generi valori da Byte.MIN_VALUE a Byte.MAX_VALUE. Quando raggiunge MAX_VALUE, dovrebbe ricominciare da MIN_VALUE.Cosa c'è di sbagliato in questo generatore di sequenze di byte sicuro per i thread?

Ho scritto il codice utilizzando AtomicInteger (vedi sotto); tuttavia, sembra che il codice non si comporti correttamente se acceduto in modo concorrente e se reso artificialmente lento con Thread.sleep() (in caso contrario, funziona in modo corretto, tuttavia, sospetto che sia troppo veloce per la visualizzazione dei problemi di concorrenza).

Il codice (con un po 'di codice di debug aggiunto):

public class ByteGenerator { 

    private static final int INITIAL_VALUE = Byte.MIN_VALUE-1; 

    private AtomicInteger counter = new AtomicInteger(INITIAL_VALUE); 
    private AtomicInteger resetCounter = new AtomicInteger(0); 

    private boolean isSlow = false; 
    private long startTime; 

    public byte nextValue() { 
     int next = counter.incrementAndGet(); 
     //if (isSlow) slowDown(5); 
     if (next > Byte.MAX_VALUE) { 
      synchronized(counter) { 
       int i = counter.get(); 
       //if value is still larger than max byte value, we reset it 
       if (i > Byte.MAX_VALUE) { 
        counter.set(INITIAL_VALUE); 
        resetCounter.incrementAndGet(); 
        if (isSlow) slowDownAndLog(10, "resetting"); 
       } else { 
        if (isSlow) slowDownAndLog(1, "missed"); 
       } 
       next = counter.incrementAndGet(); 
      } 
     } 
     return (byte) next; 
    } 

    private void slowDown(long millis) { 
     try { 
      Thread.sleep(millis); 
     } catch (InterruptedException e) { 
     } 
    } 
    private void slowDownAndLog(long millis, String msg) { 
     slowDown(millis); 
     System.out.println(resetCounter + " " 
          + (System.currentTimeMillis()-startTime) + " " 
          + Thread.currentThread().getName() + ": " + msg); 
    } 

    public void setSlow(boolean isSlow) { 
     this.isSlow = isSlow; 
    } 
    public void setStartTime(long startTime) { 
     this.startTime = startTime; 
    } 

} 

E, il test:

public class ByteGeneratorTest { 

    @Test 
    public void testGenerate() throws Exception { 
     ByteGenerator g = new ByteGenerator(); 
     for (int n = 0; n < 10; n++) { 
      for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) { 
       assertEquals(i, g.nextValue()); 
      } 
     } 
    } 

    @Test 
    public void testGenerateMultiThreaded() throws Exception { 
     final ByteGenerator g = new ByteGenerator(); 
     g.setSlow(true); 
     final AtomicInteger[] counters = new AtomicInteger[Byte.MAX_VALUE-Byte.MIN_VALUE+1]; 
     for (int i = 0; i < counters.length; i++) { 
      counters[i] = new AtomicInteger(0); 
     } 
     Thread[] threads = new Thread[100]; 
     final CountDownLatch latch = new CountDownLatch(threads.length); 
     for (int i = 0; i < threads.length; i++) { 
      threads[i] = new Thread(new Runnable() { 
       public void run() { 
        try { 
         for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) { 
          byte value = g.nextValue(); 
          counters[value-Byte.MIN_VALUE].incrementAndGet(); 
         } 
        } finally { 
         latch.countDown(); 
        } 
       } 
      }, "generator-client-" + i); 
      threads[i].setDaemon(true); 
     } 
     g.setStartTime(System.currentTimeMillis()); 
     for (int i = 0; i < threads.length; i++) { 
      threads[i].start(); 
     } 
     latch.await(); 
     for (int i = 0; i < counters.length; i++) { 
      System.out.println("value #" + (i+Byte.MIN_VALUE) + ": " + counters[i].get()); 
     } 
     //print out the number of hits for each value 
     for (int i = 0; i < counters.length; i++) { 
      assertEquals("value #" + (i+Byte.MIN_VALUE), threads.length, counters[i].get()); 
     } 
    } 

} 

Il risultato sulla mia macchina 2-core è che il valore # -128 ottiene 146 colpi (tutti dovrebbero ottenere 100 hit uguali perché abbiamo 100 thread).

Se qualcuno ha qualche idea, cosa c'è di sbagliato in questo codice, sono tutto orecchie/occhi.

UPDATE: per coloro che sono di fretta e non vuole scorrere verso il basso, il modo corretto (e più breve e più elegante) per risolvere questo in Java sarebbe come questo:

public byte nextValue() { 
    return (byte) counter.incrementAndGet(); 
} 

Grazie, Heinz!

risposta

5

Si decide di incrementareAndGet() in base a un valore precedente di counter.get(). Il valore del contatore può raggiungere nuovamente MAX_VALUE prima di eseguire l'operazione incrementAndGet() sul contatore.

if (next > Byte.MAX_VALUE) { 
    synchronized(counter) { 
     int i = counter.get(); //here You make sure the the counter is not over the MAX_VALUE 
     if (i > Byte.MAX_VALUE) { 
      counter.set(INITIAL_VALUE); 
      resetCounter.incrementAndGet(); 
      if (isSlow) slowDownAndLog(10, "resetting"); 
     } else { 
      if (isSlow) slowDownAndLog(1, "missed"); //the counter can reach MAX_VALUE again if you wait here long enough 
     } 
     next = counter.incrementAndGet(); //here you increment on return the counter that can reach >MAX_VALUE in the meantime 
    } 
} 

per farlo funzionare bisogna assicurarsi che le non decisioni vengono prese sulla informazioni stantio. Ripristinare il contatore o restituire il vecchio valore.

public byte nextValue() { 
    int next = counter.incrementAndGet(); 

    if (next > Byte.MAX_VALUE) { 
     synchronized(counter) { 
      next = counter.incrementAndGet(); 
      //if value is still larger than max byte value, we reset it 
      if (next > Byte.MAX_VALUE) { 
       counter.set(INITIAL_VALUE + 1); 
       next = INITIAL_VALUE + 1; 
       resetCounter.incrementAndGet(); 
       if (isSlow) slowDownAndLog(10, "resetting"); 
      } else { 
       if (isSlow) slowDownAndLog(1, "missed"); 
      } 
     } 
    } 

    return (byte) next; 
} 
+0

Perché hai counter.incrementAndGet() nel blocco di sincronizzazione, che è come un incremento multiplo per tutto il thread che entra in quella sezione e successivamente viene reimpostato su "INITIAL_VALUE + 1" –

3

Il blocco sincronizzato contiene solo il corpo if. Dovrebbe avvolgere tutto il metodo inclusa la dichiarazione if. O semplicemente sincronizza il tuo metodo nextValue. A proposito, in questo caso non hai bisogno di variabili atomiche.

Spero che questo funzioni per voi. Prova ad usare le variabili atomiche solo se hai davvero bisogno del codice più performante, cioè la dichiarazione synchronized ti infastidisce. IMHO nella maggior parte dei casi non è così.

+0

La tua risposta non spiega il motivo per cui l'attuale approccio non funziona. La mia comprensione è che dovrebbe funzionare: usa la variabile atomica quando non è sincronizzata e si sincronizza quando si eseguono operazioni non atomiche - quindi perché non funziona? –

2

Se ho capito bene, si cura che i risultati di nextValue sono nel range di Byte.MIN_VALUE e Byte.MAX_VALUE e non si preoccupano il valore memorizzato nel contatore. Quindi è possibile mappare gli interi su byte in modo tale che avete richiesto il comportamento di enumerazione è esposto:

private static final int VALUE_RANGE = Byte.MAX_VALUE - Byte.MIN_VALUE + 1; 
private final AtomicInteger counter = new AtomicInteger(0); 

public byte nextValue() { 
    return (byte) (counter.incrementAndGet() % VALUE_RANGE + Byte.MIN_VALUE - 1); 
} 

attenzione, questo è il codice non testato. Ma l'idea dovrebbe funzionare.

+0

Mentre questa è un'idea molto accurata (perché non ci ho pensato :-P) e funziona davvero - non spiega ancora perché la mia soluzione originale non funziona. –

+0

Bene, nella tua soluzione originale è il seguente difetto. È possibile che avvenga un cambio di contesto tra il primo 'incrementAndGet()' e il blocco 'synchronized'. Cioè due thread ottengono un valore 'next' che è oltre Byte.MAX_VALUE. Uno di questi thread resetterà il contatore, l'altro restituirà un valore troppo grande. – jmg

+0

No, se entrambi ottengono un valore troppo grande, il primo reimposterà il contatore e il prossimo otterrà solo un nuovo valore dal contatore (già ripristinato). –

1

Ho codificato la seguente versione di nextValue utilizzando compareAndSet che è progettato per essere utilizzato in un blocco non sincronizzato. Ha superato i test di unità:

Oh, e ho introdotto nuove costanti per MIN_VALUE e MAX_VALUE ma puoi ignorarle se preferisci.

static final int LOWEST_VALUE = Byte.MIN_VALUE; 
static final int HIGHEST_VALUE = Byte.MAX_VALUE; 

private AtomicInteger counter = new AtomicInteger(LOWEST_VALUE - 1); 
private AtomicInteger resetCounter = new AtomicInteger(0); 

public byte nextValue() { 
    int oldValue; 
    int newValue; 

    do { 
     oldValue = counter.get(); 
     if (oldValue >= HIGHEST_VALUE) { 
      newValue = LOWEST_VALUE; 
      resetCounter.incrementAndGet(); 
      if (isSlow) slowDownAndLog(10, "resetting"); 
     } else { 
      newValue = oldValue + 1;  
      if (isSlow) slowDownAndLog(1, "missed"); 
     } 
    } while (!counter.compareAndSet(oldValue, newValue)); 
    return (byte) newValue; 
} 

compareAndSet() funziona in congiunzione con get() per gestire la concorrenza.

All'inizio della sezione critica, si esegue un get() per recuperare il vecchio valore. Quindi si esegue una funzione dipendente solo dal vecchio valore per calcolare un nuovo valore. Quindi si utilizza compareAndSet() per impostare il nuovo valore.Se AtomicInteger non è più uguale al vecchio valore nel momento in cui viene eseguito compareAndSet() (a causa dell'attività concomitante), non riesce e si deve ricominciare.

Se si dispone di una quantità eccessiva di concorrenza e il tempo di calcolo è lungo, è possibile che lo compareAndSet() non riesca a fallire molte volte prima di riuscire e potrebbe valere la pena raccogliere statistiche su di esso per quanto riguarda l'utente.

Non sto suggerendo che questo sia un approccio migliore o peggiore di un semplice blocco sincronizzato come altri hanno suggerito, ma personalmente avrei probabilmente usato un blocco sincronizzato per semplicità.

EDIT: Risponderò alla tua domanda "Perché il mio lavoro non funziona?"

il codice ha:

int next = counter.incrementAndGet(); 
    if (next > Byte.MAX_VALUE) { 

Poiché queste due linee non sono protetti da un blocco sincronizzato, più thread essi può eseguire contemporaneamente e tutti i valori di next>Byte.MAX_VALUE ottenere. Tutti quindi verranno inseriti nel blocco sincronizzato e verranno impostati counter su INITIAL_VALUE (uno dopo l'altro mentre si attendono reciprocamente).

Nel corso degli anni, c'è stata un'enorme quantità scritta sulle insidie ​​del tentativo di ottenere un miglioramento delle prestazioni non sincronizzando quando non sembra necessario. Ad esempio, vedere Double Checked Locking

+0

Sì, è sbagliato, ma penso anche che la tua spiegazione non sia corretta. Il secondo thread non resetterà nuovamente il contatore perché c'è un 'int i = counter.get();' nel blocco sincronizzato. –

+0

Anche se sembra un doppio blocco controllato, non dovrebbe risentire degli stessi problemi, poiché AtomicInteger dovrebbe garantire che tutti i thread vedano lo stesso valore contatore (nessuna memorizzazione nella cache). O c'è qualcosa di più in corso? –

8

Inizialmente, Java memorizzava tutti i campi come valori di 4 o 8 byte, anche brevi e byte. Le operazioni sui campi farebbero semplicemente il masking dei bit per ridurre i byte. Così potremmo facilmente fare questo:

public byte nextValue() { 
    return (byte) counter.incrementAndGet(); 
} 

divertente puzzle po ', grazie Neeme :-)