summaryrefslogtreecommitdiff
path: root/Control/Concurrent/Chan/Strict.hs
blob: 52e410253aaffbc8f91699f54ed8df9edf5d4339 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
{-# LANGUAGE BangPatterns #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Chan.Strict
-- Copyright   :  (c) The University of Glasgow 2001, Don Stewart 2007
-- License     :  BSD-style
-- 
-- Maintainer  :  dons@galois.com
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Unbounded, element-strict channels. Elements will be evaluated to
-- WHNF on entering the channel. For some concurrency applications, this
-- is more desirable than passing an unevaluted thunk through the channel
-- (for instance, it guarantees the node willl be evaluated to WHNF in a
-- worker thead).
--
-- Element-strict channes may potentially use more memory than lazy
-- channels
--
-----------------------------------------------------------------------------

module Control.Concurrent.Chan.Strict (
          -- * The 'Chan' type
        Chan,                   -- abstract

          -- * Operations
        newChan,                -- :: IO (Chan a)
        writeChan,              -- :: Chan a -> a -> IO ()
        readChan,               -- :: Chan a -> IO a
        dupChan,                -- :: Chan a -> IO (Chan a)
        unGetChan,              -- :: Chan a -> a -> IO ()
        isEmptyChan,            -- :: Chan a -> IO Bool

          -- * Stream interface
        getChanContents,        -- :: Chan a -> IO [a]
        writeList2Chan,         -- :: Chan a -> [a] -> IO ()
   ) where

import Prelude

import System.IO.Unsafe         ( unsafeInterleaveIO )
import Control.Concurrent.MVar.Strict

-- A channel is represented by two @MVar@s keeping track of the two ends
-- of the channel contents,i.e.,  the read- and write ends. Empty @MVar@s
-- are used to handle consumers trying to read from an empty channel.

-- |'Chan' is an abstract type representing an unbounded FIFO channel.
data Chan a
 = Chan (MVar (Stream a))
        (MVar (Stream a))

type Stream a = MVar (ChItem a)

data ChItem a = ChItem !a (Stream a)

-- @newChan@ sets up the read and write end of a channel by initialising
-- these two @MVar@s with an empty @MVar@.

-- |Build and returns a new instance of 'Chan'.
newChan :: IO (Chan a)
newChan = do
   hole  <- newEmptyMVar
   readm <- newMVar hole
   write <- newMVar hole
   return (Chan readm write)

-- To put an element on a channel, a new hole at the write end is created.
-- What was previously the empty @MVar@ at the back of the channel is then
-- filled in with a new stream element holding the entered value and the
-- new hole.

-- |Write a value to a 'Chan'.
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
  new_hole <- newEmptyMVar
  modifyMVar_ write $ \old_hole -> do
    putMVar old_hole $! ChItem val new_hole
    return new_hole

-- |Read the next value from the 'Chan'.
readChan :: Chan a -> IO a
readChan (Chan readm _write) = do
  modifyMVar readm $ \read_end -> do
    (ChItem val new_read_end) <- readMVar read_end
        -- Use readMVar here, not takeMVar,
        -- else dupChan doesn't work
    return (new_read_end, val)

-- |Duplicate a 'Chan': the duplicate channel begins empty, but data written to
-- either channel from then on will be available from both.  Hence this creates
-- a kind of broadcast channel, where data written by anyone is seen by
-- everyone else.
dupChan :: Chan a -> IO (Chan a)
dupChan (Chan _read write) = do
   hole     <- readMVar write
   new_read <- newMVar hole
   return (Chan new_read write)

-- |Put a data item back onto a channel, where it will be the next item read.
unGetChan :: Chan a -> a -> IO ()
unGetChan (Chan readm _write) val = do
   new_read_end <- newEmptyMVar
   modifyMVar_ readm $ \read_end -> do
     putMVar new_read_end (ChItem val read_end)
     return new_read_end

-- |Returns 'True' if the supplied 'Chan' is empty.
isEmptyChan :: Chan a -> IO Bool
isEmptyChan (Chan readm write) = do
   withMVar readm $ \r -> do
     w <- readMVar write
     let eq = r == w
     eq `seq` return eq

-- Operators for interfacing with functional streams.

-- |Return a lazy list representing the contents of the supplied
-- 'Chan', much like 'System.IO.hGetContents'.
getChanContents :: Chan a -> IO [a]
getChanContents ch = unsafeInterleaveIO $ do
    x  <- readChan ch
    xs <- getChanContents ch
    return (x:xs)

-- |Write an entire list of items to a 'Chan'.
writeList2Chan :: Chan a -> [a] -> IO ()
writeList2Chan = mapM_ . writeChan