summaryrefslogtreecommitdiff
path: root/src/Reactive/Bacon/EventStream/Monadic.hs
blob: cec6598f16f85efffb629bd52e055210b6240db8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
module Reactive.Bacon.EventStream.Monadic(flatMapE, switchE) where

import Data.IORef
import Reactive.Bacon.Core
import Reactive.Bacon.EventStream.Combinators
import Reactive.Bacon.EventStream
import Reactive.Bacon.PushStream(wrap)
import Control.Concurrent.STM
import Control.Monad

-- EventStream is not a Monad
-- However, flatMapE and switchE have a signature that's pretty close
-- to monadic bind. The difference is that IO is allowed in the bind step.
flatMapE :: EventSource s => (a -> IO (EventStream b)) -> s a -> IO (EventStream b)
flatMapE binder xs = wrap $ EventStream $ \sink -> do
    state <- newTVarIO $ State sink Nothing 1 [] [] False
    dispose <- subscribe (obs xs) $ mainEventSink state
    atomically $ modifyTVar state $ \state -> state { dispose = Just dispose }
    return $ disposeAll state
  where mainEventSink state eventA = do
            case eventA of
              End    -> do
                (end, sink) <- withState state $ \s -> do
                    modifyTVar state $ \s -> s { mainEnded = True }
                    return (null (childIds s), currentSink s)
                when end $ void $ sink End
                return NoMore
              Next x -> do
                id <- withState state $ \s -> do
                  let id = counter s
                  writeTVar state $ s { counter = (counter s + 1), childIds = id : (childIds s) }
                  return id
                childStream <- binder x
                childDispose <- subscribe childStream $ childEventSink id state
                atomically $ modifyTVar state $ \s -> s { childDisposables = (childDispose : childDisposables s) }
                return More
        childEventSink id state = \eventB -> do
                          case eventB of
                              End    -> do
                                (end, sink) <- withState state $ \s -> do
                                  let newState = removeChild s id
                                  writeTVar state newState
                                  let end = (null (childIds newState) && mainEnded newState)
                                  return (end, currentSink newState)
                                when end $ void $ sink End
                                return NoMore 
                              Next y -> do
                                sink <- withState state $ return.currentSink
                                result <- sink (Next y)
                                case result of
                                    NoMore -> disposeAll state >> return NoMore
                                    More -> return More
        disposeAll state = do
              (maybeDispose, children) <- withState state $ \s -> return (dispose s, childDisposables s)
              sequence_ children
              case maybeDispose of
                  Nothing -> return () -- TODO should dispose later?
                  Just dispose -> dispose
        removeChild state id = state { childIds = filter (/= id) (childIds state) }
        withState state action = atomically (readTVar state >>= action)

switchE :: EventSource s => (a -> IO (EventStream b)) -> s a -> IO (EventStream b)
switchE binder src = flatMapE (binder >=> (takeUntilE src)) src

data State a = State { currentSink :: EventSink a, 
                       dispose :: Maybe Disposable,
                       counter :: Int,
                       childIds :: [Int],
                       childDisposables :: [Disposable],
                       mainEnded :: Bool }

modifyTVar :: TVar a -> (a -> a) -> STM ()
modifyTVar var f = do
  val <- readTVar var
  writeTVar var (f val)