2015-08-31 4 views
6

Ho pensato che usare i futures mi avrebbe permesso facilmente di sparare blocchi di codice, ma a quanto pare posso avere solo 4 futures alla volta.Posso fare solo 4 futures simultanei come massimo in Scala

Da dove viene questa restrizione o sto abusando di Futures usandolo in questo modo?

import scala.concurrent._ 
import ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 
import java.util.Calendar 

object Main extends App{ 

    val rand = scala.util.Random 

    for (x <- 1 to 100) { 
    val f = Future { 
     //val sleepTime = rand.nextInt(1000) 
     val sleepTime = 2000 
     Thread.sleep(sleepTime) 

     val today = Calendar.getInstance().getTime() 
     println("Future: " + x + " - sleep was: " + sleepTime + " - " + today) 
     1; 
    } 
    } 

    Thread.sleep(10000) 
} 

uscita:

Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015 
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015 
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015 
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015 
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015 
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015 
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015 
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015 
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015 
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015 
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015 
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015 
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015 
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015 
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015 
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015 

Mi aspettavo che tutti mostrano allo stesso tempo.

Per dare un contesto, pensavo di poter usare questo costrutto ed estenderlo avendo un ciclo principale, nel quale esso dorme ogni ciclo secondo un valore tratto da una distribuzione esponenziale, per emulare l'arrivo/esecuzione dell'utente di una query . Dopo ogni sospensione mi piacerebbe eseguire la query inviandola al driver del programma (in questo caso Spark, e il driver consente l'uso di più thread). C'è un modo più ovvio di usare Futures?

risposta

10

Quando si utilizza import ExecutionContext.Implicits.global, Crea pool di thread che ha la stessa dimensione del numero di CPU.

Dalla sorgente del ExecutionContext.scala

L'implementazione predefinita ExecutionContext è sostenuta da un pool di thread di lavoro furto. Per impostazione predefinita, il pool di thread utilizza un numero di destinazione di thread di lavoro uguale al numero di [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- processori disponibili]].

E c'è una buona domanda StackOverflow: What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?

Dal momento che la dimensione predefinita del pool di thread dipende dal numero di CPU, se si desidera utilizzare più grande pool di thread, devi scrivere qualcosa di simile

import scala.concurrent.ExecutionContext 
import java.util.concurrent.Executors 
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8)) 

prima di eseguire lo Future.

(. Nel codice, è necessario metterlo prima for anello)

notare che i lavori rubare piscina è stato aggiunto in Java 8, Scala ha la propria ForkJoinPool che fa il lavoro rubando: scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool

anche se si desidera un thread per Future, è possibile scrivere qualcosa di simile a

implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor) 

Pertanto, il seguente codice viene eseguito 100 thread in parallelo

import scala.concurrent._ 
import java.util.concurrent.Executors 

object Main extends App{ 
    for (x <- 1 to 100) { 
    implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor) 
    val f = Future { 
     val sleepTime = 2000 
     Thread.sleep(sleepTime) 

     val today = Calendar.getInstance().getTime() 
     println("Future: " + x + " - sleep was: " + sleepTime + " - " + today) 
     1; 
    } 
    } 

    Thread.sleep(10000) 
} 

Oltre a lavorare rubare pool di thread e esecutori singolo thread, ci sono alcuni altri esecutori: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html

leggere la documentazione per il dettaglio: http://docs.scala-lang.org/overviews/core/futures.html

+0

In quest'ultimo caso con newSingleThreadExecutor ti imbatti ancora nel fatto che un futuro può bloccare il successivo. È persino peggio del default. – hbogert

+0

@hbogert hai posizionato 'Executors.newSingleThreadExecutor' prima del' Future'? Ho aggiornato la domanda quindi per favore dai un'occhiata. – ymonad

+0

per impostazione predefinita, crea effettivamente pool fork-join, non solo pool fisso – dk14

1

Il pool di default quando si utilizza import scala.concurrent.ExecutionContext.Implicits.global ha infatti come molti thread come te hanno core sulla tua macchina.Questo è l'ideale per il codice non bloccante (non sincrono io/sleep/...) ma può essere problematico e persino causare deadlock quando lo si utilizza per il codice di blocco.

Tuttavia, questo pool può effettivamente crescere se si contrassegna il codice di blocco in un blocco scala.concurrent.blocking. Lo stesso indicatore è ad esempio in uso quando si utilizzano le funzioni e Await.ready che si bloccano mentre si attende un Futuro.

vedere la documentazione API per blocking

Quindi, tutto quello che dovete fare è aggiornare il tuo esempio:

import scala.concurrent.blocking 
... 
val sleepTime = 2000 
blocking{ 
    Thread.sleep(sleepTime) 
} 
... 

Ora tutti i futuri termineranno dopo 2000 ms

0

si possono anche utilizzare

`implicit val ec = ExecutionContext.fromExecutorService(ExecutorService.newFixedThreadPool(NUMBEROFTHREADSYOUWANT))` 

in NUMBEROFTHREADSYOUWANT puoi dare il numero di legge vuole iniziare. Questo verrà utilizzato prima del futuro.

+0

Questo non risolve il problema più profondo. Il problema è che i thread non ad alta intensità di CPU non dovrebbero occupare un thread OS completo. Nella tua soluzione, avendo 100 task non intensivi in ​​CPU, sarebbe necessario un FixedThreadPool di 100. – hbogert