2011-01-19 1 views
9

Attualmente sto cercando di migliorare le prestazioni di un programma F # per renderlo veloce quanto il suo equivalente C#. Il programma applica un array di filtri a un buffer di pixel. L'accesso alla memoria avviene sempre tramite puntatori.F # problema di prestazioni di manipolazione dell'immagine

Ecco il codice C# che viene applicato a ogni pixel di un'immagine:

unsafe private static byte getPixelValue(byte* buffer, double* filter, int filterLength, double filterSum) 
{ 
    double sum = 0.0; 
    for (int i = 0; i < filterLength; ++i) 
    { 
     sum += (*buffer) * (*filter); 
     ++buffer; 
     ++filter; 
    } 

    sum = sum/filterSum; 

    if (sum > 255) return 255; 
    if (sum < 0) return 0; 
    return (byte) sum; 
} 

Il codice F # simile a questo e prende tre volte fino a quando il programma di C#:

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
     if i > 0 then 
      let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
      accumulatePixel newAcc (NativePtr.add buffer 1) (NativePtr.add filter 1) (i-1) 
     else 
      acc 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

L'uso di variabili mutabili e un ciclo for in F # ha come risultato la ricorsione. Tutti i progetti sono configurati per l'esecuzione in modalità di rilascio con l'ottimizzazione del codice attivata.

Come è possibile migliorare le prestazioni della versione F #?

EDIT:

Il collo di bottiglia sembra essere in (NativePtr.get buffer offset). Se sostituisco questo codice con un valore fisso e sostituisco anche il codice corrispondente nella versione C# con un valore fisso, ottengo la stessa velocità per entrambi i programmi. Infatti, in C# la velocità non cambia affatto, ma in F # fa un'enorme differenza.

Questo comportamento può essere eventualmente modificato o radicato profondamente nell'architettura di F #?

EDIT 2:

ho riscritta nuovamente il codice da utilizzare per-loop. La velocità di esecuzione rimane la stessa:

let mutable acc <- 0.0 
let mutable f <- filterData 
let mutable b <- tBuffer 
for i in 1 .. filter.FilterLength do 
    acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f) 
    f <- NativePtr.add f 1 
    b <- NativePtr.add b 1 

Se confronto il codice IL di una versione che utilizza (NativePtr.read b) e un'altra versione che è lo stesso eccetto che utilizza un valore fisso 111uy invece di leggere dal puntatore, Solo le seguenti righe nel codice iL cambiamento:

111uy comprende iL-codice ldc.i4.s 0x6f (0,3 secondi)

(NativePtr.read b) ha linee di iL-code ldloc.s b e ldobj uint8 (1,4 secondi)

Per confronto: C# esegue il filtraggio in 0,4 secondi.

Il fatto che la lettura del filtro non influenzi le prestazioni durante la lettura dal buffer di immagini è in qualche modo fonte di confusione. Prima di filtrare una linea dell'immagine, copio la linea in un buffer che ha la lunghezza di una linea. Ecco perché le operazioni di lettura non sono distribuite su tutta l'immagine ma sono all'interno di questo buffer, che ha una dimensione di circa 800 byte.

+0

In base al più recente commento, mi chiedo se il fatto che il compilatore C# utilizza 'ldind.u1' invece di 'ldobj uint8' (che è l'IL semanticamente equivalente usato da F #) fa la differenza. Si può provare a eseguire ildasm sull'eseguibile F #, sostituendo 'ldobj uint8' con' ldind.u1', e eseguendo ilmisura su di esso per vedere se le prestazioni differiscono. – kvb

+0

L'ho sostituito ma non c'era differenza. Grazie comunque. –

risposta

12

Se osserviamo il codice IL effettivo del ciclo interno che attraversa entrambi i buffer in parallelo generati dal compilatore C# (parte pertinente):

L_0017: ldarg.0 
L_0018: ldc.i4.1 
L_0019: conv.i 
L_001a: add 
L_001b: starg.s buffer 
L_001d: ldarg.1 
L_001e: ldc.i4.8 
L_001f: conv.i 
L_0020: add 

e F # compilatore:

L_0017: ldc.i4.1 
L_0018: conv.i 
L_0019: sizeof uint8 
L_001f: mul 
L_0020: add 
L_0021: ldarg.2 
L_0022: ldc.i4.1 
L_0023: conv.i 
L_0024: sizeof float64 
L_002a: mul 
L_002b: add 

noteremo che mentre il codice C# utilizza solo l'operatore add mentre F # richiede sia mul sia add. Ma ovviamente su ogni passaggio abbiamo solo bisogno di incrementare i puntatori (rispettivamente per i valori "sizeof byte" e "sizeof float"), non per calcolare l'indirizzo (addrBase + (sizeof byte)) F # mul non è necessario (si moltiplica sempre per 1).

La causa di ciò è che C# definisce ++ operatore per i puntatori, mentre F # offre solo add : nativeptr<'T> -> int -> nativeptr<'T> operatore:

[<NoDynamicInvocation>] 
let inline add (x : nativeptr<'a>) (n:int) : nativeptr<'a> = to_nativeint x + nativeint n * (# "sizeof !0" type('a) : nativeint #) |> of_nativeint 

quindi non è "profondamente radicata" in F #, è solo che module NativePtr manca inc e dec funzioni.

Btw, ho il sospetto che il suddetto esempio possa essere scritto in un modo più conciso se gli argomenti fossero passati come matrici invece di puntatori grezzi.

UPDATE:

Così fa il seguente codice hanno solo l'1% della velocità fino (sembra generare molto simile a C# IL):

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
     if i > 0 then 
      let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
      accumulatePixel newAcc (NativePtr.ofNativeInt <| (NativePtr.toNativeInt buffer) + (nativeint 1)) (NativePtr.ofNativeInt <| (NativePtr.toNativeInt filter) + (nativeint 8)) (i-1) 
     else 
      acc 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

Un altro pensiero: Potrebbe anche dipendere il numero di chiamate a getPixelValue del test fa (F # divide questa funzione in due metodi mentre C# lo fa in uno).

È possibile che si inserisca qui il codice di prova?

Per quanto riguarda la matrice, mi aspetto che il codice sia almeno più conciso (e non unsafe).

UPDATE # 2:

Sembra che il collo di bottiglia reale qui è byte->float conversione.

C#:

L_0003: ldarg.1 
L_0004: ldind.u1 
L_0005: conv.r8 

F #:

L_000c: ldarg.1 
L_000d: ldobj uint8 
L_0012: conv.r.un 
L_0013: conv.r8 

Per qualche ragione F # utilizza il seguente percorso: byte->float32->float64 mentre C# solo byte->float64 fa. Non sono sicuro del perché, ma con il seguente hack la mia versione F # gira con la stessa velocità di C# sul campione di prova gradbot (BTW, grazie gradbot per il test!):

let inline preadConvert (p : nativeptr<byte>) = (# "conv.r8" (# "ldobj !0" type (byte) p : byte #) : float #) 
let inline pinc (x : nativeptr<'a>) : nativeptr<'a> = NativePtr.toNativeInt x + (# "sizeof !0" type('a) : nativeint #) |> NativePtr.ofNativeInt 

let rec accumulatePixel_ed (acc, buffer, filter, i) = 
     if i > 0 then 
      accumulatePixel_ed 
       (acc + (preadConvert buffer) * (NativePtr.read filter), 
       (pinc buffer), 
       (pinc filter), 
       (i-1)) 
     else 
      acc 

Risultati:

adrian 6374985677.162810 1408.870900 ms 
    gradbot 6374985677.162810 1218.908200 ms 
     C# 6374985677.162810 227.832800 ms 
C# Offset 6374985677.162810 224.921000 ms 
    mutable 6374985677.162810 1254.337300 ms 
    ed'ka 6374985677.162810 227.543100 ms 

ULTIMO AGGIORNAMENTO Si è scoperto che siamo in grado di raggiungere la stessa velocità anche senza alcun hack:

let rec accumulatePixel_ed_last (acc, buffer, filter, i) = 
     if i > 0 then 
      accumulatePixel_ed_last 
       (acc + (float << int16 <| NativePtr.read buffer) * (NativePtr.read filter), 
       (NativePtr.add buffer 1), 
       (NativePtr.add filter 1), 
       (i-1)) 
     else 
      acc 

Tutto quello che dobbiamo fare è quello di convertire byte in, ad esempio int16 e quindi in float. In tal modo si eviteranno istruzioni "costose" conv.r.un.

PS codice di conversione rilevante da "Prim-types.fs":

let inline float (x: ^a) = 
    (^a : (static member ToDouble : ^a -> float) (x)) 
    when ^a : float  = (# "" x : float #) 
    when ^a : float32 = (# "conv.r8" x : float #) 
    // [skipped] 
    when ^a : int16  = (# "conv.r8" x : float #) 
    // [skipped] 
    when ^a : byte  = (# "conv.r.un conv.r8" x : float #) 
    when ^a : decimal = (System.Convert.ToDouble((# "" x : decimal #))) 
+0

Grazie per questa interessante idea! Ho creato la mia funzione di aggiunta personalizzata per i puntatori che non utilizzano la moltiplicazione. L'accelerazione è solo circa l'1% del tempo totale di esecuzione. Pensi che la versione dell'array avrebbe comunque un rendimento elevato? –

+4

Bel lavoro investigativo! – kvb

+0

Wow! Sono davvero impressionato :-) –

1

Come si confronta? Ha meno chiamate a NativePtr.

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 
    let accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) length = 
     let rec accumulate acc offset = 
      if offset < length then 
       let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset)) 
       accumulate newAcc (offset + 1) 
      else 
       acc 
     accumulate acc 0 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

F # codice sorgente di NativePtr.

[<NoDynamicInvocation>] 
[<CompiledName("AddPointerInlined")>] 
let inline add (x : nativeptr<'T>) (n:int) : nativeptr<'T> = toNativeInt x + nativeint n * (# "sizeof !0" type('T) : nativeint #) |> ofNativeInt 

[<NoDynamicInvocation>] 
[<CompiledName("GetPointerInlined")>] 
let inline get (p : nativeptr<'T>) n = (# "ldobj !0" type ('T) (add p n) : 'T #) 
+0

Grazie! Questa è una buona idea, ma il miglioramento delle prestazioni è marginale. –

+0

Penso che (offset del buffer NativePtr.get) sia veloce come (NativePtr.read (buffer buffer NativePtr.add)), probabilmente fa lo stesso dietro le quinte ... –

+0

Hai ragione. Ho aggiunto il codice sorgente F # alla mia risposta. – gradbot

1

I miei risultati su un test più grande.

adrian 6374730426.098020 1561.102500 ms 
    gradbot 6374730426.098020 1842.768000 ms 
     C# 6374730426.098020 150.793500 ms 
C# Offset 6374730426.098020 150.318900 ms 
    mutable 6374730426.098020 1446.616700 ms 

codice F # prova

open Microsoft.FSharp.NativeInterop 
open System.Runtime.InteropServices 
open System.Diagnostics 

open AccumulatePixel 
#nowarn "9" 

let test size fn = 
    let bufferByte = Marshal.AllocHGlobal(size * 4) 
    let bufferFloat = Marshal.AllocHGlobal(size * 8) 

    let bi = NativePtr.ofNativeInt bufferByte 
    let bf = NativePtr.ofNativeInt bufferFloat 

    let random = System.Random() 

    for i in 1 .. size do 
     NativePtr.set bi i (byte <| random.Next() % 256) 
     NativePtr.set bf i (random.NextDouble()) 

    let duration (f, name) = 
     let stopWatch = Stopwatch.StartNew() 
     let time = f(0.0, bi, bf, size) 
     stopWatch.Stop() 
     printfn "%10s %f %f ms" name time stopWatch.Elapsed.TotalMilliseconds 

    List.iter duration fn 

    Marshal.FreeHGlobal bufferFloat 
    Marshal.FreeHGlobal bufferByte 

let rec accumulatePixel_adrian (acc, buffer, filter, i) = 
    if i > 0 then 
     let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
     accumulatePixel_adrian (newAcc, (NativePtr.add buffer 1), (NativePtr.add filter 1), (i - 1)) 
    else 
     acc 

let accumulatePixel_gradbot (acc, buffer, filter, length) = 
    let rec accumulate acc offset = 
     if offset < length then 
      let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset)) 
      accumulate newAcc (offset + 1) 
     else 
      acc 
    accumulate acc 0 

let accumulatePixel_mutable (acc, buffer, filter, length) = 
    let mutable acc = 0.0 
    let mutable f = filter 
    let mutable b = buffer 
    for i in 1 .. length do 
     acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f) 
     f <- NativePtr.add f 1 
     b <- NativePtr.add b 1 
    acc 

[ 
    accumulatePixel_adrian, "adrian"; 
    accumulatePixel_gradbot, "gradbot"; 
    AccumulatePixel.getPixelValue, "C#"; 
    AccumulatePixel.getPixelValueOffset, "C# Offset"; 
    accumulatePixel_mutable, "mutable"; 
] 
|> test 100000000 

System.Console.ReadLine() |> ignore 

codice C# prova

namespace AccumulatePixel 
{ 
    public class AccumulatePixel 
    { 
     unsafe public static double getPixelValue(double sum, byte* buffer, double* filter, int filterLength) 
     { 
      for (int i = 0; i < filterLength; ++i) 
      { 
       sum += (*buffer) * (*filter); 
       ++buffer; 
       ++filter; 
      } 

      return sum; 
     } 

     unsafe public static double getPixelValueOffset(double sum, byte* buffer, double* filter, int filterLength) 
     { 
      for (int i = 0; i < filterLength; ++i) 
      { 
       sum += buffer[i] * filter[i]; 
      } 

      return sum; 
     } 
    } 
}