2016-04-21 51 views
113

Stiamo sviluppando un programma che riceve e inoltra "messaggi", mantenendo una cronologia temporanea di tali messaggi, in modo che possa dirvi la cronologia dei messaggi, se richiesta. I messaggi sono identificati numericamente, hanno in genere circa 1 kilobyte di dimensione e dobbiamo conservare centinaia di migliaia di questi messaggi.Riduzione del tempo di pausa della raccolta dati obsoleti in un programma Haskell

Vogliamo ottimizzare questo programma per la latenza: il tempo tra l'invio e la ricezione di un messaggio deve essere inferiore a 10 millisecondi.

Il programma è scritto in Haskell e compilato con GHC. Tuttavia, abbiamo riscontrato che le pause nella raccolta dei rifiuti sono troppo lunghe per i nostri requisiti di latenza: oltre 100 millisecondi nel nostro programma reale.

Il seguente programma è una versione semplificata della nostra applicazione. Utilizza uno Data.Map.Strict per archiviare i messaggi. I messaggi sono ByteString s identificati da un Int. 1.000.000 di messaggi vengono inseriti in ordine numerico crescente e i messaggi meno recenti vengono continuamente rimossi per mantenere la cronologia a un massimo di 200.000 messaggi.

module Main (main) where 

import qualified Control.Exception as Exception 
import qualified Control.Monad as Monad 
import qualified Data.ByteString as ByteString 
import qualified Data.Map.Strict as Map 

data Msg = Msg !Int !ByteString.ByteString 

type Chan = Map.Map Int ByteString.ByteString 

message :: Int -> Msg 
message n = Msg n (ByteString.replicate 1024 (fromIntegral n)) 

pushMsg :: Chan -> Msg -> IO Chan 
pushMsg chan (Msg msgId msgContent) = 
    Exception.evaluate $ 
    let 
     inserted = Map.insert msgId msgContent chan 
    in 
     if 200000 < Map.size inserted 
     then Map.deleteMin inserted 
     else inserted 

main :: IO() 
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) 

Abbiamo compilato ed eseguito questo programma utilizzando:

$ ghc --version 
The Glorious Glasgow Haskell Compilation System, version 7.10.3 
$ ghc -O2 -optc-O3 Main.hs 
$ ./Main +RTS -s 
    3,116,460,096 bytes allocated in the heap 
    385,101,600 bytes copied during GC 
    235,234,800 bytes maximum residency (14 sample(s)) 
    124,137,808 bytes maximum slop 
      600 MB total memory in use (0 MB lost due to fragmentation) 

            Tot time (elapsed) Avg pause Max pause 
    Gen 0  6558 colls,  0 par 0.238s 0.280s  0.0000s 0.0012s 
    Gen 1  14 colls,  0 par 0.179s 0.250s  0.0179s 0.0515s 

    INIT time 0.000s ( 0.000s elapsed) 
    MUT  time 0.652s ( 0.745s elapsed) 
    GC  time 0.417s ( 0.530s elapsed) 
    EXIT time 0.010s ( 0.052s elapsed) 
    Total time 1.079s ( 1.326s elapsed) 

    %GC  time  38.6% (40.0% elapsed) 

    Alloc rate 4,780,213,353 bytes per MUT second 

    Productivity 61.4% of total user, 49.9% of total elapsed 

L'importante qui è la metrica "pausa max" di 0.0515s, o 51 millisecondi. Vogliamo ridurre questo almeno di un ordine di grandezza.

La sperimentazione mostra che la durata di una pausa GC è determinata dal numero di messaggi nella cronologia. La relazione è approssimativamente lineare, o forse super-lineare. La seguente tabella mostra questa relazione. (You can see our benchmarking tests here, e some charts here.)

msgs history length max GC pause (ms) 
=================== ================= 
12500        3 
25000        6 
50000        13 
100000        30 
200000        56 
400000        104 
800000        199 
1600000       487 
3200000       1957 
6400000       5378 

Abbiamo sperimentato diverse altre variabili per trovare se sono in grado di ridurre questa latenza, nessuno dei quali fanno una grande differenza. Tra queste variabili non importanti ci sono: ottimizzazione (-O, -O2); Opzioni RTS GC (-G, -H, -A, -c), numero di core (-N), diverse strutture di dati (Data.Sequence), la dimensione dei messaggi e la quantità di rifiuti generati di breve durata. Il fattore determinante travolgente è il numero di messaggi nella cronologia.

La nostra teoria di lavoro è che le pause sono lineari nel numero di messaggi, perché ogni ciclo GC deve camminare su tutta la memoria accessibile di lavoro e copiarlo, che sono chiaramente operazioni lineari.

Domande:

  • È questa teoria lineare tempo corretto? La durata delle pause GC può essere espressa in questo modo semplice oppure la realtà è più complessa?
  • Se GC pausa è lineare nella memoria di lavoro, esiste un modo per ridurre i fattori costanti coinvolti?
  • Ci sono delle opzioni per GC incrementale, o qualcosa di simile? Possiamo vedere solo documenti di ricerca. Siamo molto disposti a scambiare il throughput per una latenza inferiore.
  • Ci sono dei modi per la memoria "partizione" per i più piccoli, i cicli di GC, diverso da dividere in più processi?
+0

Io purtroppo non può fornire alcun aiuto. Presumo che tu già [leggi questa domanda correlata] (http://stackoverflow.com/q/12404031/510937). Comunque avere un * must * richiesto per la latenza di essere al massimo di 10 millisecondi suona come un vincolo in tempo reale ... per raggiungere questo probabilmente si desidera ottimizzare lo scheduler del sistema operativo per le attività in tempo reale anche perché non ha senso ottimizzare un sacco il codice Haskell quando il sistema operativo decide che è possibile attendere 100 millisecondi ... – Bakuriu

+1

@Bakuriu: giusto, ma 10 ms dovrebbero essere ottenibili con praticamente tutti i sistemi operativi moderni senza apportare modifiche. Quando eseguo programmi C semplicistici, anche sul mio vecchio Raspberry pi, ottengono facilmente latenze nell'intervallo di 5 ms, o almeno _reliably_ qualcosa come 15 ms. – leftaroundabout

+3

Sei sicuro che il tuo test-case sia utile (ad esempio, non stai usando 'COntrol.Concurrent.Chan'? Gli oggetti mutabili cambiano l'equazione)? Ti suggerisco di iniziare assicurandoti di sapere quali rifiuti stai generando e facendo il meno possibile (ad esempio assicurati che la fusione avvenga, prova '-funbox-strict'). Forse provare a utilizzare una libreria di streaming (iostreams, pipe, conduit, streaming) e chiamare 'performGC' direttamente a intervalli più frequenti. – jberryman

risposta

77

In realtà stai facendo abbastanza bene per avere un tempo di pausa di 51 ms con oltre 200 Mb di dati live. Il sistema su cui lavoro ha un tempo di pausa massimo maggiore con metà della quantità di dati live.

Il presupposto è corretto, il tempo di pausa GC principale è direttamente proporzionale alla quantità di dati in tempo reale, e sfortunatamente non c'è modo di aggirare questo con GHC così com'è. Abbiamo sperimentato GC incrementali in passato, ma era un progetto di ricerca e non abbiamo raggiunto il livello di maturità necessario per piegarlo nel GHC rilasciato.

Una cosa che speriamo possa aiutare in questo futuro sono le regioni compatte: https://phabricator.haskell.org/D1264. È un tipo di gestione manuale della memoria in cui si compatta una struttura nell'heap e il GC non deve attraversarlo. Funziona meglio per i dati di lunga durata, ma forse sarà abbastanza buono da utilizzare per i singoli messaggi nelle impostazioni. Puntiamo ad averlo in GHC 8.2.0.

Se ci si trova in un ambiente distribuito e si dispone di un bilanciamento del carico di qualche tipo, ci sono trucchi che si possono giocare per evitare di fare il colpo di pausa, in pratica si assicura che il bilanciatore del carico non invii richieste alle macchine che stanno per fare un importante GC e, naturalmente, assicurati che la macchina continui a completare il GC anche se non riceve richieste.

+13

Ciao Simon, grazie mille per la tua risposta dettagliata! È una cattiva notizia, ma è bello aver chiuso. Al momento ci stiamo muovendo verso un'implementazione mutabile essendo l'unica alternativa adatta. Alcune cose che non capiamo: (1) Quali sono i trucchi coinvolti nello schema di bilanciamento del carico - coinvolgono manualmente 'performGC'? (2) Perché la compattazione con '-c' ha un rendimento peggiore - supponiamo perché non trova molte cose che può lasciare sul posto? (3) Ci sono ulteriori dettagli sui compatti? Sembra molto interessante ma sfortunatamente è un po 'troppo lontano nel futuro per noi da considerare. – jameshfisher

+0

Qualche notizia sulle prestazioni di questo nuovo GC? – mljrg

3

Bene, hai trovato la limitazione delle lingue con GC: non sono adatte per i sistemi hardcore in tempo reale.

hai 2 opzioni:

1 ° Aumentare la taglia del mucchio e utilizzare un sistema di caching livello 2, i messaggi più vecchi vengono inviati a disco e mantenere il messaggio più recente sulla memoria, è possibile farlo utilizzando OS paging. Il problema, sebbene con questa soluzione, è che il paging può essere costoso a seconda delle capacità di lettura dell'unità di memoria secondaria utilizzata.

2nd Programmare la soluzione utilizzando 'C' e interfacciarla con FFI ad haskell. In questo modo puoi gestire la tua memoria personale. Questa sarebbe l'opzione migliore in quanto puoi controllare da solo la memoria che ti serve.

+0

Ciao Fernando. Grazie per questo. Il nostro sistema è solo "soft" in tempo reale, ma nel nostro caso abbiamo trovato GC troppo punitivo anche per il soft in tempo reale. Ci stiamo decisamente appoggiando alla tua soluzione n. 2. – jameshfisher

7

Devo essere d'accordo con gli altri - se si hanno dei vincoli reali in tempo reale, quindi l'uso di un linguaggio GC non è l'ideale.

Tuttavia, si potrebbe considerare di sperimentare con altre strutture di dati disponibili anziché solo Data.Map.

ho riscritto usando Data.Sequence e ottenuto alcuni miglioramenti promettenti:

msgs history length max GC pause (ms) 
=================== ================= 
12500        0.7 
25000        1.4 
50000        2.8 
100000        5.4 
200000       10.9 
400000       21.8 
800000       46 
1600000       87 
3200000       175 
6400000       350 

Anche se si sta ottimizzando per la latenza, ho notato altre metriche migliorando anche. Nel caso 200000, il tempo di esecuzione scende da 1,5 a 0,2 secondi e l'utilizzo della memoria totale scende da 600 MB a 27 MB.

Vorrei sottolineare che ho barato modificando il design:

  • ho rimosso il Int dalla Msg, quindi non è in due punti.
  • Invece di usare una mappa da Int s per ByteString s, ho usato un Sequence di ByteString s, e, invece di uno Int per messaggio, penso che può essere fatto con una Int per l'intera Sequence. Supponendo che i messaggi non possano essere riordinati, puoi usare un singolo offset per tradurre quale messaggio vuoi dove si trova nella coda.

(ho inserito una funzione aggiuntiva getMsg di dimostrare che.)

{-# LANGUAGE BangPatterns #-} 

import qualified Control.Exception as Exception 
import qualified Control.Monad as Monad 
import qualified Data.ByteString as ByteString 
import Data.Sequence as S 

newtype Msg = Msg ByteString.ByteString 

data Chan = Chan Int (Seq ByteString.ByteString) 

message :: Int -> Msg 
message n = Msg (ByteString.replicate 1024 (fromIntegral n)) 

maxSize :: Int 
maxSize = 200000 

pushMsg :: Chan -> Msg -> IO Chan 
pushMsg (Chan !offset sq) (Msg msgContent) = 
    Exception.evaluate $ 
     let newSize = 1 + S.length sq 
      newSq = sq |> msgContent 
     in 
     if newSize <= maxSize 
      then Chan offset newSq 
      else 
       case S.viewl newSq of 
        (_ :< newSq') -> Chan (offset+1) newSq' 
        S.EmptyL -> error "Can't happen" 

getMsg :: Chan -> Int -> Maybe Msg 
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset) 
    where 
    getMsg' i 
     | i < 0   = Nothing 
     | i >= S.length sq = Nothing 
     | otherwise  = Just (Msg (S.index sq i)) 

main :: IO() 
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize]) 
+3

Ciao! Grazie per la tua risposta. I tuoi risultati mostrano sicuramente ancora il rallentamento lineare, ma è piuttosto interessante che tu abbia ottenuto una simile accelerazione da 'Data.Sequence' - l'abbiamo testato e trovato che sia effettivamente peggiore di Data.Map! Non sono sicuro di quale sia la differenza, quindi dovrò investigare ... – jameshfisher

9

Ho provato lo snippet di codice con un approccio ringbuffer utilizzando IOVector come struttura dati sottostante. Sul mio sistema (GHC 7.10.3, stesse opzioni di compilazione) questo ha comportato una riduzione del tempo massimo (la metrica che hai menzionato nel tuo OP) del ~ 22%.

NB. Ho fatto due ipotesi qui:

  1. Una struttura di dati mutabile è una misura va bene per il problema (credo messaggio passaggio implica IO comunque)
  2. vostri Messageid di sono continui

Con qualche ulteriore parametro Int e aritmetica (come quando i messaggi ID sono reimpostati su 0 o minBound) dovrebbe quindi essere semplice determinare se un determinato messaggio è ancora nella cronologia e recuperarlo dall'indice corrispondente nel ringbuffer.

Per il vostro piacere di prova:

import qualified Control.Exception as Exception 
import qualified Control.Monad as Monad 
import qualified Data.ByteString as ByteString 
import qualified Data.Map.Strict as Map 

import qualified Data.Vector.Mutable as Vector 

data Msg = Msg !Int !ByteString.ByteString 

type Chan = Map.Map Int ByteString.ByteString 

data Chan2 = Chan2 
    { next   :: !Int 
    , maxId   :: !Int 
    , ringBuffer :: !(Vector.IOVector ByteString.ByteString) 
    } 

chanSize :: Int 
chanSize = 200000 

message :: Int -> Msg 
message n = Msg n (ByteString.replicate 1024 (fromIntegral n)) 


newChan2 :: IO Chan2 
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize 

pushMsg2 :: Chan2 -> Msg -> IO Chan2 
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) = 
    let ix' = if ix == chanSize then 0 else ix + 1 
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store) 

pushMsg :: Chan -> Msg -> IO Chan 
pushMsg chan (Msg msgId msgContent) = 
    Exception.evaluate $ 
    let 
     inserted = Map.insert msgId msgContent chan 
    in 
     if chanSize < Map.size inserted 
     then Map.deleteMin inserted 
     else inserted 

main, main1, main2 :: IO() 

main = main2 

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) 

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000]) 
+2

Ciao! Bella risposta. Sospetto che il motivo per cui questo ha ottenuto solo un aumento del 22% è dovuto al fatto che GC deve ancora percorrere i valori di 'IOVector' e (immutabili, GC'd) in ciascun indice. Stiamo attualmente esaminando le opzioni per la reimplementazione utilizzando strutture mutabili. È probabile che sia simile al tuo sistema di buffer ad anello. Ma lo stiamo spostando completamente al di fuori dello spazio di memoria di Haskell per eseguire la nostra gestione manuale della memoria. – jameshfisher

+8

@jamesfisher: In realtà stavo affrontando un problema simile, ma ho deciso di mantenere la gestione dei mem sul lato Haskell. La soluzione era in effetti un buffer circolare, che conserva una copia bytewise dei dati originali in un unico blocco continuo di memoria, risultando così in un singolo valore Haskell. Date un'occhiata a questo [Gist RingBuffer.hs] (https://gist.github.com/mgmeier/d0febcc79e79b25155ac18180057ea16). L'ho testato con il tuo codice di esempio e ho ottenuto una velocità di circa il 90% della metrica critica. Sentiti libero di usare il codice a tuo piacimento. – mgmeier