Quale oggetto di sincronizzazione Java devo utilizzare per garantire che un numero arbitrario di attività sia completato? I vincoli sono:Oggetto di sincronizzazione per garantire che tutte le attività siano state completate
- Ogni attività richiede una quantità di tempo non trascurabile da completare ed è opportuno eseguire attività in parallelo.
- Ci sono troppe attività da adattare alla memoria (ad esempio, non è possibile inserire un
Future
per ogni attività in unCollection
e quindi chiamareget
su tutti i futures). - Non so quante attività ci saranno (ad esempio non posso usare un
CountDownLatch
). - Il
ExecutorService
può essere condivisa in modo non posso usareawaitTermination(long, TimeUnit)
Ad esempio, con Grand Central Dispatch, potrei fare qualcosa di simile:
let workQueue = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0)
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create("countUpdateQueue", DISPATCH_QUEUE_SERIAL)
for item in fetchItems() // generator returns too many items to store in memory
{
dispatch_group_enter(latch)
dispatch_async(workQueue)
{
self.processItem(item) // method takes a non-trivial amount of time to run
dispatch_async(countUpdateQueue)
{
itemsProcessed++
}
dispatch_group_leave(latch)
}
}
dispatch_group_wait(latch, DISPATCH_TIME_FOREVER)
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate(startTime)
print("Processed \(itemsProcessed) items in \(totalTime) seconds.")
Produce in uscita che assomiglia a questo (per 128 voci): Processed 128 items in 1.846794962883 seconds.
ho provato qualcosa di simile con un Phaser
:
final Executor executor = new ThreadPoolExecutor(64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>(8), new CallerRunsPolicy());
final Phaser latch = new Phaser(0);
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger(0);
for(final String item : fetchItems()) // iterator returns too many items to store in memory
{
latch.register();
final Runnable task = new Runnable() {
public void run() {
processItem(item); // method takes a non-trivial amount of time to run
itemsProcessed.incrementAndGet();
latch.arrive();
}
};
executor.execute(task);
}
latch.awaitAdvance(0);
final long endTime = currentTimeMillis();
out.println("Processed " + itemsProcessed.get() + " items in " + (endTime - startTime)/1000.0 + " seconds.");
Le attività non vengono sempre completate prima dell'ultima istruzione di stampa e potrei ottenere un risultato simile a questo (per 128 articoli): Processed 121 items in 5.296 seconds.
L'oggetto corretto è lo ? La documentazione indica che supporta solo 65.535 parti, quindi avrei bisogno di raggruppare gli articoli da elaborare o introdurre una sorta di tiering Phaser
.
Quando si dice che ci sono troppe attività da adattare alla memoria, non è un vincolo di risorse? In questo modo puoi eseguire parallelamente i compiti che si adattano alla memoria e puoi utilizzare Future per questo. Non sono sicuro che un approccio ti aiuterà a superare il contrappeso delle risorse e finirai con una soluzione molto complicata. – Lokesh