{-# LANGUAGE GADTs, EmptyDataDecls #-} module CHS ( CHS, Chan, newChan, readChan, writeChan, chsThread, test1, test2, test3 ) where import Control.Monad import Control.Applicative import Control.Concurrent.STM hiding (orElse) import Unsafe.Coerce(unsafeCoerce) -- for unique channels import System.IO.Unsafe (unsafePerformIO) -- for allocating a global variable import Data.Unique -- Interface type CHS = CHSState -- instance Functor CHS -- instance Applicative CHS -- instance Monad CHS -- instance MonadPlus CHS synchronize :: CHS a -> IO a data Chan a = Chan Unique deriving Eq -- instance Show (Chan a) newChan :: IO (Chan a) readChan :: Chan a -> CHS a writeChan :: Chan a -> a -> CHS () -- Internals instance Show (Chan a) where show (Chan u) = "Chan " ++ show (hashUnique u) -- represents the single-thread state of a "synchronize" data CHSState a where Complete :: a -> CHSState a BlockedRead :: Chan a -> (a -> CHSState b) -> CHSState b BlockedWrite :: a -> Chan a -> CHSState b -> CHSState b OrElse :: CHSState a -> CHSState a -> CHSState a Failed :: CHSState a instance Show a => Show (CHSState a) where show (Complete a) = "Complete " ++ show a show (BlockedRead c _) = "BlockedRead " ++ show c show (BlockedWrite _ c _) = "BlockedWrite " ++ show c show Failed = "Failed" show (OrElse a b) = show a ++ " `OrElse` " ++ show b instance Functor CHSState where fmap = liftM instance Applicative CHSState where pure = return (<*>) = ap instance Monad CHSState where return = Complete fail _ = Failed Complete a >>= f = f a BlockedRead c k >>= f = BlockedRead c $ \a -> (k a >>= f) BlockedWrite a c k >>= f = BlockedWrite a c (k >>= f) Failed >>= _ = Failed OrElse a b >>= f = OrElse (a >>= f) (b >>= f) instance MonadPlus CHSState where mzero = Failed mplus = OrElse data LZipper a = Zip [a] [a] fromList :: [a] -> LZipper a fromList xs = Zip [] xs lastp :: LZipper a -> Bool lastp (Zip _ [_]) = True lastp _ = False endp :: LZipper a -> Bool endp (Zip _ rs) = null rs zright :: LZipper a -> LZipper a zright (Zip ls (a:rs)) = Zip (a:ls) rs zright z = z zleft :: LZipper a -> LZipper a zleft (Zip (a:ls) rs) = Zip ls (a:rs) zleft z = z toList :: LZipper a -> [a] toList (Zip ls rs) = reverse ls ++ rs cursor :: LZipper a -> a cursor (Zip _ (a:_)) = a pop :: LZipper a -> LZipper a pop (Zip ls (_:rs)) = Zip ls rs pop z = z ins :: a -> LZipper a -> LZipper a ins a (Zip ls rs) = Zip ls (a:rs) select :: [a] -> [(a,LZipper a)] select xs = select' (fromList xs) where select' z | endp z = [] | otherwise = (cursor z, pop z) : select' (zright z) data TypeEq a b where Refl :: TypeEq a a chanEq :: MonadPlus m => Chan a -> Chan b -> m (TypeEq a b) chanEq (Chan a) (Chan b) | a == b = return (unsafeCoerce Refl) | otherwise = mzero stepSynchronize :: [CHSState a] -> [[CHSState a]] stepSynchronize [] = [] stepSynchronize (Failed : _) = [] stepSynchronize (Complete a : xs) = do xs' <- stepSynchronize xs return (Complete a : xs') stepSynchronize (BlockedRead c k : xs) = mplus (do (BlockedWrite a c2 k2, z) <- select xs Refl <- chanEq c c2 return (k a : (toList $ ins k2 z)) ) (do xs' <- stepSynchronize xs return (BlockedRead c k : xs')) stepSynchronize (BlockedWrite a c k : xs) = mplus (do (BlockedRead c2 k2, z) <- select xs Refl <- chanEq c c2 return (k : (toList $ ins (k2 a) z)) ) (do xs' <- stepSynchronize xs return (BlockedWrite a c k : xs')) stepSynchronize (OrElse a b : xs) = [a : xs, b: xs] readChan c = BlockedRead c return writeChan c a = BlockedWrite a c $ return () -- it's actually important that we put the later -- subsets first; It means we will complete the -- oldest set of computations that can successfully complete -- with the current data -- (although it probably means we waste work retrying lots of -- combinations of computations that are guaranteed to fail; -- an optimization would be to track these somehow and not -- try them again except with new computations together) splitSets :: [a] -> [([a], [a])] splitSets [] = [([], [])] splitSets (x:xs) = [ (l, x:r) | (l,r) <- splitSets xs ] ++ [ (x:l, r) | (l,r) <- splitSets xs ] trySynchronize :: [CHSState a] -> [([a], [CHSState a])] trySynchronize gang = do (g, r) <- splitSets gang guard (not $ null g) res <- runSynch gang return (res, r) -- depth-first search of the connection space runSynch :: [CHSState a]-> [[a]] runSynch gang | complete gang = return [ x | Complete x <- gang ] | otherwise = stepSynchronize gang >>= runSynch where complete g = all isComplete g isComplete (Complete _) = True isComplete _ = False data CHSRes where CHSRes :: TVar (Maybe a) -> a -> CHSRes writeResult :: CHSRes -> STM () writeResult (CHSRes v a) = writeTVar v (Just a) chsThread :: IO () chsThread = forever $ atomically $ do gang <- readTVar chsBlocked case (trySynchronize gang) of [] -> retry ((results, gang') : _) -> do writeTVar v gang' mapM_ writeResult results chsBlocked :: TVar [CHSState CHSRes] chsBlocked = unsafePerformIO $ newTVarIO [] {-# NOINLINE chsBlocked #-} fromJustM :: MonadPlus m => m (Maybe a) -> m a fromJustM m = do x <- m case x of (Just a) -> return a Nothing -> mzero synchronize computation = do v <- newTVarIO Nothing let genericComp = CHSRes v <$> computation atomically $ modifyTVar chsBlocked (genericComp :) atomically $ fromJustM $ readTVar v -- blocks until var is written newChan = do u <- newUnique return (Chan u) modifyTVar v f = do x <- readTVar v writeTVar v (f x) ----------- -- TESTS -- ----------- testCh :: Chan Int testCh = unsafePerformIO newChan {-# NOINLINE testCh #-} test1 = do x <- readChan testCh if x == 0 then Failed else do writeChan testCh (100 `div` x) return x test2 = do writeChan testCh 0 `OrElse` writeChan testCh 5 readChan testCh test3 = return 100 step m = do x <- m; stepSynchronize x gang = [test3, test1, test2, test1, test1, test2]