{-# LANGUAGE GADTs, EmptyDataDecls #-}
module CHS (
  -- * Computations
  CHS,
  -- * Channels
  Chan,
  newChan,
  readChan,
  writeChan,
  -- * Running computations
  -- 'synchronize' submits a computation and waits for its result;
  -- 'chsThread' is the loop that pairs up submitted computations.
  synchronize,
  chsThread,
  -- * Example computations
  test1, test2, test3
  ) where
import Control.Applicative
import Control.Concurrent.STM hiding (orElse)
import Control.Monad
import qualified Control.Monad.Fail as Fail -- MonadFail instance for CHSState
import Data.Unique
import System.IO.Unsafe (unsafePerformIO) -- for allocating a global variable
import Unsafe.Coerce(unsafeCoerce) -- for unique channels
-- Interface

-- | A synchronous channel computation. This is the public name of
-- 'CHSState'; values are built with 'readChan', 'writeChan' and the
-- Monad / MonadPlus operations, and run with 'synchronize'.
type CHS = CHSState
-- The following instances are available for CHS (they are declared on
-- CHSState in the internals section):
-- instance Functor CHS
-- instance Applicative CHS
-- instance Monad CHS
-- instance MonadPlus CHS

-- | Submit a computation to the global list of blocked computations and
-- block the calling thread until a result has been written for it.
-- In this module the only writer of results is 'chsThread', so that loop
-- has to be running in some other thread.
synchronize :: CHS a -> IO a

-- | A channel carrying values of type @a@. The type parameter is phantom:
-- a channel is represented only by a 'Unique', and the derived 'Eq'
-- compares those identities.
data Chan a = Chan Unique deriving Eq
-- instance Show (Chan a)

-- | Create a channel with a fresh identity.
newChan :: IO (Chan a)

-- | A computation that is blocked until a value is received on the channel
-- and then completes with that value.
readChan :: Chan a -> CHS a

-- | A computation that is blocked until the given value has been handed to
-- a reader of the channel and then completes with @()@.
writeChan :: Chan a -> a -> CHS ()
-- Internals
-- | Channels are shown as @Chan <n>@ where @<n>@ is the hash of the
-- channel's 'Unique'; the element type plays no part.
instance Show (Chan a) where
  showsPrec _ (Chan ident) = showString "Chan " . shows (hashUnique ident)
-- | The state of a single thread taking part in a 'synchronize'.
--
-- @r@ is the type of the thread's final result; @msg@ is the element type
-- of the channel a blocked thread is waiting on and is hidden
-- (existential) in the blocked constructors.
data CHSState r where
  -- The thread has finished with a result.
  Complete     :: r -> CHSState r
  -- The thread waits to receive on a channel; the function is the
  -- continuation applied to the received value.
  BlockedRead  :: Chan msg -> (msg -> CHSState r) -> CHSState r
  -- The thread waits to hand a value to a reader of the channel; the last
  -- field is the state it continues with once the value is taken.
  BlockedWrite :: msg -> Chan msg -> CHSState r -> CHSState r
  -- A choice between two alternative states.
  OrElse       :: CHSState r -> CHSState r -> CHSState r
  -- The thread cannot proceed.
  Failed       :: CHSState r
-- | Debug rendering of a thread state. Continuations and pending values
-- cannot be shown, so blocked states print only their channel.
instance Show a => Show (CHSState a) where
  show state = case state of
    Failed                -> "Failed"
    Complete result       -> "Complete " ++ show result
    BlockedRead chan _    -> "BlockedRead " ++ show chan
    BlockedWrite _ chan _ -> "BlockedWrite " ++ show chan
    OrElse left right     -> show left ++ " `OrElse` " ++ show right
-- | Mapping is expressed through the monadic bind: run the computation and
-- wrap the transformed result in 'Complete'.
instance Functor CHSState where
  fmap f m = m >>= (Complete . f)
-- | Applicative structure derived from the Monad instance: the function
-- computation is sequenced before the argument computation.
instance Applicative CHSState where
  pure = Complete
  mf <*> mx = mf >>= \f -> mx >>= \x -> Complete (f x)
-- | Sequencing of thread states: the continuation is pushed underneath
-- every blocked state and into both branches of a choice; a failed state
-- stays failed.
instance Monad CHSState where
  return = Complete
  Complete a >>= f = f a
  BlockedRead c k >>= f = BlockedRead c (\a -> k a >>= f)
  BlockedWrite a c k >>= f = BlockedWrite a c (k >>= f)
  Failed >>= _ = Failed
  OrElse a b >>= f = OrElse (a >>= f) (b >>= f)

-- | A failed pattern match inside a @do@ block yields 'Failed'.
--
-- 'fail' is no longer a method of 'Monad' (base >= 4.13), so it is given
-- through 'Fail.MonadFail' instead of the Monad instance.
instance Fail.MonadFail CHSState where
  fail _ = Failed
-- | Choice between computations: 'Failed' is the empty computation and
-- 'OrElse' the alternative. 'Alternative' is a superclass of 'MonadPlus'
-- (base >= 4.8), so this instance must exist for the one below to be
-- accepted.
instance Alternative CHSState where
  empty = Failed
  (<|>) = OrElse

-- | Same structure as the 'Alternative' instance.
instance MonadPlus CHSState where
  mzero = Failed
  mplus = OrElse
-- | A list zipper. @Zip ls rs@ stores the elements left of the cursor in
-- @ls@ (nearest first, i.e. reversed) and the element under the cursor
-- followed by everything to its right in @rs@.
data LZipper a = Zip [a] [a]

-- | A zipper over the list with the cursor on the first element.
fromList :: [a] -> LZipper a
fromList = Zip []

-- | True when the cursor is on the final element.
lastp :: LZipper a -> Bool
lastp (Zip _ [_]) = True
lastp _ = False

-- | True when the cursor has moved past the final element.
endp :: LZipper a -> Bool
endp (Zip _ rs) = null rs

-- | Move the cursor one element right; no-op at the end.
zright :: LZipper a -> LZipper a
zright (Zip ls (a:rs)) = Zip (a:ls) rs
zright z = z

-- | Move the cursor one element left; no-op at the start.
zleft :: LZipper a -> LZipper a
zleft (Zip (a:ls) rs) = Zip ls (a:rs)
zleft z = z

-- | Rebuild the underlying list in its original order.
toList :: LZipper a -> [a]
toList (Zip ls rs) = reverse ls ++ rs

-- | The element under the cursor. Calling this on a zipper that is at the
-- end is a programming error and is reported with an explicit message
-- rather than an anonymous pattern-match failure.
cursor :: LZipper a -> a
cursor (Zip _ (a:_)) = a
cursor (Zip _ []) =
  error "CHS.cursor: zipper is at the end, no element under the cursor"

-- | Remove the element under the cursor; no-op at the end.
pop :: LZipper a -> LZipper a
pop (Zip ls (_:rs)) = Zip ls rs
pop z = z

-- | Insert an element at the cursor position; it becomes the new cursor.
ins :: a -> LZipper a -> LZipper a
ins a (Zip ls rs) = Zip ls (a:rs)

-- | Every way of picking one element out of a list, in list order. Each
-- result pairs the picked element with a zipper over the remaining
-- elements whose cursor sits where the element was removed, so 'ins' puts
-- a replacement back into the same position.
select :: [a] -> [(a, LZipper a)]
select = go . fromList
  where
    -- Match the cursor directly so the partial 'cursor' is never needed.
    go z@(Zip _ (a:_)) = (a, pop z) : go (zright z)
    go (Zip _ [])      = []
-- | Evidence that two types are the same; matching on 'Refl' brings the
-- equality into scope.
data TypeEq a b where
  Refl :: TypeEq a a

-- | Compare two channels by identity. When the underlying 'Unique's are
-- equal, succeed with evidence that the element types agree; otherwise
-- fail with 'mzero'.
--
-- The evidence is manufactured with 'unsafeCoerce' because the element
-- type is phantom and cannot be inspected at run time.
-- NOTE(review): this is only sound if a given 'Unique' is never wrapped as
-- channels of two different element types -- the 'Chan' constructor is not
-- exported and 'newChan' allocates a fresh 'Unique' per channel; confirm
-- nothing else builds 'Chan' values.
chanEq :: MonadPlus m => Chan a -> Chan b -> m (TypeEq a b)
chanEq (Chan left) (Chan right) =
  if left == right
    then return (unsafeCoerce Refl)
    else mzero
-- | All ways of advancing a gang of thread states by one step.
--
-- Only the first thread that can do something is stepped:
--
-- * an empty gang or a gang headed by 'Failed' has no successors;
-- * a choice at the head is resolved both ways;
-- * a blocked reader (writer) at the head is paired with each matching
--   writer (reader) further down the list, the partner's continuation
--   being put back at the partner's position; in addition the head may be
--   left alone while the rest of the gang is stepped;
-- * a completed head is kept and the rest of the gang is stepped.
stepSynchronize :: [CHSState a] -> [[CHSState a]]
stepSynchronize gang = case gang of
  [] -> []
  Failed : _ -> []
  OrElse left right : rest -> [left : rest, right : rest]
  st@(Complete _) : rest -> leaveHead st rest
  st@(BlockedRead rchan rcont) : rest ->
    (do (BlockedWrite msg wchan wcont, others) <- select rest
        -- matching on Refl proves reader and writer agree on the type
        Refl <- chanEq rchan wchan
        return (rcont msg : toList (ins wcont others)))
      ++ leaveHead st rest
  st@(BlockedWrite msg wchan wcont) : rest ->
    (do (BlockedRead rchan rcont, others) <- select rest
        Refl <- chanEq wchan rchan
        return (wcont : toList (ins (rcont msg) others)))
      ++ leaveHead st rest
  where
    -- keep the head untouched and step the remainder instead
    leaveHead st rest = map (st :) (stepSynchronize rest)
-- A read blocks on the channel and completes with whatever it receives.
readChan chan = BlockedRead chan Complete
-- A write blocks on the channel with its value and completes with unit.
writeChan chan msg = BlockedWrite msg chan (Complete ())
-- | Every way of splitting a list into a chosen sub-list and the leftover
-- elements, both keeping the original order. The split choosing nothing
-- comes first.
--
-- The order of the results matters: splits whose chosen part is made of
-- later list elements come first. The intent is that the oldest set of
-- computations that can successfully complete with the current data is
-- the one that gets completed.
--
-- (It probably also means work is wasted retrying many combinations of
-- computations that are guaranteed to fail; an optimization would be to
-- track those and only retry them together with new computations.)
splitSets :: [a] -> [([a], [a])]
splitSets items = case items of
  [] -> [([], [])]
  x : rest ->
    -- first leave x out of the chosen part, then include it
    map (\(chosen, left) -> (chosen, x : left)) (splitSets rest)
      ++ map (\(chosen, left) -> (x : chosen, left)) (splitSets rest)
-- | All successful synchronizations of non-empty subsets of the gang.
--
-- Each result pairs the values produced by the chosen subset (in subset
-- order) with the computations that were not chosen and therefore stay
-- blocked. Subsets are tried in the order given by 'splitSets'.
trySynchronize :: [CHSState a] -> [([a], [CHSState a])]
trySynchronize gang = do
  (g, r) <- splitSets gang
  guard (not $ null g)
  -- Run only the chosen subset: running the whole gang would hand back
  -- results for computations that are also kept in the remainder @r@.
  res <- runSynch g
  return (res, r)
-- | Depth-first search of the connection space: keep stepping the gang
-- until every member is 'Complete', and return the results of each such
-- outcome in gang order.
runSynch :: [CHSState a] -> [[a]]
runSynch gang = case mapM finished gang of
  Just results -> [results]
  Nothing      -> concatMap runSynch (stepSynchronize gang)
  where
    -- the result of a completed member, if it has one yet
    finished (Complete x) = Just x
    finished _            = Nothing
-- | A finished result together with the variable it has to be delivered
-- to. The result type is hidden so that results of different types can be
-- kept in one list.
data CHSRes where
  CHSRes :: TVar (Maybe r) -> r -> CHSRes

-- | Deliver a result by storing it, wrapped in 'Just', in its variable.
writeResult :: CHSRes -> STM ()
writeResult (CHSRes slot value) = writeTVar slot $ Just value
-- | The synchronization loop. Forever, in one transaction: read the list
-- of blocked computations, take the first successful synchronization
-- reported by 'trySynchronize', store the computations that stay blocked
-- and deliver the produced results. When nothing can synchronize the
-- transaction retries, i.e. waits until the blocked list changes.
chsThread :: IO ()
chsThread = forever $ atomically $ do
  gang <- readTVar chsBlocked
  case trySynchronize gang of
    [] -> retry
    (results, gang') : _ -> do
      -- the remainder goes back into the global blocked list
      writeTVar chsBlocked gang'
      mapM_ writeResult results
-- | The global list of computations waiting to be synchronized.
--
-- Allocated with 'unsafePerformIO' to obtain a top-level mutable
-- variable; NOINLINE keeps the allocation from being duplicated by
-- inlining, so that every use refers to the same variable.
{-# NOINLINE chsBlocked #-}
chsBlocked :: TVar [CHSState CHSRes]
chsBlocked = unsafePerformIO (newTVarIO [])
-- | Run the action and unwrap its 'Maybe' result: 'Just' yields the
-- value, 'Nothing' becomes 'mzero'.
fromJustM :: MonadPlus m => m (Maybe a) -> m a
fromJustM m = m >>= maybe mzero return
-- Submit the computation and wait for its result.
--
-- A private variable is created for the result, and the computation is
-- extended so that on completion it yields a CHSRes tying the result to
-- that variable. It is then consed onto the global blocked list, and the
-- caller waits until the variable has been filled.
synchronize computation = do
  v <- newTVarIO Nothing
  let genericComp = CHSRes v <$> computation
  -- The read/write is spelled out instead of calling modifyTVar: newer
  -- versions of stm export a function of that name, which makes an
  -- unqualified use ambiguous with the helper defined in this module.
  atomically $ do
    blocked <- readTVar chsBlocked
    writeTVar chsBlocked (genericComp : blocked)
  atomically $ fromJustM $ readTVar v -- blocks until var is written
-- A channel is nothing but a freshly allocated Unique.
newChan = fmap Chan newUnique
-- | Replace the contents of a variable by the result of applying the
-- function to them (the new value is not forced).
-- NOTE(review): newer versions of stm export a function with this name
-- from Control.Concurrent.STM; confirm the two do not clash at use sites.
modifyTVar :: TVar a -> (a -> a) -> STM ()
modifyTVar var update = readTVar var >>= writeTVar var . update
-----------
-- TESTS --
-----------
-- | The single channel shared by all test computations. It is a global
-- allocated with 'unsafePerformIO'; NOINLINE keeps it one and the same
-- channel at every use.
{-# NOINLINE testCh #-}
testCh :: Chan Int
testCh = unsafePerformIO $ newChan
-- | Read a number from the test channel; fail on zero, otherwise write
-- back @100 `div` x@ and complete with the number read.
test1 :: CHS Int
test1 = readChan testCh >>= respond
  where
    respond x
      | x == 0    = Failed
      | otherwise = writeChan testCh (100 `div` x) >> return x
-- | Write either 0 or 5 to the test channel (a choice), then read a value
-- from the same channel and complete with it.
test2 :: CHS Int
test2 =
  (writeChan testCh 0 `OrElse` writeChan testCh 5)
    >> readChan testCh
-- | A computation that is already complete with the value 100.
test3 :: CHS Int
test3 = Complete 100
-- | Advance every gang in a list of alternatives by one step and collect
-- all successors.
step :: [[CHSState a]] -> [[CHSState a]]
step gangs = gangs >>= stepSynchronize
-- | A sample gang built from the test computations, for experimenting
-- with 'step' and 'stepSynchronize'.
gang :: [CHS Int]
gang =
  [ test3
  , test1
  , test2
  , test1
  , test1
  , test2
  ]