2014-07-09 12 views
8

Voglio gli stessi dati per essere divisi in due "rami" per essere processati separatamente, poi "unito" ...Come si crea un conduit "ramificato"?

       +----------+ 
       +---------+ -->| doublber |--- +--------+ 
    +--------+ |   |-- +----------+ -->|  | +------+ 
    | source |-->| splitter|      | summer |-->| sink | 
    +--------+ |   |-- +----------+ -->|  | +------+ 
       +---------+ -->| delayer |--- +--------+ 
           +----------+ 

Come posso fare questo?

Il mio tentativo:

import Data.Conduit 
import Control.Monad.IO.Class 
import qualified Data.Conduit.List as CL 
-- import Data.Conduit.Internal (zipSources) 
import Control.Arrow ((>>>)) 

source :: Source IO Int 
source = do 
    x <- liftIO $ getLine 
    yield (read x) 
    source 

splitter :: Conduit Int IO (Int, Int) 
splitter = CL.map $ \x -> (x,x) 

doubler = CL.map (* 2) 

delayer :: Conduit Int IO Int 
delayer = do 
    yield 0 
    CL.map id 

twoConduitBranches :: Monad m => Conduit a m b -> Conduit c m d -> Conduit (a,b) m (c,d) 
twoConduitBranches q w = awaitForever $ \(x, y) -> do 
    out1 <- undefined q x 
    out2 <- undefined w y 
    yield (out1, out2) 


summer :: Conduit (Int,Int) IO Int 
summer = CL.map $ \(x,y) -> x + y 

sink :: Sink Int IO() 
sink = CL.mapM_ (show >>> putStrLn) 

-- combosrc = zipSources (source $= delayer) (source $= doubler) 
main = source $= splitter $= twoConduitBranches doubler delayer $= summer $$ sink 

Cosa devo scrivere al posto dei undefined s?

risposta

3

Si può fare questo, ma è brutto, e, auspicabilmente, l'attuazione sarà mettere in chiaro il motivo per cui è brutto e non un built-in funzione del condotto:

twoConduitBranches :: Monad m => Conduit a m c -> Conduit b m d -> Conduit (a,b) m (c,d) 
twoConduitBranches q w = getZipConduit 
     (ZipConduit (CL.map fst =$= q =$= CL.map Left) 
    <* ZipConduit (CL.map snd =$= w =$= CL.map Right)) =$= collapse 
    where 
    collapse = do 
     v1 <- await 
     case v1 of 
      Nothing -> return() 
      Just (Left _) -> error "out of sequence 1" 
      Just (Right d) -> do 
       v2 <- await 
       case v2 of 
        Nothing -> error "mismatched count" 
        Just (Right _) -> error "out of sequence 2" 
        Just (Left c) -> do 
         yield (c, d) 
         collapse 

(Nota: ho ottimizzato il tipo di firma di un bit, presumo che ciò è la firma tipo si voleva davvero)

Ecco il metodo:. trasformare q in un Conduit che prende il primo valore da ogni tupla in entrata, e poi avvolgere la sua uscita con Left. Allo stesso modo, prendiamo il secondo valore da ogni tupla in entrata e lo passiamo a w, quindi avvolgiamo l'output con Right.

Ora che questi Conduit s hanno lo stesso tipo (prendono le stesse tuple di input e generano gli stessi valori di Entrambi), li uniamo usando ZipConduit, che condivide l'input tra tutti i componenti e unisce l'output in un singolo flusso .

Questo flusso è un flusso di Either c d, non lo (c, d) desiderato. Per effettuare questa conversione finale, usiamo collapse. Espelle un valore Right e Left e quindi li mette insieme in una singola tupla che produce.

Questa funzione presuppone che la sequenza dei valori di uscita sarà sempre essere un valore da w, e poi uno da q. Se succede qualcos'altro, genererà un'eccezione. Il problema è: non c'è nulla in causa che implichi che in effetti genereranno output alla stessa velocità. Infatti, il condotto è specificamente progettato per evitare tale ipotesi!

Quindi, se si sa che i conduit a due componenti produrranno sempre output alla stessa velocità, questa funzione funzionerà. Ma questo non sarà vero in generale.