2011-11-02 3 views
7

Ho un pool di thread fisso ExecutorService di larghezza 10 e un elenco di 100 Callable, ciascuno in attesa di 20 secondi e la registrazione dei loro interrupt.Java ExecutorService invokeAll() che interrompe

Sto chiamando invokeAll in tale elenco in una discussione separata e interrompendo quasi immediatamente questa discussione. ExecutorService l'esecuzione è interrotta come previsto, ma il numero effettivo di interruzioni registrato da Callable s è molto più del previsto 10 - circa 20-40. Perché è così, se ExecutorService può eseguire non più di 10 thread contemporaneamente?

sorgente completo: (Potrebbe essere necessario eseguire più di una volta a causa della concorrenza)

@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

Utilizzando WinXP 32-bit, Oracle JRE 1.6.0_27 e JUnit4

+0

Hmm ... convertendolo in un programma con un metodo principale, ottengo sempre 10 ... (Java 7 su Windows) –

+0

Fatto lo stesso, ottenuto 37 (1.6.0_27, Windows XP). Non hai un Java 7 da testare, qualcuno può confermare? –

+0

Proverò al lavoro. Forse è un bug di Java 6 ... –

risposta

4

Non sono d'accordo con l'ipotesi che dovresti ricevere solo 10 interrupt.

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

In sostanza, la mia tesi è che non v'è alcuna garanzia che il filo Watcher sarà consentito di annullare tutte le 100 istanze RunnableFuture "Cameriere" prima che abbia a cedere l'intervallo di tempo e consentire thread di lavoro del ExecutorService di avviare più Compiti cameriere

Aggiornamento: Questa codice AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

Il blocco finally che contiene f.cancel(true) è quando l'interrupt sarebbe propagato a compito che è attualmente in esecuzione. Come potete vedere, questo è un ciclo stretto, ma non vi è alcuna garanzia che il thread che esegue il ciclo sia in grado di scorrere tutte le istanze di Future in un'unica fascia temporale.

+0

Quindi dici che l'interruzione di 'invokeAll()' non implica l'annullamento immediato di tutte le attività in coda prima dell'interruzione di quelle in esecuzione? Per me, sembra una rottura diretta del principio di minimo stupore. –

+1

corretto. I thread di lavoro che elaborano le attività sono distinti dal thread che esegue 'invokeAll()'. Chiamare l'interrupt su una discussione non implica che altri thread debbano essere interrotti, quindi non sono affatto sorpreso che potresti ricevere più di 10 interrupt di thread di lavoro in alcune occasioni.Come ho menzionato nel codice annotato che ho postato, un interrupt viene inviato al thread worker che elabora l'attività come una virtù dell'argomento booleano passato al metodo 'Future.cancel'. –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

Se si vuole raggiungere lo stesso comportamento

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

L'aggiunta di questo blocco prima interrompere il ThreadPool.

Scaricherà tutta la coda di attesa in una nuova lista.

Quindi interromperà solo i thread in esecuzione.