2013-06-13 11 views
7

Sto facendo cose con STM e ho tra le altre cose usato la struttura dati TBQueue con grande successo. Una caratteristica utile che ho usato per implica la lettura da esso sulla base di un presupposto in un TVar, in fondo in questo modo:STM con atomicità parziale per alcuni TVars

shouldRead <- readTVar shouldReadVar 
if shouldRead 
    then do 
    a <- readTBQueue queue 
    doSomethingWith a 
    else doSomethingElse 

Se assumiamo che queue è vuota e shouldReadVar contiene True prima di eseguire questo blocco, comporterà readTBQueue chiamando retry e il blocco verrà rieseguito quando shouldReadVar contiene False o queue contiene un elemento, qualunque cosa accada prima.


Sono ora bisogno di una struttura di dati di canale sincrono, simile alla struttura descritta nel this article (Si prega di leggere, se si vuole comprendere questa domanda), tranne che deve essere leggibile con un pre-condizione di come nell'esempio precedente, e possibilmente anche comporre altre cose.

Chiamiamo questa struttura dati SyncChan con le operazioni writeSyncChan e readSyncChan definite su di esso.

Ed ecco un possibile caso d'uso: Questo codice (pseudo) (che non funziona perché mescolo concetti STM/IO):

shouldRead <- readTVar shouldReadVar 
if shouldRead 
    then do 
    a <- readSyncChan syncChan 
    doSomethingWith a 
    else doSomethingElse 

Partendo dal presupposto che nessun altro thread sta attualmente bloccando in una chiamata writeSyncChan, e shouldReadChan, contiene il True, voglio il blocco "" fino a shouldReadChan contiene False, o un diverso blocco di thread su un writeSyncChan. In altre parole: quando un threads su writeSyncChan e un altro blocco di thread raggiunge uno readSyncChan o viceversa, voglio che il valore sia trasferito lungo il canale. In tutti gli altri casi, entrambe le parti dovrebbero trovarsi nello stato retry e reagire quindi a una modifica in shouldReadVar, in modo che la lettura o la scrittura possano essere annullate.

L'approccio ingenuo descritto nell'articolo collegato sopra utilizzando due (T) MVar s non è ovviamente possibile. Poiché la struttura dei dati è sincrona, è impossibile utilizzarla entro due blocchi atomically, poiché non è possibile modificarne uno TMVar e attendere che venga modificato un altro TMVar in un contesto atomico.

Invece, sto cercando un tipo di atomicità parziale, in cui posso "commettere" una determinata parte di una transazione e restituirla solo quando alcune variabili cambiano, ma non altre. Se ho variabili "msg" e "ack" come il primo esempio nell'articolo sopra, voglio essere in grado di scrivere sulla variabile "msg", quindi attendere che un valore arrivi su "ack", o per il mio altre variabili transazionali da modificare. Se cambiano altre variabili transazionali, si dovrebbe ritentare l'intero blocco atomico e se arriva un valore "ack", la transazione dovrebbe continuare come era nello stato precedente. Per quanto riguarda la lettura, dovrebbe accadere qualcosa di simile, a parte il fatto che ovviamente leggerò "msg" e scriverò "ack".

È possibile utilizzare GHC STM o è necessario eseguire la gestione manuale MVar/rollback?

risposta

3

Questo è ciò che si vuole:

import Control.Concurrent 
import Control.Concurrent.STM 
import Control.Monad 

data SyncChan a = SyncChan (TMVar a) (TMVar()) 

newSyncChan :: IO (SyncChan a) 
newSyncChan = do 
    msg <- newEmptyTMVarIO 
    ack <- newEmptyTMVarIO 
    return (SyncChan msg ack) 

readIf :: SyncChan a -> TVar Bool -> STM (Maybe a) 
readIf (SyncChan msg ack) shouldReadVar = do 
    b <- readTVar shouldReadVar 
    if b 
     then do 
      a <- takeTMVar msg 
      putTMVar ack() 
      return (Just a) 
     else return Nothing 

write :: SyncChan a -> a -> IO() 
write (SyncChan msg ack) a = do 
    atomically $ putTMVar msg a 
    atomically $ takeTMVar ack 

main = do 
    sc <- newSyncChan 
    tv <- newTVarIO True 
    forkIO $ forever $ forM_ [False, True] $ \b -> do 
     threadDelay 2000000 
     atomically $ writeTVar tv b 
    forkIO $ forM_ [0..] $ \i -> do 
     putStrLn "Writing..." 
     write sc i 
     putStrLn "Write Complete" 
     threadDelay 300000 
    forever $ do 
     putStrLn "Reading..." 
     a <- atomically $ readIf sc tv 
     print a 
     putStrLn "Read Complete" 

Questo dà il comportamento che aveva in mente. Mentre TVar è True, le estremità di ingresso e di uscita verranno sincronizzate tra loro. Quando lo TVar passa a False, l'estremità di lettura si interrompe liberamente e restituisce Nothing.

+0

Si presuppone che esista una struttura dati chiamata 'SyncChan' con una certa semantica. Tuttavia, non esiste una tale struttura di dati; il problema sorge quando si tenta di implementarlo. Praticamente hai preso il codice dal mio secondo blocco di codice nella domanda e ho estratto il ramo in un valore 'Maybe'. Il vero problema sta nell'implementare 'readSyncChan' e' writeSyncChan'! – dflemstr

+0

@dflemstr L'ho risolto e ho scritto l'intera implementazione, incluso il codice d'uso di esempio. –

+0

grazie per aver dedicato del tempo a scrivere tutto questo codice. Tuttavia, qui non è possibile fare scritture condizionali (con un 'shouldWriteVar', per così dire). Non funziona semplicemente aggiungendo al primo blocco 'atomically' nella funzione' write', perché se un valore è stato scritto ahd il thread sta aspettando il 'ack', non c'è modo di rispondere al cambiamento in 'shouldWriteVar'! Il modo più pulito per controllare 'shouldWriteVar' di nuovo qui, o c'è qualche altra opzione che eviti una strana situazione di deadlock che non ho considerato? – dflemstr