11

A deadlock stazionamento thread si verifica in un normale pool di thread se tutti i thread nel pool sono in attesa di attività in coda nello stesso pool da completare. ForkJoinPool evita questo problema rubando il lavoro da altri thread all'interno della chiamata join(), piuttosto che semplicemente in attesa. Per esempio:È possibile utilizzare il comportamento di sottrazione del lavoro di ForkJoinPool per evitare un deadlock di inattività del thread?

private static class ForkableTask extends RecursiveTask<Integer> { 
    private final CyclicBarrier barrier; 

    ForkableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    protected Integer compute() { 
     try { 
      barrier.await(); 
      return 1; 
     } catch (InterruptedException | BrokenBarrierException e) { 
      throw new RuntimeException(e); 
     } 
    } 
} 

@Test 
public void testForkJoinPool() throws Exception { 
    final int parallelism = 4; 
    final ForkJoinPool pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism); 
    for (int i = 0; i < parallelism; ++i) { 
     forkableTasks.add(new ForkableTask(barrier)); 
    } 

    int result = pool.invoke(new RecursiveTask<Integer>() { 
     @Override 
     protected Integer compute() { 
      for (ForkableTask task : forkableTasks) { 
       task.fork(); 
      } 

      int result = 0; 
      for (ForkableTask task : forkableTasks) { 
       result += task.join(); 
      } 
      return result; 
     } 
    }); 
    assertThat(result, equalTo(parallelism)); 
} 

Ma quando si utilizza l'interfaccia ExecutorService ad un ForkJoinPool, work-furto non sembra verificarsi. Per esempio:

private static class CallableTask implements Callable<Integer> { 
    private final CyclicBarrier barrier; 

    CallableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    public Integer call() throws Exception { 
     barrier.await(); 
     return 1; 
    } 
} 

@Test 
public void testWorkStealing() throws Exception { 
    final int parallelism = 4; 
    final ExecutorService pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier)); 
    int result = pool.submit(new Callable<Integer>() { 
     @Override 
     public Integer call() throws Exception { 
      int result = 0; 
      // Deadlock in invokeAll(), rather than stealing work 
      for (Future<Integer> future : pool.invokeAll(callableTasks)) { 
       result += future.get(); 
      } 
      return result; 
     } 
    }).get(); 
    assertThat(result, equalTo(parallelism)); 
} 

Da un rapido sguardo ForkJoinPool s' implementazione, tutti i regolari ExecutorService API sono implementate utilizzando ForkJoinTask s, quindi non sono sicuro del perché si verifica una situazione di stallo.

+2

Non penso che il furto di lavoro eviti lo stallo. Quando sei in una situazione di stallo, non puoi fare progressi. Il furto di lavoro evita solo code non bilanciate consentendo ai thread di rubare da altre code se la loro coda è vuota. – markspace

+0

@markspace Nell'implementazione di 'ForkJoinTask',' join() 'tenta di eseguire altri lavori dalla deque anziché dallo stallo, evitando così il deadlock. Poiché 'ForkJoinPool.invokeAll()' converte il 'Callable's in' ForkJoinTask's, mi aspettavo che funzionasse anche. –

risposta

20

Stai quasi rispondendo alla tua stessa domanda. La soluzione è l'affermazione che "ForkJoinPool evita questo problema rubando il lavoro da altri thread all'interno della chiamata join()". Ogni volta che i thread sono bloccati per qualche altro motivo tranne ForkJoinPool.join(), questo furto di lavoro non si verifica e i thread attendono e non fanno nulla.

La ragione di ciò è che in Java non è possibile che ForkJoinPool impedisca il blocco dei thread e invece fornisca loro qualcos'altro su cui lavorare. Il thread stesso deve evitare il blocco e invece chiedere al pool il lavoro che dovrebbe fare. E questo è implementato solo nel metodo ForkJoinTask.join(), non in nessun altro metodo di blocco. Se si utilizza uno Future all'interno di uno ForkJoinPool, verrà visualizzato anche il deadlock di inattività.

Perché il furto del lavoro è implementato solo in ForkJoinTask.join() e non in altri metodi di blocco nell'API Java? Bene, ci sono molti di questi metodi di blocco (Object.wait(), Future.get(), uno qualsiasi dei primitivi di concorrenza in java.util.concurrent, metodi I/O ecc.) E non hanno nulla a che fare con ForkJoinPool, che è solo una classe arbitraria nell'API, quindi aggiungendo casi speciali per tutti questi metodi sarebbero un cattivo design. Porterebbe anche a effetti forse molto sorprendenti e indesiderati.Immaginate ad esempio un utente che passa un'attività a un ExecutorService che aspetta su un Future, e poi scopre che l'attività si blocca molto a lungo in Future.get() solo perché il thread in esecuzione ha rubato un altro oggetto di lavoro (di lunga esecuzione) invece di attendere il Future e continua subito dopo che il risultato è disponibile. Una volta che un thread inizia a lavorare su un'altra attività, non può tornare all'attività originale fino al termine della seconda attività. Quindi è in realtà una buona cosa che altri metodi di blocco non rubino il lavoro. Per un ForkJoinTask, questo problema non esiste, poiché non è importante che l'attività primaria venga proseguita il prima possibile, è importante che tutte le attività vengano gestite nel modo più efficiente possibile.

Non è inoltre possibile implementare il proprio metodo per eseguire il furto di lavoro all'interno di ForkJoinPool, poiché tutte le parti rilevanti non sono pubbliche.

Tuttavia, esiste in realtà un secondo metodo su come evitare i deadlock di inattività. Questo è chiamato blocco gestito. Non usa il furto del lavoro (per evitare il problema menzionato sopra), ma ha anche bisogno del thread che diventerà blocco per cooperare attivamente con il pool di thread. Con il blocco gestito, il thread comunica al pool di thread che può essere bloccato prima del, chiama il metodo potenzialmente di blocco e informa il pool anche quando il metodo di blocco è terminato. Il pool di thread sa quindi che esiste il rischio di un deadlock inattivo e può generare thread aggiuntivi se tutti i thread sono attualmente in qualche operazione di blocco e ci sono ancora altre attività da eseguire. Si noti che questo è meno efficiente del furto del lavoro, a causa del sovraccarico dei thread aggiuntivi. Se si implementa un algoritmo parallelo ricorsivo con futures ordinari e blocco gestito anziché con ForkJoinTask e il furto del lavoro, il numero di thread aggiuntivi può diventare molto grande (poiché nella fase "divide" dell'algoritmo, verranno create molte attività e dato a thread che bloccano e attendono immediatamente i risultati delle sotto-attività). Tuttavia, un deadlock di inedia è ancora impedito ed evita il problema che un'attività deve attendere a lungo perché il thread ha iniziato a lavorare su un'altra attività nel frattempo.

Il ForkJoinPool di Java supporta anche il blocco gestito. Per utilizzare ciò, è necessario implementare l'interfaccia ForkJoinPool.ManagedBlocker in modo tale che il metodo di blocco potenziale che l'attività desidera eseguire venga chiamato all'interno del metodo block di questa interfaccia. Quindi l'attività non può chiamare direttamente il metodo di blocco, ma deve chiamare il metodo statico ForkJoinPool.managedBlock(ManagedBlocker). Questo metodo gestisce la comunicazione con il pool di thread prima e dopo il blocco. Funziona anche se l'attività corrente non viene eseguita all'interno di un ForkJoinPool, quindi chiama solo il metodo di blocco.

L'unico posto che ho trovato nell'API Java (per Java 7) che effettivamente utilizza il blocco gestito è la classe Phaser. (Questa classe è una barriera di sincronizzazione come mutex e latch, ma più flessibile e potente.) Quindi la sincronizzazione con un Phaser all'interno di un'attività ForkJoinPool deve utilizzare il blocco gestito e può evitare deadlock di inedia (ma ForkJoinTask.join() è ancora preferibile perché utilizza il furto del lavoro invece di blocco gestito). Funziona indipendentemente dal fatto che si utilizzi lo ForkJoinPool direttamente o tramite l'interfaccia ExecutorService. Tuttavia, non funzionerà se si utilizzano altri ExecutorService come quelli creati dalla classe Executors, perché questi non supportano il blocco gestito.

In Scala, l'uso del blocco gestito è più diffuso (description, API).

+2

Grazie per questa risposta, molto completa. Un pochino però, l'implementazione di 'ForkJoinTask' fa lo stesso rubare in' get() 'come in' join() '. Il deadlock nella mia domanda deriva principalmente dal tentativo di sincronizzare senza 'ForkJoinPool.managedBlock()' (in realtà, entrambi gli esempi di deadlock su Java 7). Usando 'Phaser's, invece, entrambi possono essere fatti funzionare. –

+0

Forse puoi chiarire in che modo [lo stato di blocco del thread di Java] (http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.State.html) corrisponde a un thread di blocco nel moderno Senso della Scala. Ogni operazione di blocco attende su un blocco del monitor Java come descritto su quel collegamento? o il pool di thread tiene traccia dello stato di blocco in qualche altro modo? – matanster

+0

@matt Non sono sicuro di cosa intendi per "blocco del filo nel moderno senso di Scala". La connessione tra 'State.BLOCKED' e il blocco gestito è solo che un thread che sa che potrebbe essere in stato BLOCCATO presto dovrebbe dire al ForkJoinPool di questo in anticipo chiamando' managedBlock'. ForkJoinPool non tenta di rilevare se i thread sono attualmente bloccati. E se un thread utilizza 'managedBlock' ma non viene effettivamente BLOCCATO, ForkJoinPool incrementerà comunque il numero di thread. –

0

Vedo quello che stai facendo ma non so perché. L'idea di una barriera è che fili così indipendenti possono attendere l'un l'altro per raggiungere un punto comune. Non hai thread indipendenti. pool di thread, F/J, sono per Data Parallelism

State facendo qualcosa di più in sintonia con Task Parallelism

Il motivo F/J continua è che il quadro crea "thread continuazione" di continuare il recupero lavoro dalle deque quando tutti i thread di lavoro sono in attesa.

+0

Le barriere sono lì solo per assicurarsi che ogni attività sia pianificata su un thread separato. E non penso che i "thread di continuazione" siano la risposta, se stampi 'Thread.currentThread(). GetId()', vedrai che uno dei 'ForkableTasks' viene eseguito nello stesso thread di quello che invoca il resto e solo 4 thread sono usati in totale. –

+0

Non è possibile garantire quale thread elabori il compito con il furto del lavoro. Tutte le attività eseguono il dump nella stessa coda di invio. A seconda della release che si utilizza (Java7/8) i thread di lavoro che il blocco viene sostituito con i thread "continuation" o "compensation". Quello che stai facendo non è il punto di forza di F/J (Data Parallelism.) – edharned

0

Quando si utilizza un pool di thread con un numero limitato di thread e attività che possono essere bloccati (ad esempio da future.get()), c'è sempre possibilità di inattività thread. Utilizzare un pool di thread illimitato (e essere pronti per OutOfMemoryError) oppure utilizzare attività non bloccanti suddividendo le attività di blocco sulle parti di sblocco che vengono attivate quando viene soddisfatta la condizione richiesta. Class Future non può farlo (attiva un'attività), ma è possibile completare CompletableFuture da Java8. Guarda anche numerose librerie di attori e flussi di dati, ad es. miniera df4j2

+0

Questo è sicuramente vero per i pool di thread tradizionali. Ma 'ForkJoinPool' implementa le API' ExecutorService' sopra a 'ForkJoinTask's, quindi mi aspettavo che il comportamento corrispondesse. –