2013-05-26 16 views
64

Nell'applicazione GHC Haskell che utilizza stm, conduit di rete e conduit, ho un filo per ogni socket che viene biforcato automaticamente utilizzando runTCPServer. I fili possono comunicare con altri strati attraverso l'uso di un TChan di trasmissione.Un conduit di elaborazione, 2 origini IO dello stesso tipo

Questo mette in mostra come vorrei per impostare il condotto "catena":

enter image description here

Quindi, quello che abbiamo qui è di due fonti (ciascuna destinata a condotti di supporto quale) che producono un oggetto Packet quale encoder accetterà e trasformerà in ByteString, quindi invierà il socket. Ho avuto una grande quantità di difficoltà con l'efficiente (la prestazione è una preoccupazione) fusione dei due ingressi.

Gradirei se qualcuno potesse indicarmi la giusta direzione.

Poiché sarebbe scortese da parte mia postare questa domanda senza fare un tentativo, inserirò quello che ho provato in precedenza qui;

Ho scritto/cherrypicked una funzione che (bloccando) produce una sorgente da un TMChan (canale chiudibile);

-- | Takes a generic type of STM chan and, given read and close functionality, 
-- returns a conduit 'Source' which consumes the elements of the channel. 
chanSource 
    :: (MonadIO m, MonadSTM m) 
    => a     --^The channel 
    -> (a -> STM (Maybe b)) --^The read function 
    -> (a -> STM())  --^The close/finalizer function 
    -> Source m b 
chanSource ch readCh closeCh = ConduitM pull 
    where close  = liftSTM $ closeCh ch 
      pull  = PipeM $ liftSTM $ readCh ch >>= translate 
      translate = return . maybe (Done()) (HaveOutput pull close) 

Allo stesso modo, una funzione per trasformare un Chan in un sink;

-- | Takes a stream and, given write and close functionality, returns a sink 
-- which wil consume elements and broadcast them into the channel 
chanSink 
    :: (MonadIO m, MonadSTM m) 
    => a     --^The channel 
    -> (a -> b -> STM()) --^The write function 
    -> (a -> STM())  --^The close/finalizer function 
    -> Sink b m() 
chanSink ch writeCh closeCh = ConduitM sink 
    where close = const . liftSTM $ closeCh ch 
      sink = NeedInput push close 
      write = liftSTM . writeCh ch 
      push x = PipeM $ write x >> return sink 

Quindi mergeSources è semplice; fork 2 thread (che davvero non voglio fare, ma che diamine) che può mettere i loro nuovi elementi nella lista che poi produco come sorgente;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns 
-- a source which consumes the elements of the channel. 
mergeSources 
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m) 
    => [Source (ResourceT m) a]    --^The list of sources 
    -> ResourceT m (Source (ResourceT m) a) 
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn 
    where push c s = s $$ chanSink c writeTMChan closeTMChan 
      fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x 
      retn c = return $ chanSource c readTMChan closeTMChan 

Mentre ero riuscito a rendere queste funzioni TYPECHECK, ero riuscito a ottenere qualsiasi utilizzo di queste funzioni per TYPECHECK;

-- | Helper which represents a conduit chain for each client connection 
serverApp :: Application SessionIO 
serverApp appdata = do 
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast 
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata 
    mergsrc $$ protocol $= encoder =$ appSink appdata 
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan 
      mergsrc = mergeSources [appSource appdata $= decoder, chansrc] 

-- | Structure which holds mutable information for clients 
data SessionState = SessionState 
    { _ssBroadcast  :: TMChan Packet --^Outbound packet broadcast channel 
    } 

makeLenses ''SessionState 

-- | A transformer encompassing both SessionReader and SessionState 
type Session m = ReaderT SessionReader (StateT SessionState m) 

-- | Macro providing Session applied to an IO monad 
type SessionIO = Session IO 

Vedo comunque questo metodo come imperfetto: ci sono molti elenchi e conversioni intermedie. Questo non può essere buono per le prestazioni. Cerco consiglio.


PS. Da quello che posso capire, questo non è un duplicato di; Fusing conduits with multiple inputs, poiché nella mia situazione entrambe le sorgenti producono lo stesso tipo e non mi interessa da quale origine venga prodotto l'oggetto Packet, a condizione che non stia aspettando uno mentre un altro ha oggetti pronti per essere consumato.

PPS. Mi scuso per l'utilizzo (e quindi il requisito di conoscenza) di Lens nel codice di esempio.

+2

C'è un motivo per cui non si utilizza 'Data.Conduit.TMChan' dal pacchetto' stm-conduit'? Ha tutte le funzioni che stai definendo, incluso 'mergeSources'. –

+0

In realtà c'è - Mi piacerebbe che la fonte che si fonde sia per chiudere non appena le fonti si chiudono. Il pacchetto stm-conduit usa refcounts (e attende fino a quando l'ultima sorgente si chiude per chiudere la sorgente risultante) che non è il comportamento desiderato. Chiudendo immediatamente dopo l'invalidità di una delle due fonti, mi viene data la possibilità, quando chiudo il mio TMChan globale, di chiudere tempestivamente anche ogni socket. – kvanberendonck

+3

Un pensiero inattivo: che cosa succede se prendi le unioni da TMChan, esci dal conteggio dei ref e sostituisci il bit 'ref RefContount' con il codice per chiudere tutte le fonti? – Iain

risposta

1

non so se può essere di aiuto, ma ho cercato di realizzare suggerimento di Iain e feci una variante di mergeSources' che interrompe non appena uno qualsiasi dei canali fa:

mergeSources' :: (MonadIO m, MonadBaseControl IO m) 
       => [Source (ResourceT m) a] --^The sources to merge. 
       -> Int --^The bound of the intermediate channel. 
       -> ResourceT m (Source (ResourceT m) a) 
mergeSources' sx bound = do 
    c <- liftSTM $ newTBMChan bound 
    mapM_ (\s -> resourceForkIO $ 
        s $$ chanSink c writeTBMChan closeTBMChan) sx 
    return $ sourceTBMChan c 

(Questa semplice aggiunta è disponibile here).

Alcuni commenti alla versione di mergeSources (portarli con un grano di sale, può essere non ho capito bene cosa):

  • Utilizzando ...TMChan invece di ...TBMChan sembra pericoloso. Se gli scrittori sono più veloci del lettore, il tuo mucchio esploderà. Guardando il tuo diagramma sembra che questo possa facilmente accadere, se il tuo peer TCP non legge i dati abbastanza velocemente. Quindi utilizzerei sicuramente lo ...TBMChan con un limite forse maggiore ma limitato.
  • Non è necessario il vincolo MonadSTM m. Tutta roba STM è avvolto in IO con

    liftSTM = liftIO . atomically 
    

    Forse questo ti aiuterà un po 'quando si utilizza mergeSources' in serverApp.

  • solo una questione estetica, ho trovato

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn 
    

    molto difficile da leggere a causa del suo uso di liftA2 sul (->) r monade. Direi

    do 
        c <- liftSTM newTMChan 
        fsrc sx c 
        retn c 
    

    sarebbe più lungo, ma molto più facile da leggere.

Potrebbe forse creare un progetto autonomo in cui sarebbe possibile giocare con serverApp?

+0

Grazie per il consiglio. Lo terrò a mente (dovrò rivisitare il problema a breve). – kvanberendonck