2015-12-14 27 views
10

Quale oggetto di sincronizzazione Java devo utilizzare per garantire che un numero arbitrario di attività sia completato? I vincoli sono:Oggetto di sincronizzazione per garantire che tutte le attività siano state completate

  1. Ogni attività richiede una quantità di tempo non trascurabile da completare ed è opportuno eseguire attività in parallelo.
  2. Ci sono troppe attività da adattare alla memoria (ad esempio, non è possibile inserire un Future per ogni attività in un Collection e quindi chiamare get su tutti i futures).
  3. Non so quante attività ci saranno (ad esempio non posso usare un CountDownLatch).
  4. Il ExecutorService può essere condivisa in modo non posso usare awaitTermination(long, TimeUnit)

Ad esempio, con Grand Central Dispatch, potrei fare qualcosa di simile:

let workQueue = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0) 
let latch = dispatch_group_create() 
let startTime = NSDate() 
var itemsProcessed = 0 
let countUpdateQueue = dispatch_queue_create("countUpdateQueue", DISPATCH_QUEUE_SERIAL) 
for item in fetchItems() // generator returns too many items to store in memory 
{ 
    dispatch_group_enter(latch) 
    dispatch_async(workQueue) 
    { 
     self.processItem(item) // method takes a non-trivial amount of time to run 
     dispatch_async(countUpdateQueue) 
     { 
      itemsProcessed++ 
     } 
     dispatch_group_leave(latch) 
    } 
} 
dispatch_group_wait(latch, DISPATCH_TIME_FOREVER) 
let endTime = NSDate() 
let totalTime = endTime.timeIntervalSinceDate(startTime) 
print("Processed \(itemsProcessed) items in \(totalTime) seconds.") 

Produce in uscita che assomiglia a questo (per 128 voci): Processed 128 items in 1.846794962883 seconds.

ho provato qualcosa di simile con un Phaser:

final Executor executor = new ThreadPoolExecutor(64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>(8), new CallerRunsPolicy()); 
final Phaser latch = new Phaser(0); 
final long startTime = currentTimeMillis(); 
final AtomicInteger itemsProcessed = new AtomicInteger(0); 
for(final String item : fetchItems()) // iterator returns too many items to store in memory 
{ 
    latch.register(); 
    final Runnable task = new Runnable() { 
     public void run() { 
      processItem(item); // method takes a non-trivial amount of time to run 
      itemsProcessed.incrementAndGet(); 
      latch.arrive(); 
     } 
    }; 
    executor.execute(task); 
} 
latch.awaitAdvance(0); 
final long endTime = currentTimeMillis(); 
out.println("Processed " + itemsProcessed.get() + " items in " + (endTime - startTime)/1000.0 + " seconds."); 

Le attività non vengono sempre completate prima dell'ultima istruzione di stampa e potrei ottenere un risultato simile a questo (per 128 articoli): Processed 121 items in 5.296 seconds. L'oggetto corretto è lo ? La documentazione indica che supporta solo 65.535 parti, quindi avrei bisogno di raggruppare gli articoli da elaborare o introdurre una sorta di tiering Phaser.

+1

Quando si dice che ci sono troppe attività da adattare alla memoria, non è un vincolo di risorse? In questo modo puoi eseguire parallelamente i compiti che si adattano alla memoria e puoi utilizzare Future per questo. Non sono sicuro che un approccio ti aiuterà a superare il contrappeso delle risorse e finirai con una soluzione molto complicata. – Lokesh

risposta

1

"per garantire un numero arbitrariamente elevato di attività completate" - il modo più semplice è quello di mantenere un contatore delle attività completate, con l'operazione di blocco per attendere il raggiungimento di un determinato numero di attività. Non esiste una classe di pronto, ma è facile fare uno:

class EventCounter { 
    long counter=0; 

    synchronized void up() { 
    counter++; 
    notifyAll(); 
    } 
    synchronized void ensure (long count) { 
    while (counter<count) wait(); 
    } 
} 

"Ci sono troppi compiti per adattarsi nella memoria" - in modo che il processo di presentazione delle nuove attività devono essere sospesi quando il numero di task in esecuzione è troppo altoIl modo più semplice è quello di considerare il numero di attività in esecuzione come una risorsa e contare con un semaforo:

Semaphore runningTasksSema=new Semaphore(maxNumberOfRunningTasks); 
EventCounter eventCounter =new EventCounter(); 

for(final String item : fetchItems()) { 
    final Runnable task = new Runnable() { 
     public void run() { 
      processItem(item); 
      runningTasksSema.release(); 
      eventCounter.up(); 
     } 
    }; 
    runningTasksSema.aquire(); 
    executor.execute(task); 
} 

Quando un thread vuole assicurarsi un dato numero di compiti sono stati completati, invoca:

eventCounter.ensure(givenNumberOfFinishedTasks); 

Le versioni asincrone (non bloccanti) delle operazioni runningTasksSema.aquire() e eventCounter.ensure() possono essere progettate, ma sarebbero più complesse.

1

Il problema con l'utilizzo di Phaser in questo esempio è che lo CallerRunsPolicy consente l'esecuzione di un'attività sul thread di avvio. Pertanto, mentre il ciclo è ancora in corso, il numero di parti arrivate può essere uguale al numero di parti registrate, facendo aumentare la fase. La soluzione è di inizializzare lo Phaser con 1 partito, quindi, quando il ciclo è finito, arriva e aspetta che arrivino le altre parti. Ciò garantisce che la fase non passi a 1 finché tutte le attività non sono state completate.

final Executor executor = new ThreadPoolExecutor(64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>(8), new CallerRunsPolicy()); 
final Phaser latch = new Phaser(1); 
final long startTime = currentTimeMillis(); 
final AtomicInteger itemsProcessed = new AtomicInteger(0); 
for(final String item : fetchItems()) // iterator returns too many items to store in memory 
{ 
    latch.register(); 
    final Runnable task = new Runnable() { 
     public void run() { 
      processItem(item); // method takes a non-trivial amount of time to run 
      itemsProcessed.incrementAndGet(); 
      final int arrivalPhase = latch.arrive(); 
     } 
    }; 
    executor.execute(task); 
} 
latch.arriveAndAwaitAdvance(); 
final long endTime = currentTimeMillis(); 
out.println("Processed " + itemsProcessed.get() + " items in " + (endTime - startTime)/1000.0 + " seconds."); 
0

Nel caso in cui, se siete su java8 è possibile utilizzare CompletableFuture

java.util.concurrent.CompletableFuture.allOf(CompletableFuture<?>... cfs) 

che aspettare i risultati di tutte le future in matrice passata.