summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorConalElliott <>2008-12-17 00:07:44 (GMT)
committerLuite Stegeman <luite@luite.com>2008-12-17 00:07:44 (GMT)
commit8449c2fa0fc00450bc882f30486a64c9f3a41945 (patch)
treeefef5446d2b180a0555da03a3cca676c95890777 /src
parenta1ee974fbd1d4a928cb3f4410166bce7bf5c9f64 (diff)
version 0.10.50.10.5
Diffstat (limited to 'src')
-rw-r--r--src/FRP/Reactive/Internal/Chan.hs149
-rwxr-xr-xsrc/FRP/Reactive/PrimReactive.hs2
-rw-r--r--src/Test/Snap.hs28
3 files changed, 178 insertions, 1 deletions
diff --git a/src/FRP/Reactive/Internal/Chan.hs b/src/FRP/Reactive/Internal/Chan.hs
new file mode 100644
index 0000000..46728b6
--- /dev/null
+++ b/src/FRP/Reactive/Internal/Chan.hs
@@ -0,0 +1,149 @@
+{-# LANGUAGE CPP #-}
+{-# OPTIONS_GHC -Wall #-}
+-----------------------------------------------------------------------------
+-- |
+-- Module : FRP.Reactive.Internal.Chan
+-- Copyright : (c) The University of Glasgow 2001
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable (concurrency)
+--
+-- Unbounded channels.
+--
+-----------------------------------------------------------------------------
+
+module FRP.Reactive.Internal.Chan
+ (
+ -- * The 'Chan' type
+ Chan, -- abstract
+
+ -- * Operations
+ newChan, -- :: IO (Chan a)
+ writeChan, -- :: Chan a -> a -> IO ()
+ readChan, -- :: Chan a -> IO a
+ dupChan, -- :: Chan a -> IO (Chan a)
+ unGetChan, -- :: Chan a -> a -> IO ()
+ isEmptyChan, -- :: Chan a -> IO Bool
+
+ -- * Stream interface
+ getChanContents, -- :: Chan a -> IO [a]
+ writeList2Chan, -- :: Chan a -> [a] -> IO ()
+ -- * New stuff
+ weakChanWriter
+ ) where
+
+import Prelude
+
+import System.IO.Unsafe ( unsafeInterleaveIO )
+import Control.Concurrent.MVar
+import Data.Typeable
+
+
+import System.Mem.Weak (mkWeak,deRefWeak)
+
+
+#include "Typeable.h"
+
+-- A channel is represented by two @MVar@s keeping track of the two ends
+-- of the channel contents,i.e., the read- and write ends. Empty @MVar@s
+-- are used to handle consumers trying to read from an empty channel.
+
+-- |'Chan' is an abstract type representing an unbounded FIFO channel.
+data Chan a
+ = Chan (MVar (Stream a))
+ (MVar (Stream a))
+
+INSTANCE_TYPEABLE1(Chan,chanTc,"Chan")
+
+type Stream a = MVar (ChItem a)
+
+data ChItem a = ChItem a (Stream a)
+
+-- See the Concurrent Haskell paper for a diagram explaining the
+-- how the different channel operations proceed.
+
+-- @newChan@ sets up the read and write end of a channel by initialising
+-- these two @MVar@s with an empty @MVar@.
+
+-- |Build and returns a new instance of 'Chan'.
+newChan :: IO (Chan a)
+newChan = do
+ hole <- newEmptyMVar
+ readVar <- newMVar hole
+ writeVar <- newMVar hole
+ return (Chan readVar writeVar)
+
+-- To put an element on a channel, a new hole at the write end is created.
+-- What was previously the empty @MVar@ at the back of the channel is then
+-- filled in with a new stream element holding the entered value and the
+-- new hole.
+
+-- |Write a value to a 'Chan'.
+writeChan :: Chan a -> a -> IO ()
+writeChan (Chan _ writeVar) val = do
+ new_hole <- newEmptyMVar
+ modifyMVar_ writeVar $ \old_hole -> do
+ putMVar old_hole (ChItem val new_hole)
+ return new_hole
+
+-- |Read the next value from the 'Chan'.
+readChan :: Chan a -> IO a
+readChan (Chan readVar _) = do
+ modifyMVar readVar $ \read_end -> do
+ (ChItem val new_read_end) <- readMVar read_end
+ -- Use readMVar here, not takeMVar,
+ -- else dupChan doesn't work
+ return (new_read_end, val)
+
+-- |Duplicate a 'Chan': the duplicate channel begins empty, but data written to
+-- either channel from then on will be available from both. Hence this creates
+-- a kind of broadcast channel, where data written by anyone is seen by
+-- everyone else.
+dupChan :: Chan a -> IO (Chan a)
+dupChan (Chan _ writeVar) = do
+ hole <- readMVar writeVar
+ newReadVar <- newMVar hole
+ return (Chan newReadVar writeVar)
+
+-- |Put a data item back onto a channel, where it will be the next item read.
+unGetChan :: Chan a -> a -> IO ()
+unGetChan (Chan readVar _) val = do
+ new_read_end <- newEmptyMVar
+ modifyMVar_ readVar $ \read_end -> do
+ putMVar new_read_end (ChItem val read_end)
+ return new_read_end
+
+-- |Returns 'True' if the supplied 'Chan' is empty.
+isEmptyChan :: Chan a -> IO Bool
+isEmptyChan (Chan readVar writeVar) = do
+ withMVar readVar $ \r -> do
+ w <- readMVar writeVar
+ let eq = r == w
+ eq `seq` return eq
+
+-- Operators for interfacing with functional streams.
+
+-- |Return a lazy list representing the contents of the supplied
+-- 'Chan', much like 'System.IO.hGetContents'.
+getChanContents :: Chan a -> IO [a]
+getChanContents ch
+ = unsafeInterleaveIO (do
+ x <- readChan ch
+ xs <- getChanContents ch
+ return (x:xs)
+ )
+
+-- |Write an entire list of items to a 'Chan'.
+writeList2Chan :: Chan a -> [a] -> IO ()
+writeList2Chan ch ls = sequence_ (map (writeChan ch) ls)
+
+
+---- New bit:
+
+-- | A weak channel writer. Sustained by the read head. Thus channel
+-- consumers keep channel producers alive.
+weakChanWriter :: Chan a -> IO (IO (Maybe (a -> IO ())))
+weakChanWriter ch@(Chan readVar _) =
+ fmap deRefWeak (mkWeak readVar (writeChan ch) Nothing)
diff --git a/src/FRP/Reactive/PrimReactive.hs b/src/FRP/Reactive/PrimReactive.hs
index 1f0ae15..7675515 100755
--- a/src/FRP/Reactive/PrimReactive.hs
+++ b/src/FRP/Reactive/PrimReactive.hs
@@ -44,7 +44,7 @@ module FRP.Reactive.PrimReactive
-- * Operations on events and reactive values
, stepper, switcher, withTimeGE, withTimeGR
, futuresE, futureStreamE, listEG, atTimesG, atTimeG
- , snapshotWith, accumE, accumR, once
+ , snap, snapshotWith, accumE, accumR, once
, withRestE, untilE
, justE, filterE
-- , traceE, traceR
diff --git a/src/Test/Snap.hs b/src/Test/Snap.hs
new file mode 100644
index 0000000..189c95f
--- /dev/null
+++ b/src/Test/Snap.hs
@@ -0,0 +1,28 @@
+-- From Beelsebob's: http://hpaste.org/13096
+
+-- *FRP.Reactive.Behavior FRP.Reactive.Reactive FRP.Reactive.Improving FRP.Reactive.Fun FRP.Reactive.Internal.Fun> paddlePosR
+-- 0.0 `Stepper` (1.0,5.0e-2)->(2.0,0.0)->(3.0,5.0e-2)->(*** Exception: Prelude.undefined
+-- *FRP.Reactive.Behavior FRP.Reactive.Reactive FRP.Reactive.Improving FRP.Reactive.Fun FRP.Reactive.Internal.Fun> paddlePosR `FRP.Reactive.Reactive.snapshot_` (listEG [(exactly (2.5 :: TimeT), ()),(exactly 3.5, ())])
+-- (2.5,0.0)->(3.5,0.0)
+
+-- I was unable to reproduce the error:
+
+import FRP.Reactive.Improving
+import FRP.Reactive.PrimReactive
+import FRP.Reactive.Reactive
+
+r :: Reactive Int
+r = 0 `stepper` listEG [(exactly 1,1),(exactly 2,2),(exactly 3,3),(after 4,17)]
+
+e :: Event ()
+e = listEG [(exactly 2.5, ()),(exactly 3.5, ())]
+
+e1 :: Event Int
+e1 = r `snapshot_` e
+
+-- (Imp 2.5,2)->(Imp 3.5,3)
+
+e2 :: EventG ITime (Maybe (), Int)
+e2 = r `snap` e
+
+-- (Imp 1.0,(Nothing,1))->(Imp 2.0,(Nothing,2))->(Imp 2.5,(Just (),2))->(Imp 3.0,(Nothing,3))->(Imp 3.5,(Just (),3))->(Imp *** Exception: Prelude.undefined