summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuhaPaananen <>2012-03-01 18:58:33 (GMT)
committerhdiff <hdiff@luite.com>2012-03-01 18:58:33 (GMT)
commitdc4c8c8c24a82c76e73277801079e281f33f8fb9 (patch)
tree815427185588eb6ca94edc8ad89cf93e9deabc8b
version 0.10.1
-rw-r--r--LICENSE30
-rw-r--r--Setup.hs2
-rw-r--r--reactive-bacon.cabal49
-rw-r--r--src/Reactive/Bacon.hs18
-rw-r--r--src/Reactive/Bacon/Core.hs72
-rw-r--r--src/Reactive/Bacon/EventStream.hs57
-rw-r--r--src/Reactive/Bacon/EventStream/Combinators.hs69
-rw-r--r--src/Reactive/Bacon/EventStream/IO.hs29
-rw-r--r--src/Reactive/Bacon/EventStream/Monadic.hs75
-rw-r--r--src/Reactive/Bacon/EventStream/Timed.hs53
-rw-r--r--src/Reactive/Bacon/Property.hs141
-rw-r--r--src/Reactive/Bacon/PushStream.hs75
-rw-r--r--test/Specs.hs10
13 files changed, 680 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..34a698b
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright (c)2012, Juha Paananen
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * Neither the name of Juha Paananen nor the names of other
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Setup.hs b/Setup.hs
new file mode 100644
index 0000000..9a994af
--- /dev/null
+++ b/Setup.hs
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
diff --git a/reactive-bacon.cabal b/reactive-bacon.cabal
new file mode 100644
index 0000000..174a04e
--- /dev/null
+++ b/reactive-bacon.cabal
@@ -0,0 +1,49 @@
+Name: reactive-bacon
+Version: 0.1
+Stability: experimental
+Synopsis: FRP (functional reactive programming) framework
+Description: FRP (functional reactive programming) framework inspired by RX, reactive-banana and Iteratee
+License: BSD3
+License-file: LICENSE
+Author: Juha Paananen
+Maintainer: juha.paananen@gmail.com
+Homepage: http://github.com/raimohanska/reactive-bacon
+Bug-reports: http://github.com/raimohanska/reactive-bacon/issues
+Category: Reactive
+Build-type: Simple
+Cabal-version: >=1.10
+
+
+Library
+ hs-source-dirs: src
+ Exposed-modules: Reactive.Bacon,
+ Reactive.Bacon.Core,
+ Reactive.Bacon.EventStream,
+ Reactive.Bacon.EventStream.Combinators,
+ Reactive.Bacon.EventStream.IO,
+ Reactive.Bacon.EventStream.Monadic,
+ Reactive.Bacon.EventStream.Timed,
+ Reactive.Bacon.PushStream,
+ Reactive.Bacon.Property
+ Build-depends:
+ base >= 4 && < 5,
+ stm >= 2.2.0.1,
+ old-time >= 1.0.0.6
+ default-language: Haskell2010
+
+Test-Suite tests
+ hs-source-dirs: src, test
+ Type: exitcode-stdio-1.0
+ Main-is: Specs.hs
+ Build-depends:
+ base >= 4 && < 5,
+ stm >= 2.2.0.1,
+ old-time >= 1.0.0.6,
+ HUnit >= 1.2.2.3,
+ containers >= 0.4.0.0
+ ghc-options: -threaded
+ default-language: Haskell2010
+
+source-repository head
+ type: Git
+ location: https://github.com/raimohanska/reactive-bacon
diff --git a/src/Reactive/Bacon.hs b/src/Reactive/Bacon.hs
new file mode 100644
index 0000000..26c508f
--- /dev/null
+++ b/src/Reactive/Bacon.hs
@@ -0,0 +1,18 @@
+module Reactive.Bacon (
+ module Reactive.Bacon.Core,
+ module Reactive.Bacon.EventStream.Combinators,
+ module Reactive.Bacon.EventStream.IO,
+ module Reactive.Bacon.EventStream.Monadic,
+ module Reactive.Bacon.EventStream,
+ module Reactive.Bacon.Property,
+ module Reactive.Bacon.PushStream,
+ module Reactive.Bacon.EventStream.Timed
+ ) where
+ import Reactive.Bacon.Core
+ import Reactive.Bacon.EventStream.Combinators
+ import Reactive.Bacon.EventStream.IO
+ import Reactive.Bacon.EventStream.Monadic
+ import Reactive.Bacon.EventStream
+ import Reactive.Bacon.Property
+ import Reactive.Bacon.PushStream
+ import Reactive.Bacon.EventStream.Timed
diff --git a/src/Reactive/Bacon/Core.hs b/src/Reactive/Bacon/Core.hs
new file mode 100644
index 0000000..6da3c24
--- /dev/null
+++ b/src/Reactive/Bacon/Core.hs
@@ -0,0 +1,72 @@
+module Reactive.Bacon.Core where
+
+import Control.Monad
+import Prelude hiding (map, filter)
+
+class Observable s where
+ (==>) :: s a -> (a -> IO()) -> IO ()
+ (>>=!) :: IO (s a) -> (a -> IO()) -> IO ()
+ (>>=!) action f = action >>= \observable -> (observable ==> f)
+ infixl 1 >>=!
+
+data EventStream a = EventStream { subscribe :: (EventSink a -> IO Disposable) }
+type EventSink a = (Event a -> IO (HandleResult))
+data Event a = Next a | End
+
+data HandleResult = More | NoMore
+type Disposable = IO ()
+
+class EventSource s where
+ toEventStream :: s a -> EventStream a
+
+instance EventSource EventStream where
+ toEventStream = id
+
+instance Observable EventStream where
+ (==>) src f = void $ subscribe (toEventStream src) $ toObserver f
+
+instance Functor Event where
+ fmap f (Next a) = Next (f a)
+ fmap _ End = End
+
+instance Show a => Show (Event a) where
+ show (Next x) = show x
+ show End = "<END>"
+
+instance Eq a => Eq (Event a) where
+ (==) End End = True
+ (==) (Next x) (Next y) = (x==y)
+ (==) _ _ = False
+
+obs :: EventSource s => s a -> EventStream a
+obs = toEventStream
+
+neverE :: EventStream a
+neverE = EventStream $ \_ -> return $ (return ())
+
+toObserver :: (a -> IO()) -> EventSink a
+toObserver next = sink
+ where sink (Next x) = next x >> return More
+ sink End = return NoMore
+
+toEventObserver :: (Event a -> IO()) -> EventSink a
+toEventObserver next = sink
+ where sink event = next event >> return More
+
+-- | Reactive property. Differences from EventStream:
+-- - addListener function must always deliver the latest known value to the new listener
+--
+-- So a Property is roughly an EventStream that stores its latest value so
+-- that it is always available for new listeners. Doesn't mean it has to be
+-- up to date if it has been without listeners for a while.
+data Property a = Property { addPropertyListener :: PropertySink a -> IO Disposable }
+
+class PropertySource s where
+ toProperty :: s a -> Property a
+
+instance PropertySource Property where
+ toProperty = id
+
+data PropertyEvent a = Initial a | Update a | EndUpdate
+
+type PropertySink a = PropertyEvent a -> IO HandleResult
diff --git a/src/Reactive/Bacon/EventStream.hs b/src/Reactive/Bacon/EventStream.hs
new file mode 100644
index 0000000..2a25dc9
--- /dev/null
+++ b/src/Reactive/Bacon/EventStream.hs
@@ -0,0 +1,57 @@
+module Reactive.Bacon.EventStream where
+
+import Reactive.Bacon.Core
+import Reactive.Bacon.PushStream
+import Control.Monad
+import Data.Maybe
+import Data.IORef
+import Control.Concurrent.STM
+
+instance Functor EventStream where
+ fmap f = sinkMap mappedSink
+ where mappedSink sink event = sink (fmap f event)
+
+mapE :: EventSource s => (a -> b) -> s a -> IO (EventStream b)
+mapE f = return . (fmap f) . obs
+
+scanE :: EventSource s => (b -> a -> b) -> b -> s a -> IO (EventStream b)
+scanE f seed src = do acc <- newTVarIO seed
+ wrap $ sinkMap (scanSink acc) src
+ where scanSink acc sink End = sink End >> return NoMore
+ scanSink acc sink (Next x) = do y <- update acc x
+ sink (Next y)
+ update acc x = atomically $ do accVal <- readTVar acc
+ let next = f accVal x
+ writeTVar acc next
+ return next
+
+filterE :: EventSource s => (a -> Bool) -> s a -> IO (EventStream a)
+filterE f = return . sinkMap filteredSink
+ where filteredSink sink End = sink End
+ filteredSink sink (Next x) | f x = sink (Next x)
+ | otherwise = return More
+
+takeWhileE :: EventSource s => (a -> Bool) -> s a -> IO (EventStream a)
+takeWhileE f src = do stopFlag <- newIORef False
+ wrap $ sinkMap (guardedSink stopFlag) src
+ where guardedSink stopFlag sink x = do stop <- readIORef stopFlag
+ if stop
+ then return NoMore
+ else limitedSink stopFlag sink x
+ limitedSink stopFlag sink End = sink End >> return NoMore
+ limitedSink stopFlag sink (Next x) | f x = sink (Next x)
+ | otherwise = writeIORef stopFlag True >> sink End >> return NoMore
+
+takeE :: EventSource s => Num n => Ord n => n -> s a -> IO (EventStream a)
+takeE n src = scanE numbered (Nothing, 0) src >>= takeWhileE atMostN >>= mapE (fromJust . fst)
+ where numbered (_, i) x = (Just x, i + 1)
+ atMostN (_, i) | i <= n = True
+ | otherwise = False
+
+sinkMap :: EventSource s => (EventSink b -> EventSink a) -> s a -> EventStream b
+sinkMap sinkMapper src = EventStream $ subscribe'
+ where subscribe' sink = subscribe (toEventStream src) $ sinkMapper sink
+
+(===>) :: EventSource s => s a -> (Event a -> IO()) -> IO()
+(===>) src f = void $ subscribe (toEventStream src) $ toEventObserver f
+
diff --git a/src/Reactive/Bacon/EventStream/Combinators.hs b/src/Reactive/Bacon/EventStream/Combinators.hs
new file mode 100644
index 0000000..6e383c1
--- /dev/null
+++ b/src/Reactive/Bacon/EventStream/Combinators.hs
@@ -0,0 +1,69 @@
+module Reactive.Bacon.EventStream.Combinators(mergeE, takeUntilE, eitherE, combineLatestE) where
+
+import Reactive.Bacon.Core
+import Reactive.Bacon.EventStream
+import Reactive.Bacon.PushStream
+import Reactive.Bacon.Property
+import Data.IORef
+import Control.Monad
+import Control.Applicative
+
+instance Show a => Show (EventStream a) where
+ show = const "EventStream"
+
+instance Eq a => Eq (EventStream a) where
+ (==) = \x y -> False
+
+mergeE :: EventSource s1 => EventSource s2 => s1 a -> s2 a -> IO (EventStream a)
+mergeE xs ys = eitherE xs ys >>= mapE simplify
+ where simplify (Right x) = x
+ simplify (Left x) = x
+
+takeUntilE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> IO (EventStream b)
+takeUntilE stopper src = wrap $ sinkMap takeUntil' $ mergeRawE src stopper
+ where takeUntil' sink (Next (Left (Next x))) = sink (Next x)
+ takeUntil' sink (Next (Left End)) = sink End >> return NoMore
+ takeUntil' sink (Next (Right (Next x))) = sink End >> return NoMore
+ takeUntil' sink (Next (Right End)) = return More
+
+eitherE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> IO (EventStream (Either a b))
+eitherE left right = do endFlag <- newIORef False
+ return $ sinkMap (skipFirstEnd endFlag) (mergeRawE left right)
+ where skipFirstEnd flag sink event | isEnd event = do done <- readIORef flag
+ writeIORef flag True
+ handleEnd done sink
+ | otherwise = send sink skipFirstEnd event
+ handleEnd True sink = sink End >> return NoMore
+ handleEnd False sink = return More
+ send sink mapper (Next (Right (Next x))) = sink (Next (Right x))
+ send sink mapper (Next (Left (Next x))) = sink (Next (Left x))
+
+combineLatestE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> IO (EventStream (a, b))
+combineLatestE left right = do leftP <- fromEventSource left
+ rightP <- fromEventSource right
+ changesP $ combineP leftP rightP
+
+mergeRawE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> EventStream (Either (Event a) (Event b))
+mergeRawE left right = EventStream $ \sink -> do
+ disposeRightHolder <- newIORef Nothing
+ disposeLeft <- subscribe (toEventStream left) (barrier Left sink (disposeIfPossible disposeRightHolder))
+ disposeRight <- subscribe (toEventStream right) (barrier Right sink disposeLeft)
+ writeIORef disposeRightHolder (Just disposeRight)
+ return $ disposeLeft >> disposeRight
+ where barrier mapping sink disposeOther event = do
+ result <- sink $ Next (mapping event)
+ case result of
+ More -> do
+ return More
+ NoMore -> do
+ disposeOther
+ return NoMore
+ disposeIfPossible ref = do
+ dispose <- readIORef ref
+ case dispose of Nothing -> return () -- TODO: is it necessary to dispose later in this case?
+ Just f -> f
+
+isEnd :: Event (Either (Event a) (Event b)) -> Bool
+isEnd (Next (Right End)) = True
+isEnd (Next (Left End)) = True
+isEnd _ = False
diff --git a/src/Reactive/Bacon/EventStream/IO.hs b/src/Reactive/Bacon/EventStream/IO.hs
new file mode 100644
index 0000000..47cd941
--- /dev/null
+++ b/src/Reactive/Bacon/EventStream/IO.hs
@@ -0,0 +1,29 @@
+module Reactive.Bacon.EventStream.IO where
+
+import Reactive.Bacon.Core
+import Reactive.Bacon.EventStream.Monadic
+import Reactive.Bacon.EventStream
+import Reactive.Bacon.PushStream
+import Data.IORef
+import Control.Concurrent(forkIO)
+import Control.Monad
+
+-- | startProcess is a function whose params are "event sink" and "stop sign"
+fromStoppableProcess :: ((Event a -> IO ()) -> IO Bool -> IO ()) -> IO (EventStream a, IO ())
+fromStoppableProcess startProcess = do
+ (stream, pushEvent) <- newPushStream
+ stopSignal <- newIORef False
+ let getStopState = (readIORef stopSignal)
+ startProcess (guardedPush pushEvent getStopState) getStopState
+ return (stream, (writeIORef stopSignal True))
+ where guardedPush pushEvent getStopState event = do stop <- getStopState
+ unless stop $ pushEvent event
+
+fromNonStoppableProcess :: ((Event a -> IO ()) -> IO ()) -> IO (EventStream a)
+fromNonStoppableProcess startProcess = do
+ (stream, pushEvent) <- newPushStream
+ startProcess (pushEvent)
+ return stream
+
+fromIO :: IO a -> IO (EventStream a)
+fromIO action = fromNonStoppableProcess $ \sink -> void $ forkIO $ action >>= sink . Next >> sink End
diff --git a/src/Reactive/Bacon/EventStream/Monadic.hs b/src/Reactive/Bacon/EventStream/Monadic.hs
new file mode 100644
index 0000000..958d35b
--- /dev/null
+++ b/src/Reactive/Bacon/EventStream/Monadic.hs
@@ -0,0 +1,75 @@
+module Reactive.Bacon.EventStream.Monadic(selectManyE, switchE) where
+
+import Data.IORef
+import Reactive.Bacon.Core
+import Reactive.Bacon.EventStream.Combinators
+import Reactive.Bacon.EventStream
+import Reactive.Bacon.PushStream(wrap)
+import Control.Concurrent.STM
+import Control.Monad
+
+-- EventStream is not a Monad
+-- However, selectManyE and switchE have a signature that's pretty close
+-- to monadic bind. The difference is that IO is allowed in the bind step.
+selectManyE :: EventSource s => (a -> IO (EventStream b)) -> s a -> IO (EventStream b)
+selectManyE binder xs = wrap $ EventStream $ \sink -> do
+ state <- newTVarIO $ State sink Nothing 1 [] [] False
+ dispose <- subscribe (obs xs) $ mainEventSink state
+ atomically $ modifyTVar state $ \state -> state { dispose = Just dispose }
+ return $ disposeAll state
+ where mainEventSink state eventA = do
+ case eventA of
+ End -> do
+ (end, sink) <- withState state $ \s -> do
+ modifyTVar state $ \s -> s { mainEnded = True }
+ return (null (childIds s), currentSink s)
+ when end $ void $ sink End
+ return NoMore
+ Next x -> do
+ id <- withState state $ \s -> do
+ let id = counter s
+ writeTVar state $ s { counter = (counter s + 1), childIds = id : (childIds s) }
+ return id
+ childStream <- binder x
+ childDispose <- subscribe childStream $ childEventSink id state
+ atomically $ modifyTVar state $ \s -> s { childDisposables = (childDispose : childDisposables s) }
+ return More
+ childEventSink id state = \eventB -> do
+ case eventB of
+ End -> do
+ (end, sink) <- withState state $ \s -> do
+ let newState = removeChild s id
+ writeTVar state newState
+ let end = (null (childIds newState) && mainEnded newState)
+ return (end, currentSink newState)
+ when end $ void $ sink End
+ return NoMore
+ Next y -> do
+ sink <- withState state $ return.currentSink
+ result <- sink (Next y)
+ case result of
+ NoMore -> disposeAll state >> return NoMore
+ More -> return More
+ disposeAll state = do
+ (maybeDispose, children) <- withState state $ \s -> return (dispose s, childDisposables s)
+ sequence_ children
+ case maybeDispose of
+ Nothing -> return () -- TODO should dispose later?
+ Just dispose -> dispose
+ removeChild state id = state { childIds = filter (/= id) (childIds state) }
+ withState state action = atomically (readTVar state >>= action)
+
+switchE :: EventSource s => (a -> IO (EventStream b)) -> s a -> IO (EventStream b)
+switchE binder src = selectManyE (binder >=> (takeUntilE src)) src
+
+data State a = State { currentSink :: EventSink a,
+ dispose :: Maybe Disposable,
+ counter :: Int,
+ childIds :: [Int],
+ childDisposables :: [Disposable],
+ mainEnded :: Bool }
+
+modifyTVar :: TVar a -> (a -> a) -> STM ()
+modifyTVar var f = do
+ val <- readTVar var
+ writeTVar var (f val)
diff --git a/src/Reactive/Bacon/EventStream/Timed.hs b/src/Reactive/Bacon/EventStream/Timed.hs
new file mode 100644
index 0000000..b1935db
--- /dev/null
+++ b/src/Reactive/Bacon/EventStream/Timed.hs
@@ -0,0 +1,53 @@
+{-# LANGUAGE TupleSections #-}
+module Reactive.Bacon.EventStream.Timed where
+
+import Reactive.Bacon.Core
+import Reactive.Bacon.EventStream.IO
+import Reactive.Bacon.EventStream.Monadic
+import System.Time
+import Control.Concurrent(threadDelay, forkIO)
+import Control.Monad(void, unless)
+import Data.IORef
+
+laterE :: TimeDiff -> a -> IO (EventStream a)
+laterE diff x = timedE [(diff, x)] >>= return . fst
+
+periodicallyE :: TimeDiff -> a -> IO (EventStream a, Disposable)
+periodicallyE diff x = timedE (repeat (diff, x))
+
+sequentiallyE :: TimeDiff -> [a] -> IO (EventStream a)
+sequentiallyE delay xs = timedE (map (delay,) xs) >>= return . fst
+
+timedE :: [(TimeDiff, a)] -> IO (EventStream a, Disposable)
+timedE events = fromStoppableProcess $ \sink stopper -> void $ forkIO $ serve sink events stopper
+ where serve sink [] _ = sink End
+ serve sink ((diff, event) : events) getStopState = do
+ stop <- getStopState
+ unless stop $ do
+ threadDelay (toMicros diff)
+ sink $ Next event
+ serve sink events getStopState
+
+delayE :: EventSource s => TimeDiff -> s a -> IO (EventStream a)
+delayE diff = selectManyE (laterE diff)
+
+throttleE :: EventSource s => TimeDiff -> s a -> IO (EventStream a)
+throttleE diff = switchE (laterE diff)
+
+toMicros :: TimeDiff -> Int
+toMicros diff = fromInteger((toPicos diff) `div` 1000000)
+ where
+ toPicos :: TimeDiff -> Integer
+ toPicos (TimeDiff 0 0 0 h m s p) = p + (fromHours h) + (fromMinutes m) + (fromSeconds s)
+ where fromSeconds s = 1000000000000 * (toInteger s)
+ fromMinutes m = 60 * (fromSeconds m)
+ fromHours h = 60 * (fromMinutes h)
+
+-- | Milliseconds to TimeDiff
+milliseconds :: Integral a => a -> TimeDiff
+milliseconds ms = noTimeDiff { tdPicosec = (toInteger ms) * 1000000000}
+
+-- | Seconds to TimeDiff
+seconds :: Integral a => a -> TimeDiff
+seconds = milliseconds . (*1000)
+
diff --git a/src/Reactive/Bacon/Property.hs b/src/Reactive/Bacon/Property.hs
new file mode 100644
index 0000000..728ea23
--- /dev/null
+++ b/src/Reactive/Bacon/Property.hs
@@ -0,0 +1,141 @@
+module Reactive.Bacon.Property(changesP,
+ mapP,
+ combineP,
+ combineWithP,
+ combineWithLatestOfP,
+ constantP,
+ fromEventSource,
+ fromEventSourceWithStartValue,
+ newPushProperty)
+where
+
+import Reactive.Bacon.Core
+import Reactive.Bacon.EventStream
+import Reactive.Bacon.PushStream
+import Control.Applicative
+import Data.IORef
+
+
+instance Functor Property where
+ fmap = mapP
+
+instance Applicative Property where
+ (<*>) = applyP
+ pure = constantP
+
+instance Show a => Show (Property a) where
+ show = const "Property"
+
+instance Eq a => Eq (Property a) where
+ (==) = \x y -> False
+
+instance (Show a, Eq a, Num a) => Num (Property a) where
+ (+) xs ys = (+) <$> xs <*> ys
+ (*) xs ys = (*) <$> xs <*> ys
+ abs = fmap abs
+ signum = fmap signum
+ fromInteger = pure . fromInteger
+
+instance Observable Property where
+ (==>) p f = changesP p >>=! f
+
+changesP :: PropertySource s => s a -> IO (EventStream a)
+changesP s = wrap $ EventStream $ addListener (toProperty s)
+ where addListener (Property addL) sink = addL $ propertySink sink
+ propertySink sink (Initial _) = return More
+ propertySink sink (Update x) = sink (Next x)
+ propertySink sink (EndUpdate) = sink End
+
+fromEventSourceWithStartValue :: EventSource s => Maybe a -> s a -> IO (Property a)
+fromEventSourceWithStartValue start src = do
+ currentRef <- newIORef start
+ return $ Property (addListener' currentRef)
+ where addListener' currentRef sink = do
+ result <- pushCurrent currentRef sink
+ case result of
+ NoMore -> return $ return ()
+ More -> subscribe (obs src) (toEventSink $ sink)
+ pushCurrent currentRef sink = do
+ current <- readIORef currentRef
+ case current of
+ Just x -> sink (Initial x)
+ Nothing -> return $ More
+ toEventSink propertySink End = propertySink EndUpdate
+ toEventSink propertySink (Next x) = propertySink (Update x)
+
+fromEventSource :: EventSource s => s a -> IO (Property a)
+fromEventSource = fromEventSourceWithStartValue Nothing
+
+filterP :: PropertySource s => (a -> Bool) -> s a -> Property a
+filterP f = mapFilterP filter'
+ where filter' x | f x = Just x
+ | otherwise = Nothing
+
+mapP :: PropertySource s => (a -> b) -> s a -> Property b
+mapP f = mapFilterP (Just . f)
+
+mapFilterP :: PropertySource s => (a -> Maybe b) -> s a -> Property b
+mapFilterP f src = Property $ addListener' (toProperty src)
+ where addListener' (Property addL) sink = addL $ mapSink sink
+ mapSink sink EndUpdate = sink EndUpdate
+ mapSink sink (Initial x) = send sink Initial $ f x
+ mapSink sink (Update x) = send sink Update $ f x
+ send sink constructor Nothing = return More
+ send sink constructor (Just x) = sink (constructor x)
+
+
+combineRaw :: PropertySource s1 => PropertySource s2 => s1 a -> s2 b -> Property (Either (a, Maybe b) (b, Maybe a))
+combineRaw x y = Property addL
+ where addL sink = do
+ curX <- newIORef Nothing
+ curY <- newIORef Nothing
+ sinkRef <- newIORef sink
+ endCount <- newIORef 0
+ disposeX <- addPropertyListener (toProperty x) $ combineWith Left curX curY endCount sinkRef
+ disposeY <- addPropertyListener (toProperty y) $ combineWith Right curY curX endCount sinkRef
+ return (disposeX >> disposeY)
+ combineWith f this other endCountRef sinkRef EndUpdate = do
+ endCount <- readIORef endCountRef
+ case endCount of
+ 0 -> writeIORef endCountRef 1 >> return NoMore
+ _ -> readIORef sinkRef >>= \sink -> sink EndUpdate >> return NoMore
+ combineWith f this other endCount sinkRef (Initial x) =
+ combineWith' f this other endCount sinkRef Update x
+ combineWith f this other endCount sinkRef (Update x) =
+ combineWith' f this other endCount sinkRef Update x
+ combineWith' f this other endCount sinkRef constructor x = do
+ writeIORef this $ Just x
+ otherVal <- readIORef other
+ sink <- readIORef sinkRef
+ result <- sink $ constructor $ f (x, otherVal)
+ case result of
+ NoMore -> writeIORef sinkRef nullSink >> return NoMore
+ More -> return More
+ nullSink _ = return NoMore
+
+combineWithP :: PropertySource s1 => PropertySource s2 => (a -> b -> c) -> s1 a -> s2 b -> Property c
+combineWithP f xs ys = mapFilterP mapP' $ combineRaw xs ys
+ where mapP' (Left (x, Just y)) = Just $ f x y
+ mapP' (Right (y, Just x)) = Just $ f x y
+ mapP' _ = Nothing
+
+combineP :: PropertySource s1 => PropertySource s2 => s1 a -> s2 b -> Property (a, b)
+combineP = combineWithP (,)
+
+-- | Combines the values from the first source to the current value of the second source
+combineWithLatestOfP :: PropertySource s1 => PropertySource s2 => (a -> b -> c) -> s1 a -> s2 b -> Property c
+combineWithLatestOfP f xs ys = mapFilterP mapP' $ combineRaw ys xs
+ where mapP' (Right (x, Just y)) = Just $ f x y
+ mapP' _ = Nothing
+
+
+applyP :: PropertySource s1 => PropertySource s2 => s1 (a -> b) -> s2 a -> Property b
+applyP = combineWithP ($)
+
+constantP :: a -> Property a
+constantP value = Property $ \sink -> sink (Initial value) >> return (return ())
+
+newPushProperty :: IO (Property a, (a -> IO ()))
+newPushProperty = do (stream, pushEvent) <- newPushStream
+ property <- fromEventSource stream
+ return (property, pushEvent . Next)
diff --git a/src/Reactive/Bacon/PushStream.hs b/src/Reactive/Bacon/PushStream.hs
new file mode 100644
index 0000000..9a4b195
--- /dev/null
+++ b/src/Reactive/Bacon/PushStream.hs
@@ -0,0 +1,75 @@
+module Reactive.Bacon.PushStream(newPushStream, newDispatcher, wrap) where
+
+import Reactive.Bacon.Core
+import Data.IORef
+import Control.Monad
+
+data Subscription a = Subscription (EventSink a) Int
+instance Eq (Subscription q) where
+ (==) (Subscription _ a) (Subscription _ b) = a == b
+
+data PushCollection a = PushCollection (IORef ([Subscription a], Int)) (EventStream a) (IORef(Maybe Disposable)) (IORef Bool)
+
+instance EventSource PushCollection where
+ toEventStream collection = EventStream (subscribePushCollection collection)
+ where subscribePushCollection pc@(PushCollection ref src dref eref) sink = do
+ (subscriptions, id) <- readIORef ref
+ let subscription = Subscription sink id
+ writeIORef ref $ (subscription : subscriptions, id+1)
+ ended <- readIORef eref
+ when (not ended && null subscriptions) $ do
+ dispose <- subscribe src $ \event -> do
+ pushEvent pc event
+ return $ case event of
+ Next a -> More
+ End -> NoMore
+ writeIORef dref (Just dispose)
+ return (removeSubscription pc subscription)
+
+instance Observable PushCollection where
+ (==>) = (==>) . obs
+
+removeSubscription (PushCollection ref _ disposeRef _) s = do
+ (subscriptions, counter) <- readIORef ref
+ let updated = (removeSubscription' subscriptions)
+ writeIORef ref $ (updated, counter)
+ dispose <- readIORef disposeRef
+ when (null updated) (unsubscribe dispose)
+ where removeSubscription' sinks = filter (/= s) sinks
+ unsubscribe (Just dispose) = dispose
+ unsubscribe _ = return ()
+
+-- | Makes an observable with a single connection to the underlying EventSource.
+-- Automatically subscribes/unsubscribes from EventSource based on whether there
+-- are any EventSinks.
+wrap :: EventSource s => s a -> IO (EventStream a)
+wrap src = newPushStream' src >>= return . fst
+
+newDispatcher :: ((a -> IO ()) -> IO Disposable) -> IO (EventStream a)
+newDispatcher pusher = wrap $ EventStream $ \sink -> pusher (void . sink . Next)
+
+newPushStream' :: EventSource s => s a -> IO (EventStream a, (Event a -> IO()))
+newPushStream' src = do
+ stateRef <- newIORef ([], 1)
+ disposeRef <- newIORef Nothing
+ endRef <- newIORef False
+ let pc = PushCollection stateRef (obs src) disposeRef endRef
+ return $ (obs pc, pushEvent pc)
+
+newPushStream :: IO (EventStream a, (Event a -> IO ()))
+newPushStream = newPushStream' neverE
+
+pushEvent :: PushCollection a -> Event a -> IO ()
+pushEvent pc@(PushCollection listRef src _ endRef) event = do
+ ended <- readIORef endRef
+ unless ended $ do
+ applyEnd event endRef
+ (sinks, _) <- readIORef listRef
+ mapM_ (applyTo event) sinks
+ where applyTo event s@(Subscription sink _) = do
+ result <- sink event
+ case result of
+ More -> return ()
+ NoMore -> removeSubscription pc s
+ applyEnd End endRef = writeIORef endRef True
+ applyEnd _ _ = return ()
diff --git a/test/Specs.hs b/test/Specs.hs
new file mode 100644
index 0000000..431b347
--- /dev/null
+++ b/test/Specs.hs
@@ -0,0 +1,10 @@
+import Reactive.BaconTest
+import System.Exit(exitFailure)
+
+import Test.HUnit
+
+main = failOnError =<< runTestTT baconTests
+
+failOnError :: Counts -> IO ()
+failOnError (Counts _ _ 0 0) = return ()
+failOnError _ = exitFailure