summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlfredoDiNapoli <>2015-10-13 12:16:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2015-10-13 12:16:00 (GMT)
commitff824b9b54f0be4b3da033cc51b74340bf283ddf (patch)
treeaca93556db040e7b17bb030d516def11bbc4733a
parentd4d4c20525bd8c6bcbc6ad9c55cb5d07f13fa4eb (diff)
version 1.0.4.01.0.4.0
-rw-r--r--examples/Main.hs2
-rw-r--r--src/Control/Concurrent/Supervisor.hs236
-rw-r--r--src/Control/Concurrent/Supervisor/Bounded.hs56
-rw-r--r--src/Control/Concurrent/Supervisor/Tutorial.hs13
-rw-r--r--src/Control/Concurrent/Supervisor/Types.hs277
-rw-r--r--test/Main.hs10
-rw-r--r--test/Tests.hs4
-rw-r--r--test/Tests/Bounded.hs207
-rw-r--r--threads-supervisor.cabal57
9 files changed, 613 insertions, 249 deletions
diff --git a/examples/Main.hs b/examples/Main.hs
index 470e246..4d0b7b5 100644
--- a/examples/Main.hs
+++ b/examples/Main.hs
@@ -47,6 +47,6 @@ main = bracketOnError (do
return sup1) shutdownSupervisor (\_ -> threadDelay 10000000000)
where
go eS = do
- newE <- atomically $ readTBQueue eS
+ newE <- atomically $ readTQueue eS
print newE
go eS
diff --git a/src/Control/Concurrent/Supervisor.hs b/src/Control/Concurrent/Supervisor.hs
index e6e1f84..1fe6de5 100644
--- a/src/Control/Concurrent/Supervisor.hs
+++ b/src/Control/Concurrent/Supervisor.hs
@@ -1,257 +1,43 @@
-
{-
Humble module inspired to Erlang supervisors,
with minimal dependencies.
-}
{-# LANGUAGE GADTs #-}
+{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveDataTypeable #-}
-module Control.Concurrent.Supervisor
+module Control.Concurrent.Supervisor
( SupervisorSpec
, Supervisor
- , DeadLetter
- , RestartAction
- , SupervisionEvent(..)
- , RestartStrategy(..)
- -- * Creating a new supervisor spec
- -- $new
+ , Child
, newSupervisorSpec
- -- * Creating a new supervisor
- -- $sup
, newSupervisor
- -- * Restart Strategies
- , oneForOne
- -- * Stopping a supervisor
- -- $shutdown
- , shutdownSupervisor
- -- * Accessing Supervisor event log
- -- $log
- , eventStream
- , activeChildren
- -- * Supervise a forked thread
- -- $fork
- , forkSupervised
- -- * Monitor another supervisor
- -- $monitor
- , monitor
+ , module T
) where
-import qualified Data.HashMap.Strict as Map
-import Control.Concurrent
+import Control.Concurrent.Supervisor.Types as T hiding (newSupervisor, newSupervisorSpec)
+import qualified Control.Concurrent.Supervisor.Types as Types
import Control.Concurrent.STM
-import Data.IORef
-import Control.Exception
-import Data.Typeable
-import Control.Monad
-import Control.Retry
-import Data.Time
-
---------------------------------------------------------------------------------
-data Uninitialised
-data Initialised
-
---------------------------------------------------------------------------------
-data Supervisor_ a = Supervisor_ {
- _sp_myTid :: !(Maybe ThreadId)
- , _sp_children :: !(IORef (Map.HashMap ThreadId Child))
- , _sp_mailbox :: TChan DeadLetter
- , _sp_eventStream :: TBQueue SupervisionEvent
- }
-
-type SupervisorSpec = Supervisor_ Uninitialised
-type Supervisor = Supervisor_ Initialised
-
---------------------------------------------------------------------------------
-data DeadLetter = DeadLetter ThreadId SomeException
-
---------------------------------------------------------------------------------
-data Child = Worker !RestartStrategy RestartAction
- | Supvsr !RestartStrategy !(Supervisor_ Initialised)
---------------------------------------------------------------------------------
-type RestartAction = ThreadId -> IO ThreadId
-
---------------------------------------------------------------------------------
-data SupervisionEvent =
- ChildBorn !ThreadId !UTCTime
- | ChildDied !ThreadId !SomeException !UTCTime
- | ChildRestarted !ThreadId !ThreadId !RestartStrategy !UTCTime
- | ChildRestartLimitReached !ThreadId !RestartStrategy !UTCTime
- | ChildFinished !ThreadId !UTCTime
- deriving Show
-
---------------------------------------------------------------------------------
--- | Erlang inspired strategies. At the moment only the 'OneForOne' is
--- implemented.
-data RestartStrategy =
- OneForOne !Int RetryPolicy
-
-instance Show RestartStrategy where
- show (OneForOne r _) = "OneForOne (Restarted " <> show r <> " times)"
+type SupervisorSpec = Types.SupervisorSpec0 TQueue
+type Supervisor = Types.Supervisor0 TQueue
--------------------------------------------------------------------------------
--- | Smart constructor which offers a default throttling based on
--- fibonacci numbers.
-oneForOne :: RestartStrategy
-oneForOne = OneForOne 0 $ fibonacciBackoff 100
-
--- $new
--- In order to create a new supervisor, you need a `SupervisorSpec`,
--- which can be acquired by a call to `newSupervisor`:
+type Child = Types.Child_ TQueue
--------------------------------------------------------------------------------
-- | Creates a new 'SupervisorSpec'. The reason it doesn't return a
-- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the
-- supervisor thread.
newSupervisorSpec :: IO SupervisorSpec
-newSupervisorSpec = do
- tkn <- newTChanIO
- evt <- newTBQueueIO 1000
- ref <- newIORef Map.empty
- return $ Supervisor_ Nothing ref tkn evt
+newSupervisorSpec = Types.newSupervisorSpec 0
-- $supervise
--------------------------------------------------------------------------------
newSupervisor :: SupervisorSpec -> IO Supervisor
-newSupervisor spec = forkIO (handleEvents spec) >>= \tid -> do
- mbx <- atomically $ dupTChan (_sp_mailbox spec)
- return Supervisor_ {
- _sp_myTid = Just tid
- , _sp_mailbox = mbx
- , _sp_children = _sp_children spec
- , _sp_eventStream = _sp_eventStream spec
- }
-
--- $log
-
---------------------------------------------------------------------------------
--- | Gives you access to the event this supervisor is generating, allowing you
--- to react. It's using a bounded queue to explicitly avoid memory leaks in case
--- you do not want to drain the queue to listen to incoming events.
-eventStream :: Supervisor -> TBQueue SupervisionEvent
-eventStream (Supervisor_ _ _ _ e) = e
-
---------------------------------------------------------------------------------
--- | Returns the number of active threads at a given moment in time.
-activeChildren :: Supervisor -> IO Int
-activeChildren (Supervisor_ _ chRef _ _) = do
- readIORef chRef >>= return . length . Map.keys
-
--- $shutdown
-
---------------------------------------------------------------------------------
--- | Shutdown the given supervisor. This will cause the supervised children to
--- be killed as well. To do so, we explore the children tree, killing workers as we go,
--- and recursively calling `shutdownSupervisor` in case we hit a monitored `Supervisor`.
-shutdownSupervisor :: Supervisor -> IO ()
-shutdownSupervisor (Supervisor_ sId chRef _ _) = do
- case sId of
- Nothing -> return ()
- Just tid -> do
- chMap <- readIORef chRef
- processChildren (Map.toList chMap)
- killThread tid
- where
- processChildren [] = return ()
- processChildren (x:xs) = do
- case x of
- (tid, Worker _ _) -> killThread tid
- (_, Supvsr _ s) -> shutdownSupervisor s
- processChildren xs
-
--- $fork
-
---------------------------------------------------------------------------------
--- | Fork a thread in a supervised mode.
-forkSupervised :: Supervisor
- -- ^ The 'Supervisor'
- -> RestartStrategy
- -- ^ The 'RestartStrategy' to use
- -> IO ()
- -- ^ The computation to run
- -> IO ThreadId
-forkSupervised sup@Supervisor_{..} str act =
- bracket (supervised sup act) return $ \newChild -> do
- let ch = Worker str (const (supervised sup act))
- atomicModifyIORef' _sp_children $ \chMap -> (Map.insert newChild ch chMap, ())
- now <- getCurrentTime
- writeIfNotFull _sp_eventStream (ChildBorn newChild now)
- return newChild
-
---------------------------------------------------------------------------------
-writeIfNotFull :: TBQueue SupervisionEvent -> SupervisionEvent -> IO ()
-writeIfNotFull q evt = atomically $ do
- isFull <- isFullTBQueue q
- unless isFull $ writeTBQueue q evt
-
---------------------------------------------------------------------------------
-supervised :: Supervisor -> IO () -> IO ThreadId
-supervised Supervisor_{..} act = forkFinally act $ \res -> case res of
- Left ex -> bracket myThreadId return $ \myId -> atomically $
- writeTChan _sp_mailbox (DeadLetter myId ex)
- Right _ -> bracket myThreadId return $ \myId -> do
- now <- getCurrentTime
- atomicModifyIORef' _sp_children $ \chMap -> (Map.delete myId chMap, ())
- writeIfNotFull _sp_eventStream (ChildFinished myId now)
-
---------------------------------------------------------------------------------
-handleEvents :: SupervisorSpec -> IO ()
-handleEvents sp@(Supervisor_ myId myChildren myMailbox myStream) = do
- (DeadLetter newDeath ex) <- atomically $ readTChan myMailbox
- now <- getCurrentTime
- writeIfNotFull myStream (ChildDied newDeath ex now)
- -- If we catch an `AsyncException`, we have nothing but good
- -- reasons not to restart the thread.
- case typeOf ex == (typeOf (undefined :: AsyncException)) of
- True -> handleEvents sp
- False -> do
- chMap <- readIORef myChildren
- case Map.lookup newDeath chMap of
- Nothing -> return ()
- Just (Worker str act) ->
- applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
- let ch = Worker newStr act
- newThreadId <- act newDeath
- writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
- writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now)
- Just (Supvsr str s@(Supervisor_ _ mbx cld es)) ->
- applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
- let node = Supervisor_ myId myChildren myMailbox myStream
- let ch = (Supvsr newStr s)
- newThreadId <- supervised node (handleEvents $ Supervisor_ Nothing mbx cld es)
- writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
- writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now)
- handleEvents sp
- where
- applyStrategy :: RestartStrategy
- -> (RestartStrategy -> IO ())
- -> (RestartStrategy -> IO ())
- -> IO ()
- applyStrategy (OneForOne currentRestarts retryPol) ifAbort ifThrottle = do
- let newStr = OneForOne (currentRestarts + 1) retryPol
- case getRetryPolicy retryPol (currentRestarts + 1) of
- Nothing -> ifAbort newStr
- Just delay -> threadDelay delay >> ifThrottle newStr
-
--- $monitor
-
-newtype MonitorRequest = MonitoredSupervision ThreadId deriving (Show, Typeable)
-
-instance Exception MonitorRequest
-
---------------------------------------------------------------------------------
--- | Monitor another supervisor. To achieve these, we simulate a new 'DeadLetter',
--- so that the first supervisor will effectively restart the monitored one.
--- Thanks to the fact that for the supervisor the restart means we just copy over
--- its internal state, it should be perfectly fine to do so.
-monitor :: Supervisor -> Supervisor -> IO ()
-monitor (Supervisor_ _ _ mbox _) (Supervisor_ mbId _ _ _) = do
- case mbId of
- Nothing -> return ()
- Just tid -> atomically $
- writeTChan mbox (DeadLetter tid (toException $ MonitoredSupervision tid))
+newSupervisor spec = Types.newSupervisor spec
diff --git a/src/Control/Concurrent/Supervisor/Bounded.hs b/src/Control/Concurrent/Supervisor/Bounded.hs
new file mode 100644
index 0000000..925d6e4
--- /dev/null
+++ b/src/Control/Concurrent/Supervisor/Bounded.hs
@@ -0,0 +1,56 @@
+{-|
+ This module offers a `Bounded` supervisor variant,
+ where `SupervisionEvent`(s) are written on a `TBQueue`,
+ and simply discarded if the queue is full.
+-}
+
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE DeriveDataTypeable #-}
+
+module Control.Concurrent.Supervisor.Bounded
+ ( SupervisorSpec
+ , Supervisor
+ , Child
+ , newSupervisorSpec
+ , newSupervisorSpecBounded
+ , newSupervisor
+ , module T
+ ) where
+
+import Control.Concurrent.Supervisor.Types as T hiding (newSupervisor, newSupervisorSpec)
+import qualified Control.Concurrent.Supervisor.Types as Types
+import Control.Concurrent.STM
+
+type SupervisorSpec = Types.SupervisorSpec0 TBQueue
+type Supervisor = Types.Supervisor0 TBQueue
+
+--------------------------------------------------------------------------------
+type Child = Types.Child_ TBQueue
+
+--------------------------------------------------------------------------------
+-- | Creates a new 'SupervisorSpec'. The reason it doesn't return a
+-- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the
+-- supervisor thread.
+newSupervisorSpec :: IO SupervisorSpec
+newSupervisorSpec = Types.newSupervisorSpec defaultEventQueueSize
+
+--------------------------------------------------------------------------------
+-- | Like 'newSupervisorSpec', but give the user control over the size of the
+-- event queue.
+newSupervisorSpecBounded :: Int -> IO SupervisorSpec
+newSupervisorSpecBounded = Types.newSupervisorSpec
+
+--------------------------------------------------------------------------------
+-- | The default size of the queue where `SupervisionEvent`(s) are written.
+defaultEventQueueSize :: Int
+defaultEventQueueSize = 10000
+
+-- $supervise
+
+--------------------------------------------------------------------------------
+newSupervisor :: SupervisorSpec -> IO Supervisor
+newSupervisor spec = Types.newSupervisor spec
diff --git a/src/Control/Concurrent/Supervisor/Tutorial.hs b/src/Control/Concurrent/Supervisor/Tutorial.hs
index c02b72f..e89ef3f 100644
--- a/src/Control/Concurrent/Supervisor/Tutorial.hs
+++ b/src/Control/Concurrent/Supervisor/Tutorial.hs
@@ -29,6 +29,9 @@ module Control.Concurrent.Supervisor.Tutorial
-- * Creating a Supervisor
-- $createSupervisor
+ -- * Bounded vs Unbounded
+ -- $boundedVsUnbounded
+
-- * Supervising and choosing a 'RestartStrategy'
-- $supervising
@@ -49,7 +52,7 @@ module Control.Concurrent.Supervisor.Tutorial
--
-- To mitigate this, we have a couple of libraries available, for example
-- <http://hackage.haskell.org/package/async> and <http://hackage.haskell.org/package/slave-thread>.
---
+--
-- But what about if I do not want to take explicit action, but instead specifying upfront
-- how to react to disaster, and leave the library work out the details?
-- This is what this library aims to do.
@@ -102,6 +105,14 @@ module Control.Concurrent.Supervisor.Tutorial
-- immediately after doing so, a new thread will be started, monitoring any subsequent IO actions
-- submitted to it.
+-- $boundedVsUnbounded
+-- By default, it's programmer responsibility to read the `SupervisionEvent` the library writes
+-- into its internal queue. If you do not do so, your program might leak. To mitigate this, and
+-- to offer a more granular control, two different modules are provided: a `Bounded` and an
+-- `Unbounded` one, which use, respectively, a `TBQueue` or a `TQueue` underneath. You can decide
+-- to go with the bounded version, with a queue size enforced by the library author, or pass in
+-- your own size.
+
-- $supervising
-- Let's wrap everything together into a full blown example:
--
diff --git a/src/Control/Concurrent/Supervisor/Types.hs b/src/Control/Concurrent/Supervisor/Types.hs
new file mode 100644
index 0000000..866b69f
--- /dev/null
+++ b/src/Control/Concurrent/Supervisor/Types.hs
@@ -0,0 +1,277 @@
+{-
+ Humble module inspired to Erlang supervisors,
+ with minimal dependencies.
+-}
+
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE DeriveDataTypeable #-}
+
+module Control.Concurrent.Supervisor.Types
+ ( SupervisorSpec0
+ , Supervisor0
+ , Child_
+ , DeadLetter
+ , RestartAction
+ , SupervisionEvent(..)
+ , RestartStrategy(..)
+ -- * Creating a new supervisor spec
+ -- $new
+ , newSupervisorSpec
+ -- * Creating a new supervisor
+ -- $sup
+ , newSupervisor
+ -- * Restart Strategies
+ , oneForOne
+ -- * Stopping a supervisor
+ -- $shutdown
+ , shutdownSupervisor
+ -- * Accessing Supervisor event log
+ -- $log
+ , eventStream
+ , activeChildren
+ -- * Supervise a forked thread
+ -- $fork
+ , forkSupervised
+ -- * Monitor another supervisor
+ -- $monitor
+ , monitor
+ ) where
+
+import qualified Data.HashMap.Strict as Map
+import Control.Concurrent
+import Control.Concurrent.STM
+import Data.IORef
+import Control.Exception
+import Data.Typeable
+import Control.Monad
+import Control.Retry
+import Data.Time
+
+--------------------------------------------------------------------------------
+data Uninitialised
+data Initialised
+
+--------------------------------------------------------------------------------
+data Supervisor_ q a = Supervisor_ {
+ _sp_myTid :: !(Maybe ThreadId)
+ , _sp_children :: !(IORef (Map.HashMap ThreadId (Child_ q)))
+ , _sp_mailbox :: TChan DeadLetter
+ , _sp_eventStream :: q SupervisionEvent
+ }
+
+type SupervisorSpec0 q = Supervisor_ q Uninitialised
+type Supervisor0 q = Supervisor_ q Initialised
+
+class QueueLike q where
+ newQueueIO :: Int -> IO (q a)
+ readQueue :: q a -> STM a
+ writeQueue :: q a -> a -> STM ()
+
+instance QueueLike TQueue where
+ newQueueIO = const newTQueueIO
+ readQueue = readTQueue
+ writeQueue = writeTQueue
+
+instance QueueLike TBQueue where
+ newQueueIO = newTBQueueIO
+ readQueue = readTBQueue
+ writeQueue q e = do
+ isFull <- isFullTBQueue q
+ unless isFull $ writeTBQueue q e
+
+--------------------------------------------------------------------------------
+data DeadLetter = DeadLetter ThreadId SomeException
+
+--------------------------------------------------------------------------------
+data Child_ q = Worker !RestartStrategy RestartAction
+ | Supvsr !RestartStrategy !(Supervisor_ q Initialised)
+
+--------------------------------------------------------------------------------
+type RestartAction = ThreadId -> IO ThreadId
+
+--------------------------------------------------------------------------------
+data SupervisionEvent =
+ ChildBorn !ThreadId !UTCTime
+ | ChildDied !ThreadId !SomeException !UTCTime
+ | ChildRestarted !ThreadId !ThreadId !RestartStrategy !UTCTime
+ | ChildRestartLimitReached !ThreadId !RestartStrategy !UTCTime
+ | ChildFinished !ThreadId !UTCTime
+ deriving Show
+
+--------------------------------------------------------------------------------
+-- | Erlang inspired strategies. At the moment only the 'OneForOne' is
+-- implemented.
+data RestartStrategy =
+ OneForOne !Int RetryPolicy
+
+instance Show RestartStrategy where
+ show (OneForOne r _) = "OneForOne (Restarted " <> show r <> " times)"
+
+--------------------------------------------------------------------------------
+-- | Smart constructor which offers a default throttling based on
+-- fibonacci numbers.
+oneForOne :: RestartStrategy
+oneForOne = OneForOne 0 $ fibonacciBackoff 100
+
+-- $new
+-- In order to create a new supervisor, you need a `SupervisorSpec`,
+-- which can be acquired by a call to `newSupervisor`:
+
+
+--------------------------------------------------------------------------------
+-- | Creates a new 'SupervisorSpec'. The reason it doesn't return a
+-- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the
+-- supervisor thread.
+newSupervisorSpec :: QueueLike q => Int -> IO (SupervisorSpec0 q)
+newSupervisorSpec size = do
+ tkn <- newTChanIO
+ evt <- newQueueIO size
+ ref <- newIORef Map.empty
+ return $ Supervisor_ Nothing ref tkn evt
+
+-- $supervise
+
+--------------------------------------------------------------------------------
+newSupervisor :: QueueLike q => SupervisorSpec0 q -> IO (Supervisor0 q)
+newSupervisor spec = forkIO (handleEvents spec) >>= \tid -> do
+ mbx <- atomically $ dupTChan (_sp_mailbox spec)
+ return Supervisor_ {
+ _sp_myTid = Just tid
+ , _sp_mailbox = mbx
+ , _sp_children = _sp_children spec
+ , _sp_eventStream = _sp_eventStream spec
+ }
+
+-- $log
+
+--------------------------------------------------------------------------------
+-- | Gives you access to the event this supervisor is generating, allowing you
+-- to react. It's using a bounded queue to explicitly avoid memory leaks in case
+-- you do not want to drain the queue to listen to incoming events.
+eventStream :: QueueLike q => Supervisor0 q -> q SupervisionEvent
+eventStream (Supervisor_ _ _ _ e) = e
+
+--------------------------------------------------------------------------------
+-- | Returns the number of active threads at a given moment in time.
+activeChildren :: QueueLike q => Supervisor0 q -> IO Int
+activeChildren (Supervisor_ _ chRef _ _) = do
+ readIORef chRef >>= return . length . Map.keys
+
+-- $shutdown
+
+--------------------------------------------------------------------------------
+-- | Shutdown the given supervisor. This will cause the supervised children to
+-- be killed as well. To do so, we explore the children tree, killing workers as we go,
+-- and recursively calling `shutdownSupervisor` in case we hit a monitored `Supervisor`.
+shutdownSupervisor :: QueueLike q => Supervisor0 q -> IO ()
+shutdownSupervisor (Supervisor_ sId chRef _ _) = do
+ case sId of
+ Nothing -> return ()
+ Just tid -> do
+ chMap <- readIORef chRef
+ processChildren (Map.toList chMap)
+ killThread tid
+ where
+ processChildren [] = return ()
+ processChildren (x:xs) = do
+ case x of
+ (tid, Worker _ _) -> killThread tid
+ (_, Supvsr _ s) -> shutdownSupervisor s
+ processChildren xs
+
+-- $fork
+
+--------------------------------------------------------------------------------
+-- | Fork a thread in a supervised mode.
+forkSupervised :: QueueLike q
+ => Supervisor0 q
+ -- ^ The 'Supervisor'
+ -> RestartStrategy
+ -- ^ The 'RestartStrategy' to use
+ -> IO ()
+ -- ^ The computation to run
+ -> IO ThreadId
+forkSupervised sup@Supervisor_{..} str act =
+ bracket (supervised sup act) return $ \newChild -> do
+ let ch = Worker str (const (supervised sup act))
+ atomicModifyIORef' _sp_children $ \chMap -> (Map.insert newChild ch chMap, ())
+ now <- getCurrentTime
+ atomically $ writeQueue _sp_eventStream (ChildBorn newChild now)
+ return newChild
+
+--------------------------------------------------------------------------------
+supervised :: QueueLike q => Supervisor0 q -> IO () -> IO ThreadId
+supervised Supervisor_{..} act = forkFinally act $ \res -> case res of
+ Left ex -> bracket myThreadId return $ \myId -> atomically $
+ writeTChan _sp_mailbox (DeadLetter myId ex)
+ Right _ -> bracket myThreadId return $ \myId -> do
+ now <- getCurrentTime
+ atomicModifyIORef' _sp_children $ \chMap -> (Map.delete myId chMap, ())
+ atomically $ writeQueue _sp_eventStream (ChildFinished myId now)
+
+--------------------------------------------------------------------------------
+handleEvents :: QueueLike q => SupervisorSpec0 q -> IO ()
+handleEvents sp@(Supervisor_ myId myChildren myMailbox myStream) = do
+ (DeadLetter newDeath ex) <- atomically $ readTChan myMailbox
+ now <- getCurrentTime
+ atomically $ writeQueue myStream (ChildDied newDeath ex now)
+ -- If we catch an `AsyncException`, we have nothing but good
+ -- reasons not to restart the thread.
+ -- Note to the skeptical: It's perfectly fine do put `undefined` here,
+ -- as `typeOf` does not inspect the content (try in GHCi!)
+ case typeOf ex == (typeOf (undefined :: AsyncException)) of
+ True -> handleEvents sp
+ False -> do
+ chMap <- readIORef myChildren
+ case Map.lookup newDeath chMap of
+ Nothing -> return ()
+ Just (Worker str act) ->
+ applyStrategy str (\newStr -> do
+ atomically $
+ writeQueue myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
+ let ch = Worker newStr act
+ newThreadId <- act newDeath
+ writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
+ atomically $ writeQueue myStream (ChildRestarted newDeath newThreadId newStr now)
+ Just (Supvsr str s@(Supervisor_ _ mbx cld es)) ->
+ applyStrategy str (\newStr -> do
+ atomically $
+ writeQueue myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
+ let node = Supervisor_ myId myChildren myMailbox myStream
+ let ch = (Supvsr newStr s)
+ newThreadId <- supervised node (handleEvents $ Supervisor_ Nothing mbx cld es)
+ writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
+ atomically $ writeQueue myStream (ChildRestarted newDeath newThreadId newStr now)
+ handleEvents sp
+ where
+ applyStrategy :: RestartStrategy
+ -> (RestartStrategy -> IO ())
+ -> (RestartStrategy -> IO ())
+ -> IO ()
+ applyStrategy (OneForOne currentRestarts retryPol) ifAbort ifThrottle = do
+ let newStr = OneForOne (currentRestarts + 1) retryPol
+ case getRetryPolicy retryPol (currentRestarts + 1) of
+ Nothing -> ifAbort newStr
+ Just delay -> threadDelay delay >> ifThrottle newStr
+
+-- $monitor
+
+newtype MonitorRequest = MonitoredSupervision ThreadId deriving (Show, Typeable)
+
+instance Exception MonitorRequest
+
+--------------------------------------------------------------------------------
+-- | Monitor another supervisor. To achieve these, we simulate a new 'DeadLetter',
+-- so that the first supervisor will effectively restart the monitored one.
+-- Thanks to the fact that for the supervisor the restart means we just copy over
+-- its internal state, it should be perfectly fine to do so.
+monitor :: QueueLike q => Supervisor0 q -> Supervisor0 q -> IO ()
+monitor (Supervisor_ _ _ mbox _) (Supervisor_ mbId _ _ _) = do
+ case mbId of
+ Nothing -> return ()
+ Just tid -> atomically $
+ writeTChan mbox (DeadLetter tid (toException $ MonitoredSupervision tid))
diff --git a/test/Main.hs b/test/Main.hs
index 1bf24af..f1f7580 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -6,6 +6,7 @@ import Test.Tasty.HUnit
import Test.Tasty.QuickCheck
import Test.QuickCheck.Monadic
import Tests
+import qualified Tests.Bounded as B
--------------------------------------------------------------------------------
@@ -22,11 +23,18 @@ withQuickCheckDepth tn depth tests =
--------------------------------------------------------------------------------
allTests :: TestTree
allTests = testGroup "All Tests" [
- withQuickCheckDepth "Control.Concurrent.Supervisor" 10 [
+ withQuickCheckDepth "Control.Concurrent.Supervisor" 20 [
testProperty "1 supervised thread, no exceptions" (monadicIO test1SupThreadNoEx)
, testProperty "1 supervised thread, premature exception" (monadicIO test1SupThreadPrematureDemise)
, testProperty "killing spree" (monadicIO testKillingSpree)
, testProperty "cleanup" (monadicIO testSupCleanup)
, testCase "too many restarts" testTooManyRestarts
]
+ , withQuickCheckDepth "Control.Concurrent.Supervisor.Bounded" 20 [
+ testProperty "1 supervised thread, no exceptions" (monadicIO B.test1SupThreadNoEx)
+ , testProperty "1 supervised thread, premature exception" (monadicIO B.test1SupThreadPrematureDemise)
+ , testProperty "killing spree" (monadicIO B.testKillingSpree)
+ , testProperty "cleanup" (monadicIO B.testSupCleanup)
+ , testCase "too many restarts" B.testTooManyRestarts
+ ]
]
diff --git a/test/Tests.hs b/test/Tests.hs
index 09b3300..6316a58 100644
--- a/test/Tests.hs
+++ b/test/Tests.hs
@@ -76,9 +76,9 @@ assertActiveThreads sup p = do
QM.assert (p ac)
--------------------------------------------------------------------------------
-qToList :: TBQueue SupervisionEvent -> IO [SupervisionEvent]
+qToList :: TQueue SupervisionEvent -> IO [SupervisionEvent]
qToList q = do
- nextEl <- atomically (tryReadTBQueue q)
+ nextEl <- atomically (tryReadTQueue q)
case nextEl of
(Just el) -> (el :) <$> qToList q
Nothing -> return []
diff --git a/test/Tests/Bounded.hs b/test/Tests/Bounded.hs
new file mode 100644
index 0000000..e40e75e
--- /dev/null
+++ b/test/Tests/Bounded.hs
@@ -0,0 +1,207 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+module Tests.Bounded where
+
+import Test.Tasty.HUnit as HUnit
+import Test.Tasty.QuickCheck
+import Test.QuickCheck.Monadic as QM
+import qualified Data.List as List
+import Control.Monad
+import Control.Retry
+import Control.Monad.Trans.Class
+import Control.Applicative
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Exception
+import Control.Concurrent.Supervisor.Bounded
+
+--------------------------------------------------------------------------------
+type IOProperty = PropertyM IO
+
+-- How much a thread will live.
+newtype TTL = TTL Int deriving Show
+
+-- | Generate a random thread live time between 0.5 sec and 2 secs.
+randomLiveTime :: Gen Int
+randomLiveTime = choose (500000, 2000000)
+
+instance Arbitrary TTL where
+ arbitrary = TTL <$> randomLiveTime
+
+data ThreadAction =
+ Live
+ | DieAfter TTL --natural death
+ | ThrowAfter TTL
+ deriving Show
+
+instance Arbitrary ThreadAction where
+ arbitrary = do
+ act <- elements [const Live, DieAfter, ThrowAfter]
+ ttl <- arbitrary
+ return $ act ttl
+
+-- We cannot easily deal with async exceptions
+-- being thrown at us.
+data ExecutionPlan = ExecutionPlan {
+ toSpawn :: Int
+ , actions :: [ThreadAction]
+ } deriving Show
+
+instance Arbitrary ExecutionPlan where
+ arbitrary = do
+ ts <- choose (1,20)
+ acts <- vectorOf ts arbitrary
+ return $ ExecutionPlan ts acts
+
+--------------------------------------------------------------------------------
+howManyRestarted :: ExecutionPlan -> Int
+howManyRestarted (ExecutionPlan _ acts) = length . filter pred_ $ acts
+ where
+ pred_ (ThrowAfter _) = True
+ pred_ _ = False
+
+--------------------------------------------------------------------------------
+howManyLiving :: ExecutionPlan -> Int
+howManyLiving (ExecutionPlan _ acts) = length . filter pred_ $ acts
+ where
+ pred_ Live = True
+ pred_ _ = False
+
+--------------------------------------------------------------------------------
+assertActiveThreads :: Supervisor -> (Int -> Bool) -> IOProperty ()
+assertActiveThreads sup p = do
+ ac <- lift (activeChildren sup)
+ QM.assert (p ac)
+
+--------------------------------------------------------------------------------
+qToList :: TBQueue SupervisionEvent -> IO [SupervisionEvent]
+qToList q = do
+ nextEl <- atomically (tryReadTBQueue q)
+ case nextEl of
+ (Just el) -> (el :) <$> qToList q
+ Nothing -> return []
+
+--------------------------------------------------------------------------------
+assertContainsNMsg :: (SupervisionEvent -> Bool)
+ -> Int
+ -> [SupervisionEvent]
+ -> IO ()
+assertContainsNMsg _ 0 _ = HUnit.assertBool "" True
+assertContainsNMsg _ x [] = do
+ HUnit.assertBool ("assertContainsNMsg: list exhausted and " ++ show x ++ " left.") False
+assertContainsNMsg matcher !n (x:xs) = case matcher x of
+ True -> assertContainsNMsg matcher (n - 1) xs
+ False -> assertContainsNMsg matcher n xs
+
+--------------------------------------------------------------------------------
+assertContainsNRestartMsg :: Int -> [SupervisionEvent] -> IOProperty ()
+assertContainsNRestartMsg n e = lift $ assertContainsNMsg matches n e
+ where
+ matches (ChildRestarted{}) = True
+ matches _ = False
+
+--------------------------------------------------------------------------------
+assertContainsNFinishedMsg :: Int -> [SupervisionEvent] -> IOProperty ()
+assertContainsNFinishedMsg n e = lift $ assertContainsNMsg matches n e
+ where
+ matches (ChildFinished{}) = True
+ matches _ = False
+
+--------------------------------------------------------------------------------
+assertContainsNLimitReached :: Int -> [SupervisionEvent] -> IO ()
+assertContainsNLimitReached = assertContainsNMsg matches
+ where
+ matches (ChildRestartLimitReached{}) = True
+ matches _ = False
+
+--------------------------------------------------------------------------------
+assertContainsRestartMsg :: [SupervisionEvent] -> ThreadId -> IOProperty ()
+assertContainsRestartMsg [] _ = QM.assert False
+assertContainsRestartMsg (x:xs) tid = case x of
+ ((ChildRestarted old _ _ _)) ->
+ if old == tid then QM.assert True else assertContainsRestartMsg xs tid
+ _ -> assertContainsRestartMsg xs tid
+
+--------------------------------------------------------------------------------
+-- Control.Concurrent.Supervisor tests
+test1SupThreadNoEx :: IOProperty ()
+test1SupThreadNoEx = forAllM randomLiveTime $ \ttl -> do
+ supSpec <- lift newSupervisorSpec
+ sup <- lift $ newSupervisor supSpec
+ _ <- lift (forkSupervised sup oneForOne (forever $ threadDelay ttl))
+ assertActiveThreads sup (== 1)
+ lift $ shutdownSupervisor sup
+
+--------------------------------------------------------------------------------
+test1SupThreadPrematureDemise :: IOProperty ()
+test1SupThreadPrematureDemise = forAllM randomLiveTime $ \ttl -> do
+ supSpec <- lift newSupervisorSpec
+ sup <- lift $ newSupervisor supSpec
+ tid <- lift (forkSupervised sup oneForOne (forever $ threadDelay ttl))
+ lift $ do
+ throwTo tid (AssertionFailed "You must die")
+ threadDelay ttl --give time to restart the thread
+ assertActiveThreads sup (== 1)
+ q <- lift $ qToList (eventStream sup)
+ assertContainsNRestartMsg 1 q
+ lift $ shutdownSupervisor sup
+
+--------------------------------------------------------------------------------
+fromAction :: Supervisor -> ThreadAction -> IO ThreadId
+fromAction s Live = forkSupervised s oneForOne (forever $ threadDelay 100000000)
+fromAction s (DieAfter (TTL ttl)) = forkSupervised s oneForOne (threadDelay ttl)
+fromAction s (ThrowAfter (TTL ttl)) = forkSupervised s oneForOne (do
+ threadDelay ttl
+ throwIO $ AssertionFailed "die")
+
+--------------------------------------------------------------------------------
+maxWait :: [ThreadAction] -> Int
+maxWait ta = go ta []
+ where
+ go [] [] = 0
+ go [] acc = List.maximum acc
+ go (Live:xs) acc = go xs acc
+ go ((DieAfter (TTL t)):xs) acc = go xs (t : acc)
+ go ((ThrowAfter (TTL t)):xs) acc = go xs (t : acc)
+
+--------------------------------------------------------------------------------
+-- In this test, we generate random IO actions for the threads to be
+-- executed, then we calculate how many of them needs to be alive after all
+-- the side effects strikes.
+testKillingSpree :: IOProperty ()
+testKillingSpree = forAllM arbitrary $ \ep@(ExecutionPlan _ acts) -> do
+ supSpec <- lift newSupervisorSpec
+ sup <- lift $ newSupervisor supSpec
+ _ <- forM acts $ lift . fromAction sup
+ lift (threadDelay $ maxWait acts * 2)
+ q <- lift $ qToList (eventStream sup)
+ assertActiveThreads sup (>= howManyLiving ep)
+ assertContainsNRestartMsg (howManyRestarted ep) q
+ lift $ shutdownSupervisor sup
+
+--------------------------------------------------------------------------------
+-- In this test, we test that the supervisor does not leak memory by removing
+-- children who finished
+testSupCleanup :: IOProperty ()
+testSupCleanup = forAllM (vectorOf 100 arbitrary) $ \ttls -> do
+ let acts = map DieAfter ttls
+ supSpec <- lift newSupervisorSpec
+ sup <- lift $ newSupervisor supSpec
+ _ <- forM acts $ lift . fromAction sup
+ lift (threadDelay $ maxWait acts * 2)
+ q <- lift $ qToList (eventStream sup)
+ assertActiveThreads sup (== 0)
+ assertContainsNFinishedMsg (length acts) q
+ lift $ shutdownSupervisor sup
+
+testTooManyRestarts :: Assertion
+testTooManyRestarts = do
+ supSpec <- newSupervisorSpec
+ sup <- newSupervisor supSpec
+ _ <- forkSupervised sup (OneForOne 0 $ limitRetries 5) $ error "die"
+ threadDelay 2000000
+ q <- qToList (eventStream sup)
+ assertContainsNLimitReached 1 q
+ shutdownSupervisor sup
diff --git a/threads-supervisor.cabal b/threads-supervisor.cabal
index 6fbe01d..cba1e73 100644
--- a/threads-supervisor.cabal
+++ b/threads-supervisor.cabal
@@ -1,5 +1,5 @@
name: threads-supervisor
-version: 1.0.3.0
+version: 1.0.4.0
synopsis: Simple, IO-based library for Erlang-style thread supervision
description: Simple, IO-based library for Erlang-style thread supervision
license: MIT
@@ -21,17 +21,18 @@ flag prof
library
exposed-modules:
Control.Concurrent.Supervisor
+ Control.Concurrent.Supervisor.Bounded
+ Control.Concurrent.Supervisor.Types
Control.Concurrent.Supervisor.Tutorial
build-depends:
- base >= 4.6 && < 5,
- unordered-containers >= 0.2.0.0 && < 0.3.0.0,
- retry >= 0.5 && < 0.8,
+ base >= 4.6 && < 6,
+ unordered-containers >= 0.2.0.0 && < 0.5.0.0,
+ retry >= 0.5 && < 0.10,
stm >= 2.4,
time >= 1.2
- hs-source-dirs:
- src
+ hs-source-dirs: src
if flag(prof)
- ghc-options: -fprof-auto -rtsopts
+ ghc-options: -fprof-auto -rtsopts -auto-all -caf-all
default-language: Haskell2010
ghc-options:
-Wall
@@ -39,23 +40,31 @@ library
executable threads-supervisor-example
build-depends:
- base >= 4.6 && < 5,
+ base >= 4.6 && < 6,
threads-supervisor -any,
- unordered-containers >= 0.2.0.0 && < 0.3.0.0,
+ unordered-containers >= 0.2.0.0 && < 0.5.0.0,
stm >= 2.4,
time >= 1.2
hs-source-dirs:
examples
main-is:
Main.hs
- if flag(prof)
- ghc-options: -fprof-auto -rtsopts
default-language: Haskell2010
- ghc-options:
- -Wall
- -threaded
- "-with-rtsopts=-N"
- -funbox-strict-fields
+ if flag(prof)
+ ghc-options:
+ -Wall
+ -fprof-auto
+ -rtsopts
+ -auto-all
+ -caf-all
+ -threaded
+ "-with-rtsopts=-N -K1K"
+ else
+ ghc-options:
+ -Wall
+ -threaded
+ "-with-rtsopts=-N"
+ -funbox-strict-fields
test-suite threads-supervisor-tests
type:
@@ -64,13 +73,23 @@ test-suite threads-supervisor-tests
Main.hs
other-modules:
Tests
+ Tests.Bounded
hs-source-dirs:
test
default-language:
Haskell2010
- ghc-options:
- -threaded
- "-with-rtsopts=-N"
+ if flag(prof)
+ ghc-options:
+ -fprof-auto
+ -rtsopts
+ -auto-all
+ -caf-all
+ -threaded
+ "-with-rtsopts=-N -K1K"
+ else
+ ghc-options:
+ -threaded
+ "-with-rtsopts=-N"
build-depends:
threads-supervisor -any
, base