summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxenog <>2018-09-14 11:21:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-09-14 11:21:00 (GMT)
commit1e2a9ab830c6f4d4f0c4b6e32ccd375597ea744d (patch)
tree6a98c19cdcfd8d50401796b2a0659e5e1a6e59e9
parent1c78eaa758805477c244e919040531725bd98e9b (diff)
version 0.5.0HEAD0.5.0master
-rw-r--r--CHANGELOG.md16
-rw-r--r--nqe.cabal12
-rw-r--r--src/Control/Concurrent/NQE/Conduit.hs16
-rw-r--r--src/Control/Concurrent/NQE/Network.hs26
-rw-r--r--src/Control/Concurrent/NQE/Process.hs191
-rw-r--r--src/Control/Concurrent/NQE/PubSub.hs137
-rw-r--r--src/Control/Concurrent/NQE/Supervisor.hs33
-rw-r--r--src/NQE.hs (renamed from src/Control/Concurrent/NQE.hs)6
-rw-r--r--test/Spec.hs57
9 files changed, 265 insertions, 229 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 002de0c..f7c2790 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+## 0.5.0
+### Added
+- `Inbox` type is now comparable for equality.
+- Haddock documentation for all functions, types and classes.
+- Expose `SupervisorMessage` type alias.
+- Expose `Publisher` type alias.
+
+### Changed
+- Change `Mailbox` typeclass.
+- Simplify PubSub module.
+- Replace network features with a single conduit.
+- Multiple API changes.
+
+### Removed
+- Remove dispatcher functions.
+
## 0.4.1
### Changed
- Specify different dependencies for test and library.
diff --git a/nqe.cabal b/nqe.cabal
index f8b7ff8..7f05a50 100644
--- a/nqe.cabal
+++ b/nqe.cabal
@@ -2,10 +2,10 @@
--
-- see: https://github.com/sol/hpack
--
--- hash: 7e7510a73928f4b4b1d8c11effd5484aa09190bd88208cb965a957a63e15e4f8
+-- hash: a5599fb449af1ba777de4bf50443ca71032893f717edec6dc78953d607e4145c
name: nqe
-version: 0.4.1
+version: 0.5.0
synopsis: Concurrency library in the style of Erlang/OTP
description: Minimalistic actor library inspired by Erlang/OTP with support for supervisor hierarchies and asynchronous messages, as well as abstractions for synchronous communication and easy management of TCP connections.
category: Control
@@ -27,12 +27,12 @@ source-repository head
library
exposed-modules:
- Control.Concurrent.NQE
- other-modules:
- Control.Concurrent.NQE.Network
+ Control.Concurrent.NQE.Conduit
Control.Concurrent.NQE.Process
Control.Concurrent.NQE.PubSub
Control.Concurrent.NQE.Supervisor
+ NQE
+ other-modules:
Paths_nqe
hs-source-dirs:
src
@@ -40,8 +40,10 @@ library
base >=4.7 && <5
, conduit
, containers
+ , hashable
, mtl
, stm
+ , unique
, unliftio
default-language: Haskell2010
diff --git a/src/Control/Concurrent/NQE/Conduit.hs b/src/Control/Concurrent/NQE/Conduit.hs
new file mode 100644
index 0000000..adb6a6a
--- /dev/null
+++ b/src/Control/Concurrent/NQE/Conduit.hs
@@ -0,0 +1,16 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE RankNTypes #-}
+module Control.Concurrent.NQE.Conduit
+ ( conduitMailbox
+ ) where
+
+import Control.Concurrent.NQE.Process
+import Conduit
+
+-- | Consumes messages and sends them to a mailbox.
+conduitMailbox ::
+ (MonadIO m, Mailbox mbox msg)
+ => mbox msg
+ -> ConduitT msg o m ()
+conduitMailbox mbox = awaitForever (`send` mbox)
diff --git a/src/Control/Concurrent/NQE/Network.hs b/src/Control/Concurrent/NQE/Network.hs
deleted file mode 100644
index 80b16a8..0000000
--- a/src/Control/Concurrent/NQE/Network.hs
+++ /dev/null
@@ -1,26 +0,0 @@
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE RankNTypes #-}
-module Control.Concurrent.NQE.Network
- ( fromSource
- , withSource
- ) where
-
-import Control.Concurrent.NQE.Process
-import Data.Conduit
-import UnliftIO
-
-fromSource ::
- (MonadIO m, Mailbox mbox)
- => ConduitT () msg m ()
- -> mbox msg -- ^ will receive all messages
- -> m ()
-fromSource src mbox = runConduit $ src .| awaitForever (`send` mbox)
-
-withSource ::
- (MonadUnliftIO m, Mailbox mbox)
- => ConduitT () msg m ()
- -> mbox msg
- -> (Async () -> m a)
- -> m a
-withSource src mbox = withAsync (fromSource src mbox)
diff --git a/src/Control/Concurrent/NQE/Process.hs b/src/Control/Concurrent/NQE/Process.hs
index f0d3494..f8f1256 100644
--- a/src/Control/Concurrent/NQE/Process.hs
+++ b/src/Control/Concurrent/NQE/Process.hs
@@ -1,99 +1,164 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
-module Control.Concurrent.NQE.Process where
-
-import Control.Monad
+module Control.Concurrent.NQE.Process
+ ( Mailbox(..)
+ , Inbox
+ , Reply
+ , Listen
+ , newInbox
+ , send
+ , receive
+ , query
+ , receiveMatch
+ , receiveMatchSTM
+ , mailboxEmpty
+ ) where
+
+import Control.Concurrent.Unique
+import Data.Hashable
import UnliftIO
+-- | STM action to reply to a synchronous request.
type Reply a = a -> STM ()
+
+-- | STM action for an event listener.
type Listen a = a -> STM ()
-class Mailbox mbox where
+-- | Mailboxes are used to communicate with processes (actors). A process will
+-- usually listen in a loop for events entering its mailbox. A process is its
+-- mailbox, so it may be named as the process that it communicates with.
+--
+-- >>> :m + Control.Monad NQE UnliftIO
+-- >>> registry <- newTQueueIO :: IO (TQueue String)
+-- >>> let run = receive registry >>= putStrLn . ("Registered: " ++)
+-- >>> withAsync run $ \a -> "Bruce Wayne" `send` registry >> wait a
+-- Registered: Bruce Wayne
+class Eq (mbox msg) => Mailbox mbox msg where
+ -- | STM action that responds true if the mailbox is empty. Useful to avoid
+ -- blocking on an empty mailbox.
mailboxEmptySTM :: mbox msg -> STM Bool
+ -- | STM action that responds true if the mailbox is full and would block if
+ -- a new message is received.
+ mailboxFullSTM :: mbox msg -> STM Bool
+ -- | STM action to send a message to a mailbox. This is usually called from
+ -- a process that wishes to communicate with actor that owns the mailbox.
sendSTM :: msg -> mbox msg -> STM ()
+ -- | STM action to receive a message from a mailbox. This should be called
+ -- from the process that owns the mailbox.
receiveSTM :: mbox msg -> STM msg
- requeueMsg :: msg -> mbox msg -> STM ()
+ -- | Put a message back in the mailbox so that it is the next one to be
+ -- received. Used for pattern matching.
+ requeueSTM :: msg -> mbox msg -> STM ()
-instance Mailbox TQueue where
+instance Mailbox TQueue msg where
mailboxEmptySTM = isEmptyTQueue
+ mailboxFullSTM = const $ return False
sendSTM msg = (`writeTQueue` msg)
receiveSTM = readTQueue
- requeueMsg msg = (`unGetTQueue` msg)
+ requeueSTM msg = (`unGetTQueue` msg)
-instance Mailbox TBQueue where
+instance Mailbox TBQueue msg where
mailboxEmptySTM = isEmptyTBQueue
+ mailboxFullSTM = isFullTBQueue
sendSTM msg = (`writeTBQueue` msg)
receiveSTM = readTBQueue
- requeueMsg msg = (`unGetTBQueue` msg)
+ requeueSTM msg = (`unGetTBQueue` msg)
-data Inbox msg =
- forall mbox. (Mailbox mbox) =>
- Inbox (mbox msg)
+-- | Wrapped 'Mailbox' hiding its implementation.
+data Inbox msg = forall mbox. (Mailbox mbox msg) => Inbox !(mbox msg) !Unique
-instance Mailbox Inbox where
- mailboxEmptySTM (Inbox mbox) = mailboxEmptySTM mbox
- sendSTM msg (Inbox mbox) = msg `sendSTM` mbox
- receiveSTM (Inbox mbox) = receiveSTM mbox
- requeueMsg msg (Inbox mbox) = msg `requeueMsg` mbox
+-- | Create a new 'Inbox' with a 'Unique' identifier inside. If you run this
+-- function more than once with the same 'Mailbox', its results will be
+-- different from the 'Eq' or 'Hashable' point of view.
+newInbox :: (MonadIO m, Mailbox mbox msg) => mbox msg -> m (Inbox msg)
+newInbox mbox = Inbox mbox <$> liftIO newUnique
-mailboxEmpty :: (MonadIO m, Mailbox mbox) => mbox msg -> m Bool
-mailboxEmpty = atomically . mailboxEmptySTM
+instance Eq (Inbox msg) where
+ Inbox _ u1 == Inbox _ u2 = u1 == u2
-send :: (MonadIO m, Mailbox mbox) => msg -> mbox msg -> m ()
-send msg = atomically . sendSTM msg
+instance Hashable (Inbox msg) where
+ hashWithSalt i (Inbox _ u) = hashWithSalt i u
+ hash (Inbox _ u) = hash u
-requeue :: (Mailbox mbox) => [msg] -> mbox msg -> STM ()
-requeue xs mbox = mapM_ (`requeueMsg` mbox) xs
+instance Mailbox Inbox msg where
+ mailboxEmptySTM (Inbox mbox _) = mailboxEmptySTM mbox
+ mailboxFullSTM (Inbox mbox _) = mailboxFullSTM mbox
+ sendSTM msg (Inbox mbox _) = msg `sendSTM` mbox
+ receiveSTM (Inbox mbox _) = receiveSTM mbox
+ requeueSTM msg (Inbox mbox _) = msg `requeueSTM` mbox
-extractMsg ::
- (Mailbox mbox)
- => [(msg -> Maybe a, a -> b)]
- -> mbox msg
- -> STM b
-extractMsg hs mbox = do
- msg <- receiveSTM mbox
- go [] msg hs
- where
- go acc msg [] = do
- msg' <- receiveSTM mbox
- go (msg : acc) msg' hs
- go acc msg ((f, action):fs) =
- case f msg of
- Just x -> do
- requeue acc mbox
- return $ action x
- Nothing -> go acc msg fs
+-- | Send a message to a mailbox.
+send :: (MonadIO m, Mailbox mbox msg) => msg -> mbox msg -> m ()
+send msg = atomically . sendSTM msg
+-- | Receive a message from the mailbox. This function should be called only by
+-- the process that owns the malibox.
+receive ::
+ (MonadIO m, Mailbox mbox msg)
+ => mbox msg
+ -> m msg
+receive mbox = receiveMatch mbox Just
+
+-- | Use a partially-applied message type that takes a `Reply a` as its last
+-- argument. This function will create the STM action for the response, send the
+-- message to a process and await for the STM action to be fulfilled before
+-- responding response. It implements synchronous communication with a process.
+--
+-- Example:
+--
+-- >>> :m + NQE UnliftIO
+-- >>> data Message = Square Integer (Reply Integer)
+-- >>> doubler <- newTQueueIO :: IO (TQueue Message)
+-- >>> let proc = receive doubler >>= \(Square i r) -> atomically $ r (i * i)
+-- >>> withAsync proc $ \_ -> Square 2 `query` doubler
+-- 4
+--
+-- In this example the @Square@ constructor takes a 'Reply' action as its last
+-- argument. It is passed partially-applied to @query@, which adds a new 'Reply'
+-- action before sending it to the @doubler@ and then waiting for it. The
+-- doubler process will run the @Reply@ action n STM with the reply as its
+-- argument. In this case @i * i@.
query ::
- (MonadIO m, Mailbox mbox)
- => (Reply b -> msg)
+ (MonadIO m, Mailbox mbox msg)
+ => (Reply a -> msg)
-> mbox msg
- -> m b
+ -> m a
query f mbox = do
box <- atomically newEmptyTMVar
f (putTMVar box) `send` mbox
atomically (takeTMVar box)
-dispatch ::
- (MonadIO m, Mailbox mbox)
- => [(msg -> Maybe a, a -> m b)] -- ^ action to dispatch
- -> mbox msg -- ^ mailbox to read from
- -> m b
-dispatch hs = join . atomically . extractMsg hs
-
-dispatchSTM :: (Mailbox mbox) => [msg -> Maybe a] -> mbox msg -> STM a
-dispatchSTM = extractMsg . map (\x -> (x, id))
-
-receive ::
- (MonadIO m, Mailbox mbox)
+-- | Test all the messages in a mailbox against the supplied function and return
+-- the output of the function only when it is 'Just'. Will block until a message
+-- matches. All messages that did not match are left in the mailbox. Only call
+-- from process that owns mailbox.
+receiveMatch ::
+ (MonadIO m, Mailbox mbox msg) => mbox msg -> (msg -> Maybe a) -> m a
+receiveMatch mbox = atomically . receiveMatchSTM mbox
+
+-- | Match a message in the mailbox as an atomic STM action.
+receiveMatchSTM ::
+ (Mailbox mbox msg)
=> mbox msg
- -> m msg
-receive = dispatch [(Just, return)]
-
-receiveMatch :: (MonadIO m, Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> m a
-receiveMatch mbox f = dispatch [(f, return)] mbox
+ -> (msg -> Maybe a)
+ -> STM a
+receiveMatchSTM mbox f = go []
+ where
+ go acc =
+ receiveSTM mbox >>= \msg ->
+ case f msg of
+ Just x -> do
+ requeueListSTM acc mbox
+ return x
+ Nothing -> go (msg : acc)
+
+-- | Check if the mailbox is empty.
+mailboxEmpty :: (MonadIO m, Mailbox mbox msg) => mbox msg -> m Bool
+mailboxEmpty = atomically . mailboxEmptySTM
-receiveMatchSTM :: (Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> STM a
-receiveMatchSTM mbox f = dispatchSTM [f] mbox
+-- | Put a message at the start of a mailbox, so that it is the next one read.
+requeueListSTM :: (Mailbox mbox msg) => [msg] -> mbox msg -> STM ()
+requeueListSTM xs mbox = mapM_ (`requeueSTM` mbox) xs
diff --git a/src/Control/Concurrent/NQE/PubSub.hs b/src/Control/Concurrent/NQE/PubSub.hs
index bbe490c..5e8e4d5 100644
--- a/src/Control/Concurrent/NQE/PubSub.hs
+++ b/src/Control/Concurrent/NQE/PubSub.hs
@@ -2,9 +2,9 @@
module Control.Concurrent.NQE.PubSub
( Publisher
, publisher
- , boundedPublisher
+ , subscribe
+ , unsubscribe
, withPubSub
- , withBoundedPubSub
) where
import Control.Applicative
@@ -13,55 +13,41 @@ import Control.Monad.Reader
import Data.List
import UnliftIO
-data ControlMsg ch msg
- = Subscribe (ch msg)
- | Unsubscribe (ch msg)
+-- | Subscribe or unsubscribe from an event publisher.
+data ControlMsg event
+ = Subscribe (Inbox event)
+ | Unsubscribe (Inbox event)
-data Incoming ch msg
- = Control (ControlMsg ch msg)
- | Event msg
-
-type Publisher mbox ch msg = mbox (ControlMsg ch msg)
+-- | Wrapper for a 'Mailbox' that can receive 'ControlMsg'.
+type Publisher msg = Inbox (ControlMsg msg)
+-- | Subscribe an 'Inbox' to events from a 'Publisher'. Unsubscribe when action
+-- finishes. Pass a mailbox constructor.
withPubSub ::
- (MonadUnliftIO m, Mailbox mbox)
- => Publisher mbox TQueue msg
- -> (TQueue msg -> m a)
+ (MonadUnliftIO m, Mailbox mbox event)
+ => Publisher event
+ -> m (mbox event) -- ^ mailbox creator for this subscription
+ -> (Inbox event -> m a) -- ^ unsubscribe when this action ends
-> m a
-withPubSub pub f = bracket subscribe unsubscribe action
+withPubSub pub sub = bracket acquire (unsubscribe pub)
where
- subscribe = do
- mbox <- newTQueueIO
- Subscribe mbox `send` pub
- return mbox
- unsubscribe mbox = Unsubscribe mbox `send` pub
- action mbox = f mbox
+ acquire = do
+ i <- newInbox =<< sub
+ subscribe pub i
+ return i
-withBoundedPubSub ::
- (MonadUnliftIO m, Mailbox mbox)
- => Int
- -> Publisher mbox TBQueue msg
- -> (TBQueue msg -> m a)
- -> m a
-withBoundedPubSub bound pub f = bracket subscribe unsubscribe action
- where
- subscribe = do
- mbox <- newTBQueueIO bound
- Subscribe mbox `send` pub
- return mbox
- unsubscribe mbox = Unsubscribe mbox `send` pub
- action mbox = f mbox
+-- | Subscribe an 'Inbox' to a 'Publisher' generating events.
+subscribe :: MonadIO m => Publisher event -> Inbox event -> m ()
+subscribe pub sub = Subscribe sub `send` pub
-publisher ::
- ( MonadIO m
- , Mailbox mbox
- , Mailbox events
- , Mailbox ch
- , Eq (ch msg)
- )
- => Publisher mbox ch msg
- -> events msg
- -> m ()
+-- | Unsubscribe an 'Inbox' from a 'Publisher' events.
+unsubscribe :: MonadIO m => Publisher event -> Inbox event -> m ()
+unsubscribe pub sub = Unsubscribe sub `send` pub
+
+-- | Start a publisher that will receive events from an 'STM' action, and
+-- subscription requests from a 'Publisher' mailbox. Events will be forwarded
+-- atomically to all subscribers. Full mailboxes will not receive events.
+publisher :: MonadIO m => Publisher event -> STM event -> m ()
publisher pub events = do
box <- newTVarIO []
runReaderT go box
@@ -70,62 +56,25 @@ publisher pub events = do
forever $ do
incoming <-
atomically $
- Control <$> receiveSTM pub <|> Event <$> receiveSTM events
+ Left <$> receiveSTM pub <|> Right <$> events
process incoming
-boundedPublisher ::
- (MonadIO m, Mailbox mbox, Mailbox events)
- => Publisher mbox TBQueue msg
- -> events msg
- -> m ()
-boundedPublisher pub events = do
- box <- newTVarIO []
- runReaderT go box
- where
- go =
- forever $ do
- incoming <-
- atomically $
- Control <$> receiveSTM pub <|> Event <$> receiveSTM events
- processBound incoming
-
-processBound ::
- (MonadIO m, MonadReader (TVar [TBQueue msg]) m)
- => Incoming TBQueue msg
- -> m ()
-processBound (Control (Subscribe mbox)) = do
- box <- ask
- atomically $ do
- subscribers <- readTVar box
- when (mbox `notElem` subscribers) $ writeTVar box (mbox : subscribers)
-
-processBound (Control (Unsubscribe mbox)) = do
- box <- ask
- atomically (modifyTVar box (delete mbox))
-
-processBound (Event event) =
- ask >>= \box ->
- atomically $
- readTVar box >>= \subs ->
- forM_ subs $ \sub ->
- isFullTBQueue sub >>= \full ->
- when (not full) (event `sendSTM` sub)
-
process ::
- (Eq (ch msg), Mailbox ch, MonadIO m, MonadReader (TVar [ch msg]) m)
- => Incoming ch msg
+ (MonadIO m, MonadReader (TVar [Inbox event]) m)
+ => Either (ControlMsg event) event
-> m ()
-process (Control (Subscribe mbox)) = do
+process (Left (Subscribe sub)) = do
box <- ask
- atomically $ do
- subscribers <- readTVar box
- when (mbox `notElem` subscribers) $
- writeTVar box (mbox : subscribers)
+ atomically (modifyTVar box (`union` [sub]))
-process (Control (Unsubscribe mbox)) = do
+process (Left (Unsubscribe sub)) = do
box <- ask
- atomically (modifyTVar box (delete mbox))
+ atomically (modifyTVar box (delete sub))
-process (Event event) = do
+process (Right event) = do
box <- ask
- readTVarIO box >>= mapM_ (send event)
+ atomically $ do
+ subs <- readTVar box
+ forM_ subs $ \sub -> do
+ full <- mailboxFullSTM sub
+ unless full $ event `sendSTM` sub
diff --git a/src/Control/Concurrent/NQE/Supervisor.hs b/src/Control/Concurrent/NQE/Supervisor.hs
index 315c7fa..acb243f 100644
--- a/src/Control/Concurrent/NQE/Supervisor.hs
+++ b/src/Control/Concurrent/NQE/Supervisor.hs
@@ -3,7 +3,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Control.Concurrent.NQE.Supervisor
- ( SupervisorMessage(..)
+ ( SupervisorMessage
, Strategy(..)
, supervisor
, addChild
@@ -17,6 +17,7 @@ import Control.Monad
import Control.Monad.STM (catchSTM)
import UnliftIO
+-- | A supervisor will start, stop and monitor processes.
data SupervisorMessage n
= MonadUnliftIO n =>
AddChild (n ())
@@ -24,17 +25,24 @@ data SupervisorMessage n
| RemoveChild (Async ())
| StopSupervisor
+-- | Supervisor strategies to decide what to do when a child stops.
data Strategy
= Notify ((Async (), Either SomeException ()) -> STM ())
+ -- ^ run this 'STM' action when a process stops
| KillAll
+ -- ^ kill all processes and propagate exception
| IgnoreGraceful
+ -- ^ ignore processes that stop without raising exceptions
| IgnoreAll
+ -- ^ do nothing and keep running if a process dies
+-- | Run a supervisor with a given 'Strategy' a 'Mailbox' to control it, and a
+-- list of children to launch. The list can be empty.
supervisor ::
- (MonadUnliftIO m, Mailbox mbox)
+ (MonadUnliftIO m, Mailbox mbox (SupervisorMessage n), m ~ n)
=> Strategy
- -> mbox (SupervisorMessage m)
- -> [m ()]
+ -> mbox (SupervisorMessage n)
+ -> [n ()]
-> m ()
supervisor strat mbox children = do
state <- newTVarIO []
@@ -53,9 +61,10 @@ supervisor strat mbox children = do
Left x -> processDead state strat x
when again $ loop state
down state = do
- as <- atomically (readTVar state)
+ as <- readTVarIO state
mapM_ cancel as
+-- | Internal action to wait for a child process to finish running.
waitForChild :: TVar [Async ()] -> STM (Async (), Either SomeException ())
waitForChild state = do
as <- readTVar state
@@ -126,6 +135,7 @@ processDead state (Notify notif) (a, e) = do
forM_ as (stopChild state)
throwIO ex
+-- | Internal function to start a child process.
startChild ::
(MonadUnliftIO m)
=> TVar [Async ()]
@@ -136,6 +146,7 @@ startChild state run = do
atomically (modifyTVar' state (a:))
return a
+-- | Internal fuction to stop a child process.
stopChild :: MonadIO m => TVar [Async ()] -> Async () -> m ()
stopChild state a = do
isChild <-
@@ -146,22 +157,28 @@ stopChild state a = do
return (cur /= new)
when isChild (cancel a)
+-- | Add a new child process to the supervisor. The child process will run in
+-- the supervisor context. Will return an 'Async' for the child. This function
+-- will not block or raise an exception if the child dies.
addChild ::
- (MonadUnliftIO n, MonadIO m, Mailbox mbox)
+ (MonadUnliftIO n, MonadIO m, Mailbox mbox (SupervisorMessage n))
=> mbox (SupervisorMessage n)
-> n ()
-> m (Async ())
addChild mbox action = AddChild action `query` mbox
+-- | Stop a child process controlled by the supervisor. Must pass the child
+-- 'Async'. Will not wait for the child to die.
removeChild ::
- (MonadUnliftIO n, MonadIO m, Mailbox mbox)
+ (MonadUnliftIO n, MonadIO m, Mailbox mbox (SupervisorMessage n))
=> mbox (SupervisorMessage n)
-> Async ()
-> m ()
removeChild mbox child = RemoveChild child `send` mbox
+-- | Stop the supervisor and its children.
stopSupervisor ::
- (MonadUnliftIO n, MonadIO m, Mailbox mbox)
+ (MonadUnliftIO n, MonadIO m, Mailbox mbox (SupervisorMessage n))
=> mbox (SupervisorMessage n)
-> m ()
stopSupervisor mbox = StopSupervisor `send` mbox
diff --git a/src/Control/Concurrent/NQE.hs b/src/NQE.hs
index b69bc94..5dcbd52 100644
--- a/src/Control/Concurrent/NQE.hs
+++ b/src/NQE.hs
@@ -1,11 +1,11 @@
-module Control.Concurrent.NQE
+module NQE
( module Control.Concurrent.NQE.Process
- , module Control.Concurrent.NQE.Network
, module Control.Concurrent.NQE.Supervisor
, module Control.Concurrent.NQE.PubSub
+ , module Control.Concurrent.NQE.Conduit
) where
-import Control.Concurrent.NQE.Network
+import Control.Concurrent.NQE.Conduit
import Control.Concurrent.NQE.Process
import Control.Concurrent.NQE.PubSub
import Control.Concurrent.NQE.Supervisor
diff --git a/test/Spec.hs b/test/Spec.hs
index 3404412..6509a28 100644
--- a/test/Spec.hs
+++ b/test/Spec.hs
@@ -1,20 +1,20 @@
+{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
import Conduit
-import Control.Concurrent hiding (yield)
-import Control.Concurrent.NQE
-import Control.Concurrent.STM (retry)
+import Control.Concurrent.STM (check)
import Control.Exception
import Control.Monad
-import Control.Monad.Catch
import Data.ByteString (ByteString)
import Data.Conduit.Text (decode, encode, utf8)
import qualified Data.Conduit.Text as CT
import Data.Conduit.TMChan
import Data.Text (Text)
+import NQE
import Test.Hspec
import UnliftIO
+import UnliftIO.Concurrent hiding (yield)
data Pong = Pong deriving (Eq, Show)
newtype Ping = Ping (Pong -> STM ())
@@ -59,12 +59,10 @@ pongServer ::
-> IO a
pongServer source sink go = do
mbox <- newTQueueIO
- withAsync (action mbox) go
+ withAsync (in_pipe mbox) $ const $ withAsync (out_pipe mbox) go
where
- action mbox =
- withSource src mbox . const . runConduit $ processor mbox .| snk
- src = source .| decoder
- snk = encoder .| sink
+ in_pipe mbox = runConduit $ source .| decoder .| conduitMailbox mbox
+ out_pipe mbox = runConduit $ processor mbox .| encoder .| sink
processor mbox =
forever $
receive mbox >>= \case
@@ -76,16 +74,12 @@ pongClient :: ConduitT () ByteString IO ()
-> IO Text
pongClient source sink = do
mbox <- newTQueueIO
- withAsync (action mbox) go
+ withAsync out_pipe $
+ const $ withAsync (in_pipe mbox) $ const $ receive mbox
where
- action mbox =
- withSource src mbox $ const $ processor mbox
- go = wait
- src = source .| decoder
- snk = encoder .| sink
- processor mbox = do
- runConduit $ yield ("ping\n" :: Text) .| snk
- receive mbox
+ in_pipe mbox = runConduit $ source .| decoder .| conduitMailbox mbox
+ out_pipe = runConduit $ generator .| encoder .| sink
+ generator = yield ("ping\n" :: Text)
main :: IO ()
main =
@@ -163,7 +157,7 @@ main =
case fromException x of
Just TestError1 -> True
Just TestError2 -> True
- _ -> False
+ _ -> False
snd t1 `shouldSatisfy` er
snd t2 `shouldSatisfy` er
stopSupervisor sup
@@ -171,11 +165,12 @@ main =
describe "pubsub" $ do
it "sends messages to all subscribers" $ do
let msgs = words "hello world"
- pub <- newTQueueIO
+ pub <- newTQueueIO >>= newInbox
events <- newTQueueIO
- withAsync (publisher pub events) $ \_ ->
- withPubSub pub $ \sub1 ->
- withPubSub pub $ \sub2 -> do
+ withAsync (publisher pub (receiveSTM events)) $
+ const $
+ withPubSub pub newTQueueIO $ \sub1 ->
+ withPubSub pub newTQueueIO $ \sub2 -> do
mapM_ (`send` events) msgs
sub1msgs <- replicateM 2 (receive sub1)
sub2msgs <- replicateM 2 (receive sub2)
@@ -183,14 +178,16 @@ main =
sub2msgs `shouldBe` msgs
it "drops messages when bounded queue full" $ do
let msgs = words "hello world drop"
- pub <- newTQueueIO
+ pub <- newTQueueIO >>= newInbox
events <- newTQueueIO
- withAsync (boundedPublisher pub events) $ \_ ->
- withBoundedPubSub 2 pub $ \sub -> do
+ withAsync (publisher pub (receiveSTM events)) $
+ const $
+ withPubSub pub (newTBQueueIO 2) $ \sub -> do
mapM_ (`send` events) msgs
- atomically $
- isFullTBQueue sub >>= \full -> when (not full) retry
- msgs <- replicateM 2 (receive sub)
+ atomically $ do
+ check =<< mailboxFullSTM sub
+ check =<< mailboxEmptySTM events
+ msgs' <- replicateM 2 (receive sub)
"meh" `send` events
msg <- receive sub
- msgs <> [msg] `shouldBe` (words "hello world meh")
+ msgs' <> [msg] `shouldBe` words "hello world meh"