Nell'applicazione GHC Haskell
che utilizza stm, conduit di rete e conduit, ho un filo per ogni socket che viene biforcato automaticamente utilizzando runTCPServer
. I fili possono comunicare con altri strati attraverso l'uso di un TChan di trasmissione.Un conduit di elaborazione, 2 origini IO dello stesso tipo
Questo mette in mostra come vorrei per impostare il condotto "catena":
Quindi, quello che abbiamo qui è di due fonti (ciascuna destinata a condotti di supporto quale) che producono un oggetto Packet
quale encoder
accetterà e trasformerà in ByteString
, quindi invierà il socket. Ho avuto una grande quantità di difficoltà con l'efficiente (la prestazione è una preoccupazione) fusione dei due ingressi.
Gradirei se qualcuno potesse indicarmi la giusta direzione.
Poiché sarebbe scortese da parte mia postare questa domanda senza fare un tentativo, inserirò quello che ho provato in precedenza qui;
Ho scritto/cherrypicked una funzione che (bloccando) produce una sorgente da un TMChan (canale chiudibile);
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a --^The channel
-> (a -> STM (Maybe b)) --^The read function
-> (a -> STM()) --^The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done()) (HaveOutput pull close)
Allo stesso modo, una funzione per trasformare un Chan in un sink;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a --^The channel
-> (a -> b -> STM()) --^The write function
-> (a -> STM()) --^The close/finalizer function
-> Sink b m()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
Quindi mergeSources è semplice; fork 2 thread (che davvero non voglio fare, ma che diamine) che può mettere i loro nuovi elementi nella lista che poi produco come sorgente;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] --^The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
Mentre ero riuscito a rendere queste funzioni TYPECHECK, ero riuscito a ottenere qualsiasi utilizzo di queste funzioni per TYPECHECK;
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet --^Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
Vedo comunque questo metodo come imperfetto: ci sono molti elenchi e conversioni intermedie. Questo non può essere buono per le prestazioni. Cerco consiglio.
PS. Da quello che posso capire, questo non è un duplicato di; Fusing conduits with multiple inputs, poiché nella mia situazione entrambe le sorgenti producono lo stesso tipo e non mi interessa da quale origine venga prodotto l'oggetto Packet
, a condizione che non stia aspettando uno mentre un altro ha oggetti pronti per essere consumato.
PPS. Mi scuso per l'utilizzo (e quindi il requisito di conoscenza) di Lens nel codice di esempio.
C'è un motivo per cui non si utilizza 'Data.Conduit.TMChan' dal pacchetto' stm-conduit'? Ha tutte le funzioni che stai definendo, incluso 'mergeSources'. –
In realtà c'è - Mi piacerebbe che la fonte che si fonde sia per chiudere non appena le fonti si chiudono. Il pacchetto stm-conduit usa refcounts (e attende fino a quando l'ultima sorgente si chiude per chiudere la sorgente risultante) che non è il comportamento desiderato. Chiudendo immediatamente dopo l'invalidità di una delle due fonti, mi viene data la possibilità, quando chiudo il mio TMChan globale, di chiudere tempestivamente anche ogni socket. – kvanberendonck
Un pensiero inattivo: che cosa succede se prendi le unioni da TMChan, esci dal conteggio dei ref e sostituisci il bit 'ref RefContount' con il codice per chiudere tutte le fonti? – Iain