6

Sto provando a calcolare alcuni grandi numeri. Per accelerare il calcolo, mi piacerebbe utilizzare il multithreading. Ogni thread dovrebbe calcolare un numero e alla fine viene calcolata una somma.Il modo migliore per sommare simultaneamente

una volta ho visto qualcosa che ha funzionato con un SumThread e un Collector che sembrava come segue:

public BigInteger compute(int p) { 
    Collector c = new Collector(p); 

    for(T element : Collection<T> bigCollection) { 
     new SumThread(c) { 

      @Override 
      protected void doTheJob() { 
       long big = someVeryComplexCalculation(element, ...); //n! 
       receive(BigInteger.valueOf(big)); 
      } 

     } 
    } 

    if(collector.isReady()) 
     return collector.getResult(); 

    return null; 
} 

public class Collector { 

    private int numberOfProcesses; 
    private int numberOfAllowedProcesses; 
    private BigInteger result; 

    public Collector(int n) { 
     numberOfAllowedProcesses = n; 
     numberOfProcesses = 0; 
     result = BigInteger.ZERO; 
    } 

    synchronized public void enter() throws InterruptedException { 
     if (numberOfProcesses == numberOfAllowedProcesses) wait(); 
     numberOfProcesses++; 
    } 

    synchronized public void leave() { 
     numberOfProcesses--; 
     notify(); 
    } 

    synchronized public void register(BigInteger v) { 
     result = result.add(v); 
    } 

    synchronized public boolean isReady() throws InterruptedException { 
     while (numberOfProcesses > 0) wait(); 
     return true; 
    } 

    ... 
} 

public abstract class SumThread extends Thread { 

    private Collector collector; 

    public SumThread(Collector c) throws InterruptedException { 
     collector = c; 
     collector.enter(); 
    } 

    abstract protected void doTheJob(); //complex calculations can be done in here 

    public void receive(BigInteger t) { 
     collector.register(t); 
    } 

    public void run() { 
     doTheJob(); 
     collector.leave(); 
    } 
} 

ho pensato che avrei potuto facilmente superare questo utilizzando invece di fare nuove Thread s costantemente come:

public BigInteger compute(int p) { 
    ExecutorService pool = Executors.newFixedThreadPool(p); 
    Future<BigInteger>[] futures = new Future<BigInteger>[bigCollection.size()]; 
    int i = 0; 

    for(T element : Collection<T> bigCollection) { 
     futures[i++] = p.submit(new Callable<BigInteger>() { 

      @Override 
      public BigInteger call() { 
       long big = someVeryComplexCalculation(element, ...); //n! 
       return BigInteger.valueOf(big); 
      } 

     } 
    } 

    // or with ExecutorCompletionService, but the loop remains I guess 
    BigInteger res = BigInteger.ZERO 
    for(Future<BigInteger> f : futures) 
     res = res.add(f.get()); 

    return res; 
} 

Questo codice non è riuscito a sovraperformare la soluzione SumThread - Collector. Ho anche visto cose su LongAdder per esempio, ma avrei bisogno di un sommatore per BigInteger s ...

La mia domanda è quindi: qual è il modo migliore per calcolare una somma contemporaneamente? È uno dei precedenti o c'è un modo completamente diverso (ma migliore)?

risposta

7

Come si menziona LongAdder che è stato aggiunto in Java-8 e si utilizzano variabili efficacemente definitive, presumo che si stia utilizzando Java-8. In questa versione il modo migliore per risolvere il vostro compito è quello di utilizzare il Stream API:

BigInteger result = bigCollection.parallelStream() 
        .map(e -> BigInteger.valueOf(someVeryComplexCalculation(e, ...))) 
        .reduce(BigInteger.ZERO, BigInteger::add); 

Il tuo problema è la classica carta-ridurre compito dove si dovrebbe trasformare ogni elemento di una certa collezione, quindi combinare i risultati delle singole trasformazioni nel risultato finale. L'API Stream è in grado di parallelizzare tali compiti in modo abbastanza efficace senza alcun lavoro manuale. In Oracle JDK le attività vengono eseguite nello common ForkJoinPool pool, che per impostazione predefinita crea più thread di tutti i core della CPU che si hanno.

+0

Questo è davvero molto efficace! Solo per curiosità: ci sono possibilità di applicare questo alle istanze di 'Iterable ' per esempio? C'è un modo per comprimere i flussi? (ad esempio: _complexCalculation() = part1() * part2() quindi mi piacerebbe mappare part1() su bigCollection e part2() e parallelamente unire i risultati di entrambi i flussi risultanti con moltiplicazione_) –

+1

@MrTsjolder, per elaborare il flusso in parallelo, la sorgente del flusso (Iterable o qualsiasi altra cosa) deve fornire uno splitterator che può dividere bene la sorgente. Se dubiti della tua fonte, puoi semplicemente copiare il contenuto di 'Iterable ' in 'ArrayList '. Per quanto riguarda lo zipping, non posso rispondere senza vedere il codice sorgente esatto.Puoi fare una domanda diversa fornendo tutti i dettagli necessari (usa il tag "java-stream", lo controllo sempre). –

2

Tou hanno due soluzioni:

In un primo momento, vorrei proporre di usare fork-join quadro da JDK7 per questo compito:

Lo faresti necessario implementare un RecursiveTask

Come seconda soluzione (JDK8) si potrebbe usare lo stream pararell, proprio come ha suggerito @ tagir-valeev.

In entrambi i casi, dipende da cosa si sta utilizzando e dalla versione di Java che si sta utilizzando.