2009-11-27 5 views
5

Ho esteso FutureTask da java.util.concurrent per fornire richiamate per tenere traccia dell'esecuzione di attività inoltrate a ExecutorService.Estensione di FutureTask, come gestire cancel

public class StatusTask<V> extends FutureTask<V> { 

    private final ITaskStatusHandler<V> statusHandler; 

    public StatusTask(Callable<V> callable, ITaskStatusHandler<V> statusHandler){ 
     super(callable); 
     if (statusHandler == null) 
      throw new NullPointerException("statusHandler cannot be null"); 
     this.statusHandler = statusHandler; 
     statusHandler.TaskCreated(this); 
    } 

    @Override 
    public void run() { 
     statusHandler.TaskRunning(this); 
     super.run(); 
    } 

    @Override 
    protected void done() { 
     super.done(); 
     statusHandler.TaskCompleted(this); 
    } 

} 

Ora, quello che vedo è se il compito è presentata, ma finisce in coda e ho cancel(true); il compito - il metodo run() viene ancora chiamata - e la FutureTask.run() (probabile) controlla che il compito viene annullato e doesn chiamiamo il callable avvolto.

Devo fare ad es.

@Override 
public void run() { 
    if(!isCancelled()) { 
    statusHandler.TaskRunning(this); 
    super.run(); 
    } 
} 

Oppure devo chiamare ancora super.run()? Entrambi questi approcci sembrano suscettibili alle condizioni di gara tra il controllo dell'annullamento e il fare qualcosa al riguardo. Ogni pensiero è apprezzato.

risposta

3

Hai ragione che c'è una gara lì. FutureTask#done() verrà chiamato al massimo al massimo una volta, quindi se l'attività è già stata annullata prima che venga mai eseguita tramite RunnableFuture#run(), avrai perso la chiamata a FutureTask#done().

Avete considerato un approccio più semplice che emette sempre un insieme simmetrico di chiamate abbinate a ITaskStatusHandler#taskRunning() e ITaskStatusHandler#taskCompleted(), in questo modo?

@Override 
public void run() { 
    statusHandler.TaskRunning(this); 
    try { 
    super.run(); 
    finally { 
    statusHandler.TaskCompleted(this); 
    } 
} 

volta RunnableFuture#run() si chiama, è vero che il vostro compito nella corsa, o almeno cercare di essere eseguito. Una volta terminato lo FutureTask#run(), l'attività non è più in esecuzione. Accade così che in caso di cancellazione, la transizione sia (quasi) immediata.

Cercando di evitare di chiamare ITaskStatusHandler#taskRunning() se l'interno Callable o Runnable non è mai invocato da FutureTask#run() si richiede all'utente di stabilire una struttura condivisa tra la Callable o Runnable e il tipo -derived FutureTask stesso, in modo che quando la funzione interna viene chiamata prima imposta un flag che il tipo esterno FutureTask può osservare come un latch, indicando che sì, la funzione ha iniziato a funzionare prima che sia stata annullata. Tuttavia, a quel punto, dovevi impegnarti a chiamare ITaskStatusHandler#taskRunning(), quindi la distinzione non è così utile.

ho lottato con un problema di progettazione simile ultimamente, e finii di stabilirsi sul simmetrica prima e dopo le operazioni nella mia ignorato FutureTask#run() metodo.

+0

Presumo che la chiamata nella clausola finally debba essere statusHandler.TaskCompleted (this); C'è qualche caso in cui il metodo run non sarebbe chiamato ma il metodo done() sarebbe? – nos

+0

Grazie per aver colto l'errore. Ho risolto la chiamata nel blocco finally. Sì, è possibile che done() possa essere chiamato senza essere eseguito(). Vedi FutureTask # SynC# innerCancel (booleano). Lì, a condizione che l'attività non abbia finito di funzionare, il che include che non ha mai iniziato a funzionare, puoi vedere che done() sarà chiamato. Nota che done() sarà chiamato zero o una volta per qualsiasi istanza FutureTask: zero se né run() né cancel() non vengono mai chiamati, una volta altrimenti. – seh

+0

Sembra però che se lo si invia a un executor done() verrà chiamato se l'attività viene annullata prima dell'esecuzione, anche se l'executor non lo sa. L'executor conosce solo i runnables/callables. Quindi, run() verrà chiamato quando alla fine diventa eseguibile in quel caso FutureTask # run() essenzialmente non fa nulla però. – nos

2

Il tuo problema è che i tuoi compiti futuri sono ancora eseguiti dopo aver chiamato annullare su di loro, giusto?

Dopo che un'attività è stata inviata al servizio executor, deve essere gestita dall'esecutore. (Puoi comunque annullare una singola attività, se preferisci.) Dovresti annullare l'esecuzione con lo executor shutdownNow method. (Questo chiamerà il metodo cancel di tutte le attività inviate.) Un arresto eseguirà comunque tutte le attività inviate.

L'esecutore, in caso contrario, non "sa" che un'attività è stata annullata. Chiamerà il metodo indipendentemente dallo stato interno dell'attività futura.

L'approccio più semplice sarebbe utilizzare il framework Executor così com'è e scrivere un decoratore Callable.

class CallableDecorator{ 

    CallableDecorator(Decorated decorated){ 
    ... 
    } 

    setTask(FutureTask task){ 
    statusHandler.taskCreated(task); 
    } 

    void call(){ 
    try{ 
     statusHandler.taskRunning(task); 
     decorated.call(); 
    }finally{ 
     statusHandler.taskCompleted(task); 
    } 
    } 
} 

L'unico problema è che l'attività non può essere nel costruttore del decoratore. (È un parametro del futuro costruttore di task.) Per interrompere questo ciclo devi usare un setter o qualche proxy per aggirare l'iniezione del costruttore. Forse questo non è necessario per il callback e puoi dire: statusHandler.callableStarted(decorated).

A seconda delle esigenze, potrebbe essere necessario segnalare eccezioni e interruzioni.

implementazione di base:

class CallableDecorator<T> implements Callable<T> { 

    private final Callable<T> decorated; 
    CallableDecorator(Callable<T> decorated){ 
     this.decorated = decorated; 
    } 

    @Override public T call() throws Exception { 
     out.println("before " + currentThread()); 
     try { 
      return decorated.call(); 
     }catch(InterruptedException e){ 
      out.println("interupted " + currentThread()); 
      throw e; 
     } 
     finally { 
      out.println("after " + currentThread()); 
     } 
    } 
} 

ExecutorService executor = newFixedThreadPool(1); 
Future<Long> normal = executor.submit(new CallableDecorator<Long>(
     new Callable<Long>() { 
      @Override 
      public Long call() throws Exception { 
       return System.currentTimeMillis(); 
      } 
     })); 
out.println(normal.get()); 

Future<Long> blocking = executor.submit(new CallableDecorator<Long>(
     new Callable<Long>() { 
      @Override 
      public Long call() throws Exception { 
       sleep(MINUTES.toMillis(2)); // blocking call 
       return null; 
      } 
     })); 

sleep(SECONDS.toMillis(1)); 
blocking.cancel(true); // or executor.shutdownNow(); 

uscita:

before Thread[pool-1-thread-1,5,main] 
after Thread[pool-1-thread-1,5,main] 
1259347519104 
before Thread[pool-1-thread-1,5,main] 
interupted Thread[pool-1-thread-1,5,main] 
after Thread[pool-1-thread-1,5,main] 
+2

seriamente, vuoi controllare un singolo chiamabile uccidendo l'intero esecutore? funzionerebbe benissimo se tu avessi un esecutore per ogni callable. in tal caso, perché persino usare gli esecutori? in genere si desidera annullare selettivamente i lavori, quindi il metodo cancel su Future. – james

+0

No. Il codice di esempio mostra come uccidere l'executor o una singola attività. –