summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMattGlazar <>2012-11-09 04:20:27 (GMT)
committerhdiff <hdiff@luite.com>2012-11-09 04:20:27 (GMT)
commitba6dc79db8344a9d0568a67a105f48407861040d (patch)
tree66faebcd38a490ac9237ae32f209ea9c104af080
version 0.3.1.00.3.1.0
-rw-r--r--Control/Concurrent/STM/TEvent.hs54
-rw-r--r--Control/Concurrent/STM/TEventVar.hs35
-rw-r--r--Data/Any.hs29
-rw-r--r--LICENSE25
-rw-r--r--Reactive/Thread.hs5
-rw-r--r--Reactive/Thread/Internal/DumbSTM.hs144
-rw-r--r--Reactive/Thread/Internal/Thread.hs78
-rw-r--r--Reactive/Thread/Internal/VarSource.hs18
-rw-r--r--Reactive/Thread/Update.hs76
-rw-r--r--Setup.hs2
-rw-r--r--reactive-thread.cabal53
11 files changed, 519 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/TEvent.hs b/Control/Concurrent/STM/TEvent.hs
new file mode 100644
index 0000000..48d26c4
--- /dev/null
+++ b/Control/Concurrent/STM/TEvent.hs
@@ -0,0 +1,54 @@
+{-# LANGUAGE CPP #-}
+
+module Control.Concurrent.STM.TEvent
+ ( TEvent
+ , newTEvent
+ , newFiredTEvent
+ , fireTEvent
+ , blockTEvent
+
+ , swapTVar
+ ) where
+
+import Control.Applicative
+import Control.Concurrent.STM
+
+-- | A signal which, when fired, notifies all observers and
+-- remains fired for new observers.
+newtype TEvent = TEvent (TVar (Maybe [TMVar ()]))
+ deriving (Eq)
+-- If 'Nothing', the event has fired.
+--
+-- If 'Just', contained is a list of handlers to be notified
+-- when the event fires.
+
+fromMaybe_ :: (Applicative m) => (a -> m ()) -> Maybe a -> m ()
+fromMaybe_ = maybe (pure ())
+
+-- | Creates a new, unfired 'TEvent'.
+newTEvent :: STM (TEvent)
+newTEvent = TEvent <$> newTVar (Just [])
+
+-- | Creates a new, already-fired 'TEvent'.
+newFiredTEvent :: STM (TEvent)
+newFiredTEvent = TEvent <$> newTVar Nothing
+
+#if !MIN_VERSION_stm(2, 3, 0)
+swapTVar :: TVar a -> a -> STM a
+swapTVar var x = readTVar var <* writeTVar var x
+#endif
+
+-- | Wakes up all listeners blocking (via 'blockTEvent') on
+-- the event.
+fireTEvent :: TEvent -> STM ()
+fireTEvent (TEvent event)
+ = swapTVar event Nothing >>= fromMaybe_
+ (mapM_ $ \ var -> putTMVar var ())
+
+-- | Waits for an event to be fired.
+blockTEvent :: TEvent -> STM ()
+blockTEvent (TEvent event)
+ = readTVar event >>= fromMaybe_ (\ listeners -> do
+ myListener <- newEmptyTMVar
+ writeTVar event $ Just (myListener : listeners)
+ takeTMVar myListener)
diff --git a/Control/Concurrent/STM/TEventVar.hs b/Control/Concurrent/STM/TEventVar.hs
new file mode 100644
index 0000000..16a93e3
--- /dev/null
+++ b/Control/Concurrent/STM/TEventVar.hs
@@ -0,0 +1,35 @@
+module Control.Concurrent.STM.TEventVar
+ ( TEventVar
+ , newTEventVar
+ , readTEventVar
+ , writeTEventVar
+ ) where
+
+import Control.Applicative
+import Control.Concurrent.STM
+
+import Control.Concurrent.STM.TEvent
+
+-- | A var which fires an event when modified.
+newtype TEventVar a = TEventVar (TVar (a, TEvent))
+ deriving (Eq)
+
+-- | Creates a new 'TEventVar' with an initial value.
+newTEventVar :: a -> STM (TEventVar a)
+newTEventVar x = do
+ event <- newTEvent
+ TEventVar <$> newTVar (x, event)
+
+-- | Reads the value of a 'TEventVar', yielding an event
+-- which fires when the 'TEventVar' is modified.
+readTEventVar :: TEventVar a -> STM (a, TEvent)
+readTEventVar (TEventVar var) = readTVar var
+
+-- | Writes the value of a 'TEventVar', notifying readers
+-- that the var has been modified.
+writeTEventVar :: TEventVar a -> a -> STM TEvent
+writeTEventVar (TEventVar var) x = do
+ event <- newTEvent
+ (_old, oldEvent) <- swapTVar var (x, event)
+ fireTEvent oldEvent
+ return event
diff --git a/Data/Any.hs b/Data/Any.hs
new file mode 100644
index 0000000..40e862e
--- /dev/null
+++ b/Data/Any.hs
@@ -0,0 +1,29 @@
+{-# LANGUAGE GADTs #-}
+
+module Data.Any
+ ( Any0(..)
+ , any0
+
+ , Any1(..)
+ , any1
+ ) where
+
+import Unsafe.Coerce
+
+-- | Contains any value.
+data Any0 where Any0 :: a -> Any0
+
+-- | Unwraps any value.
+--
+-- > 'any0' . 'Any0' = 'id'
+any0 :: Any0 -> a
+any0 (Any0 x) = unsafeCoerce x
+
+-- | Contains any value with kind @* -> *@.
+data Any1 k1 where Any1 :: k1 a -> Any1 k1
+
+-- | Unwraps any value with kind @* -> *@.
+--
+-- > 'any1' . 'Any1' = 'id'
+any1 :: Any1 k1 -> k1 a
+any1 (Any1 x) = unsafeCoerce x
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..920073b
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2012, Matt "strager" Glazar
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+ * Neither the name of the Matt Glazar nor the names of its contributors may be
+ used to endorse or promote products derived from this software without
+ specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL MATT GLAZAR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Reactive/Thread.hs b/Reactive/Thread.hs
new file mode 100644
index 0000000..470c9b9
--- /dev/null
+++ b/Reactive/Thread.hs
@@ -0,0 +1,5 @@
+module Reactive.Thread
+ ( module Reactive.Thread.Update
+ ) where
+
+import Reactive.Thread.Update
diff --git a/Reactive/Thread/Internal/DumbSTM.hs b/Reactive/Thread/Internal/DumbSTM.hs
new file mode 100644
index 0000000..db1b293
--- /dev/null
+++ b/Reactive/Thread/Internal/DumbSTM.hs
@@ -0,0 +1,144 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE Rank2Types #-}
+
+module Reactive.Thread.Internal.DumbSTM
+ ( DumbSTMVar
+ , DumbSTM
+ , runDumbSTM
+
+ , newDumbSTMVar
+ , readDumbSTMVar
+ , writeDumbSTMVar
+
+ , blockRead
+ ) where
+
+import Control.Applicative
+import Control.Concurrent.STM
+import Control.Monad
+import Control.Monad.Parallel
+import Control.Monad.Trans.State
+import Control.Monad.IO.Class
+import Data.List
+
+import Control.Concurrent.STM.TEvent
+import Control.Concurrent.STM.TEventVar
+import Data.Any
+import Reactive.Thread.Internal.VarSource
+
+newtype DumbSTMVar a = DumbSTMVar
+ { unDumbSTMVar :: TEventVar a }
+ deriving (Eq)
+
+newtype VarCache = VarCache [CacheEntry]
+
+data CacheEntry = CacheEntry
+ { cacheToken :: !(Any1 DumbSTMVar)
+ , cacheUpdateEvent :: !TEvent
+ , cacheValue :: Any0
+ }
+
+entryIsVar :: forall a. DumbSTMVar a -> CacheEntry -> Bool
+entryIsVar var entry = var == any1 (cacheToken entry)
+
+emptyCache :: VarCache
+emptyCache = VarCache []
+
+addCache
+ :: (Monad m)
+ => DumbSTMVar a
+ -> a
+ -> TEvent
+ -> StateT VarCache m ()
+addCache var x event
+ = modify $ \ (VarCache entries)
+ -> VarCache $ cacheEntry : entries
+ where
+ cacheEntry = CacheEntry
+ { cacheToken = Any1 var
+ , cacheUpdateEvent = event
+ , cacheValue = Any0 x
+ }
+
+readCacheEntry
+ :: (Monad m)
+ => DumbSTMVar a
+ -> StateT VarCache m (Maybe CacheEntry)
+readCacheEntry var = do
+ VarCache vars <- get
+ return $ find (entryIsVar var) vars
+
+readCacheValue
+ :: (Monad m)
+ => DumbSTMVar a
+ -> StateT VarCache m (Maybe a)
+readCacheValue
+ = liftM (fmap (any0 . cacheValue))
+ . readCacheEntry
+
+cachedEvents
+ :: (Monad m)
+ => StateT VarCache m [TEvent]
+cachedEvents = do
+ VarCache vars <- get
+ return $ map cacheUpdateEvent vars
+
+newtype DumbSTM a = DumbSTM (StateT VarCache IO a)
+ deriving
+ ( Monad
+ , Functor
+ , Applicative
+ , MonadIO
+ )
+
+instance MonadParallel DumbSTM where
+ bindM2 = error "bindM2: TODO"
+
+instance MonadFork DumbSTM where
+ forkExec m = do
+ joiner <- liftIO $ forkExec (runDumbSTM m)
+ return $ liftIO joiner
+
+runDumbSTM :: DumbSTM a -> IO a
+runDumbSTM (DumbSTM m) = evalStateT m emptyCache
+
+newDumbSTMVar :: a -> DumbSTM (DumbSTMVar a)
+newDumbSTMVar
+ = DumbSTM . liftIO . atomically
+ . liftM DumbSTMVar . newTEventVar
+
+readDumbSTMVar :: DumbSTMVar a -> DumbSTM a
+readDumbSTMVar var = DumbSTM $ do
+ mX <- readCacheValue var
+ case mX of
+ Just x -> return x
+ Nothing -> do
+ (x, event) <- liftIO . atomically
+ . readTEventVar $ unDumbSTMVar var
+ addCache var x event
+ return x
+
+writeDumbSTMVar :: DumbSTMVar a -> a -> DumbSTM ()
+writeDumbSTMVar var x = DumbSTM
+ . void . liftIO . atomically
+ $ writeTEventVar (unDumbSTMVar var) x
+
+foldl1Default
+ :: a -> (a -> a -> a) -> [a] -> a
+foldl1Default _ f (x:xs) = foldl f x xs
+foldl1Default d _ [] = d
+
+blockRead :: DumbSTM ()
+blockRead = DumbSTM $ do
+ events <- cachedEvents
+ put emptyCache
+ liftIO . atomically
+ . foldl1Default (return ()) orElse
+ $ map blockTEvent events
+
+instance NewVar DumbSTMVar DumbSTM where
+ newVar = newDumbSTMVar
+
+instance WriteVar DumbSTMVar DumbSTM where
+ writeVar = writeDumbSTMVar
diff --git a/Reactive/Thread/Internal/Thread.hs b/Reactive/Thread/Internal/Thread.hs
new file mode 100644
index 0000000..693baa0
--- /dev/null
+++ b/Reactive/Thread/Internal/Thread.hs
@@ -0,0 +1,78 @@
+{-# LANGUAGE GADTs #-}
+
+module Reactive.Thread.Internal.Thread
+ ( Thread
+ , fork
+ , yield
+ , runThread
+ ) where
+
+import Control.Monad
+import Control.Monad.IO.Class
+import Control.Monad.Parallel
+import Control.Monad.Trans.Class
+
+import Reactive.Thread.Internal.VarSource
+
+-- | A thread of execution.
+data Thread v o m a where
+ Pure :: a -> Thread v o m a
+ Lift :: m (Thread v o m a) -> Thread v o m a
+ Yield :: o -> Thread v o m a -> Thread v o m a
+
+ Fork
+ :: Thread v b m () -- ^ Thread computation.
+ -> v b -- ^ Output variable.
+ -> Thread v o m a -- ^ Rest.
+ -> Thread v o m a
+
+instance (Monad m) => Monad (Thread v o m) where
+ return = Pure
+
+ Pure x >>= f = f x
+ Lift m >>= f = Lift (m >>= \ t -> return (t >>= f))
+ Yield x m >>= f = Yield x (m >>= f)
+ Fork thread var m >>= f = Fork thread var (m >>= f)
+
+instance (Functor m) => Functor (Thread v o m) where
+ fmap f (Pure x) = Pure $ f x
+ fmap f (Lift m) = Lift $ fmap (fmap f) m
+ fmap f (Yield x m) = Yield x $ fmap f m
+ fmap f (Fork thread var m) = Fork thread var $ fmap f m
+
+instance MonadTrans (Thread v o) where
+ lift m = Lift (liftM Pure m)
+
+instance (MonadIO m) => MonadIO (Thread v o m) where
+ liftIO = lift . liftIO
+
+-- | Creates a new thread which executes in parallel,
+-- returning the forked thread's output variable.
+fork
+ :: (Functor m, Monad m, NewVar v m)
+ => b -- ^ Initial value.
+ -> Thread v b m a -- ^ Computation to fork.
+ -> Thread v o m (v b) -- ^ Var.
+fork z m = do
+ var <- lift $ newVar z
+ Fork (void m) var (Pure var)
+
+-- | Yields a value to this thread's output variable.
+yield :: (Monad m) => o -> Thread v o m ()
+yield x = Yield x (return ())
+
+-- | Executes a thread in the parent monad.
+runThread
+ :: ( MonadFork m
+ , WriteVar v m
+ )
+ => v o -- ^ Output variable.
+ -> Thread v o m a
+ -> m a
+runThread output = go
+ where
+ go (Pure x) = return x
+ go (Lift m) = m >>= go
+ go (Yield x m) = writeVar output x >> go m
+ go (Fork thread var m)
+ = forkExec (runThread var thread) >> go m
diff --git a/Reactive/Thread/Internal/VarSource.hs b/Reactive/Thread/Internal/VarSource.hs
new file mode 100644
index 0000000..5df933c
--- /dev/null
+++ b/Reactive/Thread/Internal/VarSource.hs
@@ -0,0 +1,18 @@
+{-# LANGUAGE MultiParamTypeClasses #-}
+
+module Reactive.Thread.Internal.VarSource
+ ( NewVar(..)
+ , WriteVar(..)
+ ) where
+
+import Data.IORef
+
+class (Monad m) => NewVar v m where
+ newVar :: a -> m (v a)
+class (Monad m) => WriteVar v m where
+ writeVar :: v a -> a -> m ()
+
+instance NewVar IORef IO where
+ newVar = newIORef
+instance WriteVar IORef IO where
+ writeVar = writeIORef
diff --git a/Reactive/Thread/Update.hs b/Reactive/Thread/Update.hs
new file mode 100644
index 0000000..e8c2309
--- /dev/null
+++ b/Reactive/Thread/Update.hs
@@ -0,0 +1,76 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+
+module Reactive.Thread.Update
+ ( Update
+ , runUpdate
+
+ , UpdateVar
+ , parallel
+ , query
+ , yield
+
+ , forever
+ , foreverT
+ ) where
+
+-- import Control.Applicative
+import Control.Monad hiding (forever)
+import Control.Monad.IO.Class
+import Control.Monad.Trans.Class
+
+import qualified Control.Monad
+
+import Reactive.Thread.Internal.DumbSTM
+
+import qualified Reactive.Thread.Internal.Thread as Thread
+
+newtype Update o a = Update
+ { unUpdate :: Thread.Thread DumbSTMVar o DumbSTM a }
+ deriving
+ ( Monad
+ , Functor
+ -- , Applicative
+ , MonadIO
+ )
+
+-- | A read-only handle to a thread's output variable.
+newtype UpdateVar a = UpdateVar
+ { unUpdateVar :: DumbSTMVar a }
+
+-- | Executes an 'Update' in parallel, returning a var
+-- representing the thread's output variable.
+parallel
+ :: b
+ -> Update b a
+ -> Update o (UpdateVar b)
+parallel z m = Update . liftM UpdateVar
+ $ Thread.fork z (unUpdate m)
+
+-- | Mutates this thread's output variable.
+yield :: o -> Update o ()
+yield = Update . Thread.yield
+
+-- | Reads a thread's output variable.
+--
+-- Reads are cached and tracked; see 'forever'.
+query :: UpdateVar a -> Update o a
+query = Update . lift . readDumbSTMVar . unUpdateVar
+
+-- | Runs an 'Update' in a loop forever. Between each loop,
+-- execution is blocked until a 'query'ied variable is
+-- modified (since it was last read).
+forever :: Update o a -> Update o b
+forever m = Control.Monad.forever
+ $ Update (lift blockRead) >> m
+
+foreverT
+ :: (Monad (m (Update o)), MonadTrans m)
+ => m (Update o) a -> m (Update o) b
+foreverT m = Control.Monad.forever
+ $ lift (Update (lift blockRead)) >> m
+
+-- | Runs an 'Update' in the 'IO' monad.
+runUpdate :: Update () a -> IO a
+runUpdate m = runDumbSTM $ do
+ var <- newDumbSTMVar ()
+ Thread.runThread var (unUpdate m)
diff --git a/Setup.hs b/Setup.hs
new file mode 100644
index 0000000..9a994af
--- /dev/null
+++ b/Setup.hs
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
diff --git a/reactive-thread.cabal b/reactive-thread.cabal
new file mode 100644
index 0000000..4d991f0
--- /dev/null
+++ b/reactive-thread.cabal
@@ -0,0 +1,53 @@
+-- Update checklist:
+--
+-- 1. Update the version number.
+-- 2. Update the version number again, in source-repository.
+-- 3. Commit with the version number.
+-- 4. Tag with the version number.
+-- 5. Push branches and tags.
+
+name: reactive-thread
+version: 0.3.1.0
+
+author: Matt "strager" Glazar
+build-type: Simple
+cabal-version: >=1.8
+category: FRP
+homepage: https://github.com/strager/reactive-thread
+license-file: LICENSE
+license: BSD3
+maintainer: strager.nds@gmail.com
+synopsis: Reactive programming via imperative threads
+
+description:
+ Reactive programming with threads.
+
+source-repository this
+ type: git
+ location: https://github.com/strager/reactive-thread.git
+ tag: 0.3.1.0
+
+library
+ exposed-modules:
+ Reactive.Thread,
+ Reactive.Thread.Update
+
+ other-modules:
+ Data.Any,
+
+ Control.Concurrent.STM.TEvent,
+ Control.Concurrent.STM.TEventVar,
+
+ Reactive.Thread.Internal.DumbSTM,
+ Reactive.Thread.Internal.Thread,
+ Reactive.Thread.Internal.VarSource
+
+ build-depends:
+ base == 4.5.*,
+ monad-parallel,
+ SDL,
+ stm,
+ transformers
+
+ ghc-options:
+ -Wall