2013-06-18 6 views
12

È possibile in qualche modo utilizzare le raccolte parallele di Scala per parallelizzare uno Iteratorsenza valutandolo completamente prima?iteratore parallelo in Scala

Qui sto parlando di parallelizzare le trasformazioni funzionali su un Iterator, ovvero map e flatMap. Penso che ciò richieda in anticipo alcuni elementi dello Iterator, e quindi il calcolo di più, una volta che alcuni vengono consumati tramite next.

Tutto quello che riuscivo a trovare richiederebbe che l'iteratore fosse convertito in un Iterable o un Stream nella migliore delle ipotesi. Il Stream viene quindi completamente valutato quando si chiama .par su di esso.

Accolgo anche proposte di implementazione se questo non è facilmente disponibile. Le implementazioni devono supportare il parallelo map e flatMap.

+0

La risposta è probabilmente no ma puoi dire qualcosa in più su ciò che vuoi da questo? In particolare, quando dovrebbe iniziare il calcolo, dopo aver creato l'iteratore o dopo aver chiamato qualcosa che impone la valutazione? –

+0

@RexKerr Sembra una scelta di design; ma facendolo iniziare alla prima richiesta rende la prima richiesta in qualche modo speciale. Attualmente sto cercando di implementare qualcosa di simile e ho scelto di iniziare subito a correre e memorizzare i risultati successivi 'n'. Una volta che si è consumato, computo una sostituzione. – ziggystar

risposta

3

La cosa migliore con la libreria standard non è probabilmente utilizza collezioni parallele ma concurrent.Future.traverse:

import concurrent._ 
import ExecutionContext.Implicits.global 
Future.traverse(Iterator(1,2,3))(i => Future{ i*i }) 

anche se credo che questo verrà eseguito il tutto a partire non appena possibile.

1

Dalla ML, Movimento degli elementi iterazione in parallelo:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

ho spostato fuori Future.traverse per una ragione simile. Per il mio caso d'uso, mantenendo N lavori in esecuzione, mi sono ritrovato con il codice per limitare l'alimentazione del contesto di esecuzione dalla coda dei lavori.

Il mio primo tentativo ha riguardato il blocco del thread di feeder, ma questo ha rischiato anche il blocco delle attività che desideravano generare attività nel contesto di esecuzione. Che cosa sai, bloccare è malvagio.

+0

Puoi commentare perché usi '(NUM_CPUs + 1)^2' come dimensione per la coda di blocco? – ziggystar

+0

Inoltre ho scoperto che 1. Non sono adatto alla programmazione concorrente 2. 'flatMap' è più difficile. – ziggystar

+0

@ziggystar Con "tu" intendi "Juha" sulla ML.Non penso che sia un numero magico: abbastanza grande in modo che il consumatore non superi l'iteratore originale (che potrebbe fare i/o, forse) oltre alla funzione di mappatura (legata alla CPU, dice, ma lunga o breve in esecuzione?). Vedo che il futuro che alimenta la coda bloccherà senza chiamare 'blocking'; forse il +1 è rimasto dal "parallelismo desiderato". La mia soluzione ha avuto la fine del controllo della pipeline per ulteriori lavori, ovvero l'ultima cosa che farebbe un lavoro è controllare se sono in corso lavori sufficienti e, in caso contrario, alimentare la bestia. Sono d'accordo è difficile, la semplicità è la chiave. –

0

È un po 'difficile da seguire esattamente quello che stai dopo, ma forse si tratta di qualcosa di simile:

val f = (x: Int) => x + 1 
val s = (0 to 9).toStream map f splitAt(6) match { 
    case (left, right) => left.par; right 
} 

Ciò eveluate f sui primi 6 elementi in parallelo e poi tornare un ruscello sul resto .

+0

Questo non sembra funzionare in parallelo - non hai bisogno di spostare la 'mappa f' dopo il' par'? – DNA

6

Mi rendo conto che questa è una vecchia domanda, ma l'implementazione ParIterator nella libreria iterata fa ciò che stavi cercando?

scala> import com.timgroup.iterata.ParIterator.Implicits._ 
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId)) 
scala> it.map(_._2).toSet.size 
res2: Int = 8 // addition was distributed over 8 threads 
+1

Indica il problema. Potrebbe essere un po 'più efficiente, tuttavia, dal momento che si ottiene un grande blocco, se si dispone di una grande variazione nei tempi di esecuzione delle operazioni all'interno di un blocco. – ziggystar

+0

Come potrebbe essere reso più efficiente @ziggystar? –

+0

'ParIterator' divide" Iterator "in blocchi. Quindi se hai pezzi piccoli (ad esempio la dimensione 2) e un elemento richiede 1 e l'altro ne richiede 10, allora hai una cattiva parallelizzazione. Una diversa implementazione potrebbe nutrire i lavoratori nuovi elementi dall'iteratore una volta che un lavoratore diventa libero. – ziggystar