2009-09-30 5 views
8
 
(fileNameToCharStream "bigfile" 
|>> fuse [length; 
      splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length; 
      splitBy (fun x -> x = '\n') keepEmpty |>> length; 
     ]) 
    (*fuse "fuses" the three functions to run concurrently*) 
|> run 2 (*forces to run in parallel on two threads*) 
|> (fun [num_chars; num_words; num_lines] -> 
     printfn "%d %d %d" 
      num_chars num_words, num_lines)) 

Voglio fare in modo che questo codice funzioni nel modo seguente: dividere il flusso originale in due esattamente nel mezzo; quindi per ogni semestre eseguire un calcolo separato che calcola 3 elementi: la lunghezza (ovvero il numero di caratteri), il numero di parole, il numero di righe. Tuttavia, non voglio avere un problema se mi divido erroneamente su una parola. Questo deve essere curato. Il file dovrebbe essere letto solo una volta.Pipeline parallela

Come si devono programmare le funzioni specificate e l'operatore | >>? È possibile?

+0

Può darsi che gli Stati Uniti non si è ancora svegliato, ma in attesa che, si potrebbe desiderare di cercare la parola chiave 'async' per ottenere un migliore idea di ciò che è possibile. – Benjol

+0

Quali firme immaginate fuse, corri, e | >> avrebbe? Ad esempio, dove viene trasformata la tua lista di tre elementi in una tupla da 3? – Gabriel

+0

A destra, intendo: |> (divertente [num_chars; num_words; num_lines] -> –

risposta

8

Sembra che tu chieda un bel po '. Lascerò a te decidere la manipolazione delle stringhe, ma ti mostrerò come definire un operatore che esegue una serie di operazioni in parallelo.

Fase 1: Scrivere una funzione fuse

La vostra funzione fusibile appare per mappare un singolo ingresso utilizzando molteplici funzioni, che è abbastanza facile da scrivere come segue:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list 
let fuse functionList input = [ for f in functionList -> f input] 

Nota che tutti le tue funzioni di mappatura devono avere lo stesso tipo.

Fase 2: Definire all'operatore di eseguire le funzioni in parallelo

La funzione mappa parallela standard può essere scritto come segue:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array 
let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

A mia conoscenza, Async.Parallel eseguirà operazioni asincrone in parallelo, dove il numero di attività parallele eseguite in un dato momento è uguale al numero di core su una macchina (qualcuno può correggermi se ho torto). Quindi su una macchina dual core, dovremmo avere al massimo 2 thread in esecuzione sulla mia macchina quando viene chiamata questa funzione. Questa è una buona cosa, dal momento che non prevediamo alcuna accelerazione eseguendo più di un thread per core (infatti la commutazione di contesto in più potrebbe rallentare).

Possiamo definire un operatore |>> in termini di pmap e fuse:

//val (|>>) : seq<'a> -> seq<('a -> 'b)> -> 'b list array 
let (|>>) input functionList = pmap (fuse functionList) input 

Così l'operatore |>> prende un sacco di input e le mappe utilizzando un sacco di uscite diverse. Finora, se mettiamo insieme tutto questo, si ottiene la seguente (in FSI):

> let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];; 

val countOccurrences : 'a -> seq<'a> -> int 
val length : string -> int 
val testData : string [] = 
    [|"Juliet is awesome"; "Someone should give her a medal"|] 
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] 

testOutput contiene due elementi, entrambi i quali sono stati calcolati in parallelo.

Fase 3: aggregato elementi in una singola uscita

Bene, così ora disponiamo risultati parziali rappresentati da ciascun elemento nel nostro array, e vogliamo unire i nostri risultati parziali in un unico aggregato. Presumo che ogni elemento nell'array debba essere unito alla stessa funzione, poiché ogni elemento nell'input ha lo stesso tipo di dati.

Ecco davvero un brutto funzione che ho scritto per il lavoro:

> let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);; 

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list 

> reduceMany (+) testOutput;; 
val it : int list = [48; 1; 4] 

reduceMany prende sequenza di sequenze n di lunghezza, e restituisce un array n-length come uscita. Se si può pensare a un modo migliore di scrivere questa funzione, sarà mio ospite :)

per decodificare l'output sopra:

  • 48 = somma delle lunghezze dei miei due stringhe di input. Nota che la stringa originale era di 49 caratteri, ma dividendola sul "|" mangiato fino a un carattere per "|".
  • 1 = somma di tutte le istanze di "J" nel mio input
  • 4 = somma di tutte le istanze di "O".

Fase 4: Mettere tutto insieme

let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

let fuse functionList input = [ for f in functionList -> f input] 

let (|>>) input functionList = pmap (fuse functionList) input 

let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]) 

let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'] 
    |> reduceMany (+)