summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMikolajKonarski <>2019-04-15 15:13:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2019-04-15 15:13:00 (GMT)
commit339586a80ea9286f61c5dda7f6de8a3583448a3d (patch)
tree2ffe20c6a851e540fc439fd5c67bd5d29ddc634a
parent5d1c0733b4c4f14aebdb5fca0070cfe9ae8480c7 (diff)
version 1.2.0.1HEAD1.2.0.1master
-rw-r--r--[-rwxr-xr-x]LICENSE0
-rw-r--r--[-rwxr-xr-x]Setup.hs0
-rw-r--r--[-rwxr-xr-x]examples/Main.hs17
-rw-r--r--[-rwxr-xr-x]src/Control/Concurrent/Supervisor.hs27
-rw-r--r--[-rwxr-xr-x]src/Control/Concurrent/Supervisor/Bounded.hs37
-rw-r--r--[-rwxr-xr-x]src/Control/Concurrent/Supervisor/Tutorial.hs39
-rw-r--r--[-rwxr-xr-x]src/Control/Concurrent/Supervisor/Types.hs345
-rw-r--r--[-rwxr-xr-x]test/Main.hs2
-rw-r--r--[-rwxr-xr-x]test/Tests.hs64
-rw-r--r--[-rwxr-xr-x]test/Tests/Bounded.hs15
-rw-r--r--[-rwxr-xr-x]threads-supervisor.cabal8
11 files changed, 315 insertions, 239 deletions
diff --git a/LICENSE b/LICENSE
index 7da15ba..7da15ba 100755..100644
--- a/LICENSE
+++ b/LICENSE
diff --git a/Setup.hs b/Setup.hs
index 9a994af..9a994af 100755..100644
--- a/Setup.hs
+++ b/Setup.hs
diff --git a/examples/Main.hs b/examples/Main.hs
index b22183b..99db1b4 100755..100644
--- a/examples/Main.hs
+++ b/examples/Main.hs
@@ -2,10 +2,10 @@
module Main where
-import Control.Concurrent.Supervisor
import Control.Concurrent
-import Control.Exception
import Control.Concurrent.STM
+import Control.Concurrent.Supervisor
+import Control.Exception
job1 :: IO ()
job1 = do
@@ -30,12 +30,11 @@ job5 = threadDelay 100 >> error "dead"
main :: IO ()
main = bracketOnError (do
- supSpec <- newSupervisorSpec OneForOne
-
- sup1 <- newSupervisor supSpec
- sup2 <- newSupervisor supSpec
+ sup1 <- newSupervisor OneForOne
+ sup2 <- newSupervisor OneForOne
- sup1 `monitor` sup2
+ sup2ThreadId <- monitorWith fibonacciRetryPolicy sup1 sup2
+ putStrLn $ "Supervisor 2 has ThreadId: " ++ show sup2ThreadId
_ <- forkSupervised sup2 fibonacciRetryPolicy job3
@@ -44,9 +43,11 @@ main = bracketOnError (do
_ <- forkSupervised sup1 fibonacciRetryPolicy job4
_ <- forkSupervised sup1 fibonacciRetryPolicy job5
_ <- forkIO (go (eventStream sup1))
+ -- We kill sup2
+ throwTo sup2ThreadId (AssertionFailed "sup2, die please.")
return sup1) shutdownSupervisor (\_ -> threadDelay 10000000000)
where
go eS = do
- newE <- atomically $ readTQueue eS
+ newE <- atomically $ readQueue eS
print newE
go eS
diff --git a/src/Control/Concurrent/Supervisor.hs b/src/Control/Concurrent/Supervisor.hs
index 7b2fe70..f710634 100755..100644
--- a/src/Control/Concurrent/Supervisor.hs
+++ b/src/Control/Concurrent/Supervisor.hs
@@ -6,38 +6,27 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
-{-# LANGUAGE DeriveDataTypeable #-}
module Control.Concurrent.Supervisor
- ( SupervisorSpec
- , Supervisor
+ ( Supervisor
, Child
- , newSupervisorSpec
, newSupervisor
, module T
) where
-import Control.Concurrent.Supervisor.Types as T hiding (newSupervisor, newSupervisorSpec)
-import qualified Control.Concurrent.Supervisor.Types as Types
import Control.Concurrent.STM
+import Control.Concurrent.Supervisor.Types as T hiding (Supervisor, newSupervisor)
+import qualified Control.Concurrent.Supervisor.Types as Types
-type SupervisorSpec = Types.SupervisorSpec0 TQueue
-type Supervisor = Types.Supervisor0 TQueue
+type Supervisor = Types.Supervisor TQueue
--------------------------------------------------------------------------------
type Child = Types.Child_ TQueue
--------------------------------------------------------------------------------
--- | Creates a new 'SupervisorSpec'. The reason it doesn't return a
--- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the
--- supervisor thread.
-newSupervisorSpec :: Types.RestartStrategy -> IO SupervisorSpec
-newSupervisorSpec strategy = Types.newSupervisorSpec strategy 0
+-- NOTE: The `maxBound` value will be ignore by the underlying implementation.
+newSupervisor :: RestartStrategy -> IO Supervisor
+newSupervisor str = Types.newSupervisor str 9223372036854775807
--- $supervise
-
---------------------------------------------------------------------------------
-newSupervisor :: SupervisorSpec -> IO Supervisor
-newSupervisor spec = Types.newSupervisor spec
+-- Arbitrary number that we could choose in a better way, was maxBound :: Int
diff --git a/src/Control/Concurrent/Supervisor/Bounded.hs b/src/Control/Concurrent/Supervisor/Bounded.hs
index 57dd50a..641d786 100755..100644
--- a/src/Control/Concurrent/Supervisor/Bounded.hs
+++ b/src/Control/Concurrent/Supervisor/Bounded.hs
@@ -7,50 +7,31 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
-{-# LANGUAGE DeriveDataTypeable #-}
module Control.Concurrent.Supervisor.Bounded
- ( SupervisorSpec
- , Supervisor
+ ( Supervisor
, Child
- , newSupervisorSpec
- , newSupervisorSpecBounded
, newSupervisor
+ , defaultEventQueueSize
, module T
) where
-import Control.Concurrent.Supervisor.Types as T hiding (newSupervisor, newSupervisorSpec)
-import qualified Control.Concurrent.Supervisor.Types as Types
import Control.Concurrent.STM
+import Control.Concurrent.Supervisor.Types as T hiding (Supervisor, newSupervisor)
+import qualified Control.Concurrent.Supervisor.Types as Types
+import Numeric.Natural
-type SupervisorSpec = Types.SupervisorSpec0 TBQueue
-type Supervisor = Types.Supervisor0 TBQueue
+type Supervisor = Types.Supervisor TBQueue
--------------------------------------------------------------------------------
type Child = Types.Child_ TBQueue
--------------------------------------------------------------------------------
--- | Creates a new 'SupervisorSpec'. The reason it doesn't return a
--- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the
--- supervisor thread.
-newSupervisorSpec :: Types.RestartStrategy -> IO SupervisorSpec
-newSupervisorSpec strategy = Types.newSupervisorSpec strategy defaultEventQueueSize
-
---------------------------------------------------------------------------------
--- | Like 'newSupervisorSpec', but give the user control over the size of the
--- event queue.
-newSupervisorSpecBounded :: Types.RestartStrategy -> Int -> IO SupervisorSpec
-newSupervisorSpecBounded = Types.newSupervisorSpec
-
---------------------------------------------------------------------------------
-- | The default size of the queue where `SupervisionEvent`(s) are written.
-defaultEventQueueSize :: Int
+defaultEventQueueSize :: Natural
defaultEventQueueSize = 10000
--- $supervise
-
--------------------------------------------------------------------------------
-newSupervisor :: SupervisorSpec -> IO Supervisor
-newSupervisor spec = Types.newSupervisor spec
+newSupervisor :: RestartStrategy -> Natural -> IO Supervisor
+newSupervisor = Types.newSupervisor
diff --git a/src/Control/Concurrent/Supervisor/Tutorial.hs b/src/Control/Concurrent/Supervisor/Tutorial.hs
index a7c6840..68900a3 100755..100644
--- a/src/Control/Concurrent/Supervisor/Tutorial.hs
+++ b/src/Control/Concurrent/Supervisor/Tutorial.hs
@@ -23,9 +23,6 @@ module Control.Concurrent.Supervisor.Tutorial
-- * Different type of jobs
-- $jobs
- -- * Creating a SupervisorSpec
- -- $createSpec
-
-- * Creating a Supervisor
-- $createSupervisor
@@ -87,22 +84,11 @@ module Control.Concurrent.Supervisor.Tutorial
--
-- These jobs represent a significant pool of our everyday computations in the IO monad
--- $createSpec
--- A 'SupervisorSpec' simply holds the state of our supervision, and can be safely shared
--- between supervisors. Under the hood, both the `SupervisorSpec` and the `Supervisor`
--- share the same structure; in fact, they are just type synonyms:
---
--- > type SupervisorSpec = Supervisor_ Uninitialised
--- > type Supervisor = Supervisor_ Initialised
--- The important difference though, is that the `SupervisorSpec` does not imply the creation
--- of an asynchronous thread, which the latter does. To keep separated the initialisation
--- of the data structure from the logic of supervising, we use phantom types to
--- force you create a spec first.
--- Creating a spec it just a matter of calling `newSupervisorSpec`.
-
-- $createSupervisor
--- Creating a 'Supervisor' from a 'SupervisionSpec', is as simple as calling `newSupervisor`.
--- immediately after doing so, a new thread will be started, monitoring any subsequent IO actions
+-- Creating a 'Supervisor' is as simple as calling `newSupervisor`, specifying the `RestartStrategy`
+-- you want to use as well as the size of the `EventStream` (this depends whether you are using a Bounded
+-- supervisor or not).
+-- Immediately after doing so, a new thread will be started, monitoring any subsequent IO actions
-- submitted to it.
-- $boundedVsUnbounded
@@ -118,12 +104,11 @@ module Control.Concurrent.Supervisor.Tutorial
--
-- > main :: IO ()
-- > main = bracketOnError (do
--- > supSpec <- newSupervisorSpec OneForOne
-- >
--- > sup1 <- newSupervisor supSpec
--- > sup2 <- newSupervisor supSpec
+-- > sup1 <- newSupervisor OneForOne
+-- > sup2 <- newSupervisor OneForOne
-- >
--- > sup1 `monitor` sup2
+-- > monitorWith fibonacciRetryPolicy sup1 sup2
-- >
-- > _ <- forkSupervised sup2 fibonacciRetryPolicy job3
-- >
@@ -138,14 +123,12 @@ module Control.Concurrent.Supervisor.Tutorial
-- > print newE
-- > go eS
--
--- What we have done here, was to spawn our supervisor out from a spec,
--- any using our swiss knife `forkSupervised` to spawn for supervised
+-- What we have done here, was to spawn two supervisors and we have used
+-- our swiss knife `forkSupervised` to spawn four supervised
-- IO computations. As you can see, if we partially apply `forkSupervised`,
-- its type resemble `forkIO` one; this is by design, as we want to keep
--- this API as IO-friendly as possible
--- in the very same example, we also create another supervisor
--- (from the same spec, but you can create a separate one as well)
--- and we ask the first supervisor to monitor the second one.
+-- this API as IO-friendly as possible.
+-- Note also how we can ask the first supervisor to monitor the second one.
--
-- `fibonacciRetryPolicy` is a constructor for the `RetryPolicy`, which creates
-- under the hood a `RetryPolicy` from the "retry" package which is using
diff --git a/src/Control/Concurrent/Supervisor/Types.hs b/src/Control/Concurrent/Supervisor/Types.hs
index defcaaa..b594241 100755..100644
--- a/src/Control/Concurrent/Supervisor/Types.hs
+++ b/src/Control/Concurrent/Supervisor/Types.hs
@@ -4,26 +4,24 @@
-}
{-# LANGUAGE GADTs #-}
+{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
-{-# LANGUAGE DeriveDataTypeable #-}
module Control.Concurrent.Supervisor.Types
- ( SupervisorSpec0
- , Supervisor0
+ ( SupervisionCtx
+ , Supervisor
, QueueLike(..)
, Child_
, DeadLetter
, RestartAction
, SupervisionEvent(..)
, RestartStrategy(..)
- -- * Creating a new supervisor spec
- -- $new
- , newSupervisorSpec
+ , RestartResult(..)
-- * Creating a new supervisor
- -- $sup
+ -- $new
, newSupervisor
-- * Restart Policies
, fibonacciRetryPolicy
@@ -39,37 +37,43 @@ module Control.Concurrent.Supervisor.Types
, forkSupervised
-- * Monitor another supervisor
-- $monitor
- , monitor
+ , monitorWith
) where
-import qualified Data.HashMap.Strict as Map
import Control.Concurrent
import Control.Concurrent.STM
-import Data.IORef
import Control.Exception
-import Data.Typeable
import Control.Monad
+import Control.Monad.IO.Class
import Control.Retry
+import qualified Data.HashMap.Strict as Map
+import Data.IORef
import Data.Time
+import Numeric.Natural
+import System.Clock (Clock(Monotonic), TimeSpec, getTime)
--------------------------------------------------------------------------------
-data Uninitialised
-data Initialised
+type Mailbox = TChan DeadLetter
--------------------------------------------------------------------------------
-data Supervisor_ q a = Supervisor_ {
- _sp_myTid :: !(Maybe ThreadId)
- , _sp_strategy :: !RestartStrategy
- , _sp_children :: !(IORef (Map.HashMap ThreadId (Child_ q)))
- , _sp_mailbox :: TChan DeadLetter
- , _sp_eventStream :: q SupervisionEvent
- }
+data SupervisionCtx q = SupervisionCtx {
+ _sc_mailbox :: Mailbox
+ , _sc_parent_mailbox :: !(IORef (Maybe Mailbox))
+ -- ^ The mailbox of the parent process (which is monitoring this one), if any.
+ , _sc_children :: !(IORef (Map.HashMap ThreadId (Child_ q)))
+ , _sc_eventStream :: q SupervisionEvent
+ , _sc_eventStreamSize :: !Natural
+ , _sc_strategy :: !RestartStrategy
+ }
-type SupervisorSpec0 q = Supervisor_ q Uninitialised
-type Supervisor0 q = Supervisor_ q Initialised
+--------------------------------------------------------------------------------
+data Supervisor q = Supervisor {
+ _sp_myTid :: !ThreadId
+ , _sp_ctx :: !(SupervisionCtx q)
+ }
class QueueLike q where
- newQueueIO :: Int -> IO (q a)
+ newQueueIO :: Natural -> IO (q a)
readQueue :: q a -> STM a
writeQueue :: q a -> a -> STM ()
@@ -86,11 +90,26 @@ instance QueueLike TBQueue where
unless isFull $ writeTBQueue q e
--------------------------------------------------------------------------------
-data DeadLetter = DeadLetter ThreadId SomeException
+data DeadLetter = DeadLetter !LetterEpoch !ThreadId !SomeException
--------------------------------------------------------------------------------
-data Child_ q = Worker !RetryStatus (RetryPolicyM IO) RestartAction
- | Supvsr !RetryStatus (RetryPolicyM IO) !(Supervisor_ q Initialised)
+type Epoch = TimeSpec
+newtype LetterEpoch = LetterEpoch Epoch deriving Show
+newtype ChildEpoch = ChildEpoch Epoch deriving Show
+
+--------------------------------------------------------------------------------
+data RestartResult =
+ Restarted !ThreadId !ThreadId !RetryStatus !UTCTime
+ -- ^ The supervised `Child_` was restarted successfully.
+ | StaleDeadLetter !ThreadId !LetterEpoch !ChildEpoch !UTCTime
+ -- ^ A stale `DeadLetter` was received.
+ | RestartFailed SupervisionEvent
+ -- ^ The restart failed for a reason decribed by a `SupervisionEvent`
+ deriving Show
+
+--------------------------------------------------------------------------------
+data Child_ q = Worker !ChildEpoch !RetryStatus (RetryPolicyM IO) RestartAction
+ | Supvsr !ChildEpoch !RetryStatus (RetryPolicyM IO) !(Supervisor q)
--------------------------------------------------------------------------------
type RestartAction = ThreadId -> IO ThreadId
@@ -100,6 +119,8 @@ data SupervisionEvent =
ChildBorn !ThreadId !UTCTime
| ChildDied !ThreadId !SomeException !UTCTime
| ChildRestarted !ThreadId !ThreadId !RetryStatus !UTCTime
+ | ChildNotFound !ThreadId !UTCTime
+ | StaleDeadLetterReceived !ThreadId !LetterEpoch !ChildEpoch !UTCTime
| ChildRestartLimitReached !ThreadId !RetryStatus !UTCTime
| ChildFinished !ThreadId !UTCTime
deriving Show
@@ -116,35 +137,56 @@ data RestartStrategy = OneForOne
fibonacciRetryPolicy :: RetryPolicyM IO
fibonacciRetryPolicy = fibonacciBackoff 100
+--------------------------------------------------------------------------------
+getEpoch :: MonadIO m => m Epoch
+getEpoch = liftIO $ getTime Monotonic
+
+--------------------------------------------------------------------------------
+tryNotifyParent :: IORef (Maybe Mailbox) -> ThreadId -> SomeException -> IO ()
+tryNotifyParent mbPMbox myId ex = do
+ readIORef mbPMbox >>= \m -> case m of
+ Nothing -> return ()
+ Just m' -> do
+ e <- getEpoch
+ atomically $ writeTChan m' (DeadLetter (LetterEpoch e) myId ex)
+
-- $new
-- In order to create a new supervisor, you need a `SupervisorSpec`,
-- which can be acquired by a call to `newSupervisor`:
-
---------------------------------------------------------------------------------
--- | Creates a new 'SupervisorSpec'. The reason it doesn't return a
--- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the
--- supervisor thread.
-newSupervisorSpec :: QueueLike q => RestartStrategy -> Int -> IO (SupervisorSpec0 q)
-newSupervisorSpec strategy size = do
- tkn <- newTChanIO
- evt <- newQueueIO size
- ref <- newIORef Map.empty
- return $ Supervisor_ Nothing strategy ref tkn evt
-
-- $supervise
--------------------------------------------------------------------------------
-newSupervisor :: QueueLike q => SupervisorSpec0 q -> IO (Supervisor0 q)
-newSupervisor spec = forkIO (handleEvents spec) >>= \tid -> do
- mbx <- atomically $ dupTChan (_sp_mailbox spec)
- return Supervisor_ {
- _sp_myTid = Just tid
- , _sp_strategy = _sp_strategy spec
- , _sp_mailbox = mbx
- , _sp_children = _sp_children spec
- , _sp_eventStream = _sp_eventStream spec
- }
+newSupervisor :: QueueLike q
+ => RestartStrategy
+ -> Natural
+ -> IO (Supervisor q)
+newSupervisor strategy size = do
+ parentMbx <- newIORef Nothing
+ mbx <- newTChanIO
+ es <- newQueueIO size
+ cld <- newIORef Map.empty
+ let ctx = SupervisionCtx {
+ _sc_mailbox = mbx
+ , _sc_parent_mailbox = parentMbx
+ , _sc_eventStream = es
+ , _sc_children = cld
+ , _sc_strategy = strategy
+ , _sc_eventStreamSize = size
+ }
+ tid <- forkFinally (handleEvents ctx) $ \res -> case res of
+ Left ex -> do
+ bracket myThreadId return $ \myId -> do
+ -- If we have a parent supervisor watching us, notify it we died.
+ tryNotifyParent parentMbx myId ex
+ Right v -> return v
+ go ctx tid
+ where
+ go ctx tid = do
+ return Supervisor {
+ _sp_myTid = tid
+ , _sp_ctx = ctx
+ }
-- $log
@@ -152,14 +194,14 @@ newSupervisor spec = forkIO (handleEvents spec) >>= \tid -> do
-- | Gives you access to the event this supervisor is generating, allowing you
-- to react. It's using a bounded queue to explicitly avoid memory leaks in case
-- you do not want to drain the queue to listen to incoming events.
-eventStream :: QueueLike q => Supervisor0 q -> q SupervisionEvent
-eventStream (Supervisor_ _ _ _ _ e) = e
+eventStream :: QueueLike q => Supervisor q -> q SupervisionEvent
+eventStream Supervisor{_sp_ctx} = _sc_eventStream _sp_ctx
--------------------------------------------------------------------------------
-- | Returns the number of active threads at a given moment in time.
-activeChildren :: QueueLike q => Supervisor0 q -> IO Int
-activeChildren (Supervisor_ _ _ chRef _ _) = do
- readIORef chRef >>= return . length . Map.keys
+activeChildren :: QueueLike q => Supervisor q -> IO Int
+activeChildren Supervisor{_sp_ctx} = do
+ readIORef (_sc_children _sp_ctx) >>= return . length . Map.keys
-- $shutdown
@@ -167,20 +209,17 @@ activeChildren (Supervisor_ _ _ chRef _ _) = do
-- | Shutdown the given supervisor. This will cause the supervised children to
-- be killed as well. To do so, we explore the children tree, killing workers as we go,
-- and recursively calling `shutdownSupervisor` in case we hit a monitored `Supervisor`.
-shutdownSupervisor :: QueueLike q => Supervisor0 q -> IO ()
-shutdownSupervisor (Supervisor_ sId _ chRef _ _) = do
- case sId of
- Nothing -> return ()
- Just tid -> do
- chMap <- readIORef chRef
- processChildren (Map.toList chMap)
- killThread tid
+shutdownSupervisor :: QueueLike q => Supervisor q -> IO ()
+shutdownSupervisor (Supervisor tid ctx) = do
+ chMap <- readIORef (_sc_children ctx)
+ processChildren (Map.toList chMap)
+ killThread tid
where
processChildren [] = return ()
processChildren (x:xs) = do
case x of
- (tid, Worker _ _ _) -> killThread tid
- (_, Supvsr _ _ s) -> shutdownSupervisor s
+ (workerTid, Worker{}) -> killThread workerTid
+ (_, Supvsr _ _ _ s) -> shutdownSupervisor s
processChildren xs
-- $fork
@@ -188,108 +227,160 @@ shutdownSupervisor (Supervisor_ sId _ chRef _ _) = do
--------------------------------------------------------------------------------
-- | Fork a thread in a supervised mode.
forkSupervised :: QueueLike q
- => Supervisor0 q
+ => Supervisor q
-- ^ The 'Supervisor'
-> RetryPolicyM IO
-- ^ The retry policy to use
-> IO ()
-- ^ The computation to run
-> IO ThreadId
-forkSupervised sup@Supervisor_{..} policy act =
+forkSupervised sup@Supervisor{..} policy act =
bracket (supervised sup act) return $ \newChild -> do
- let ch = Worker defaultRetryStatus policy (const (supervised sup act))
- atomicModifyIORef' _sp_children $ \chMap -> (Map.insert newChild ch chMap, ())
+ e <- getEpoch
+ let ch = Worker (ChildEpoch e) defaultRetryStatus policy (const (supervised sup act))
+ atomicModifyIORef' (_sc_children _sp_ctx) $ \chMap -> (Map.insert newChild ch chMap, ())
now <- getCurrentTime
- atomically $ writeQueue _sp_eventStream (ChildBorn newChild now)
+ atomically $ writeQueue (_sc_eventStream _sp_ctx) (ChildBorn newChild now)
return newChild
--------------------------------------------------------------------------------
-supervised :: QueueLike q => Supervisor0 q -> IO () -> IO ThreadId
-supervised Supervisor_{..} act = forkFinally act $ \res -> case res of
- Left ex -> bracket myThreadId return $ \myId -> atomically $
- writeTChan _sp_mailbox (DeadLetter myId ex)
+supervised :: QueueLike q => Supervisor q -> IO () -> IO ThreadId
+supervised Supervisor{..} act = forkFinally act $ \res -> case res of
+ Left ex -> bracket myThreadId return $ \myId -> do
+ e <- getEpoch
+ atomically $ writeTChan (_sc_mailbox _sp_ctx) (DeadLetter (LetterEpoch e) myId ex)
Right _ -> bracket myThreadId return $ \myId -> do
now <- getCurrentTime
- atomicModifyIORef' _sp_children $ \chMap -> (Map.delete myId chMap, ())
- atomically $ writeQueue _sp_eventStream (ChildFinished myId now)
+ atomicModifyIORef' (_sc_children _sp_ctx) $ \chMap -> (Map.delete myId chMap, ())
+ atomically $ writeQueue (_sc_eventStream _sp_ctx) (ChildFinished myId now)
+
+--------------------------------------------------------------------------------
+-- | Ignore any stale `DeadLetter`, which is a `DeadLetter` with an `Epoch`
+-- smaller than the one stored in the `Child_` to restart. Such stale `DeadLetter`
+-- are simply ignored.
+ignoringStaleLetters :: ThreadId
+ -> LetterEpoch
+ -> ChildEpoch
+ -> IO RestartResult
+ -> IO RestartResult
+ignoringStaleLetters tid deadLetterEpoch@(LetterEpoch l) childEpoch@(ChildEpoch c) act = do
+ now <- getCurrentTime
+ if l < c then return (StaleDeadLetter tid deadLetterEpoch childEpoch now) else act
-restartChild :: QueueLike q => SupervisorSpec0 q -> UTCTime -> ThreadId -> IO Bool
-restartChild (Supervisor_ myId myStrategy myChildren myMailbox myStream) now newDeath = do
- chMap <- readIORef myChildren
+--------------------------------------------------------------------------------
+restartChild :: QueueLike q
+ => SupervisionCtx q
+ -> LetterEpoch
+ -> UTCTime
+ -> ThreadId
+ -> IO RestartResult
+restartChild ctx deadLetterEpoch now newDeath = do
+ chMap <- readIORef (_sc_children ctx)
case Map.lookup newDeath chMap of
- Nothing -> return False
- Just (Worker rState rPolicy act) ->
+ Nothing -> return $ RestartFailed (ChildNotFound newDeath now)
+ Just (Worker workerEpoch rState rPolicy act) -> ignoringStaleLetters newDeath deadLetterEpoch workerEpoch $ do
runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
- let ch = Worker newRState rPolicy act
+ e <- getEpoch
+ let ch = Worker (ChildEpoch e) newRState rPolicy act
newThreadId <- act newDeath
- writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
- emitEventChildRestarted newThreadId newRState
- Just (Supvsr rState rPolicy s@(Supervisor_ _ str mbx cld es)) ->
- runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
- let node = Supervisor_ myId myStrategy myChildren myMailbox myStream
- let ch = (Supvsr newRState rPolicy s)
- -- TODO: shutdown children?
- newThreadId <- supervised node (handleEvents $ Supervisor_ Nothing str mbx cld es)
- writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
+ writeIORef (_sc_children ctx) (Map.insert newThreadId ch $! Map.delete newDeath chMap)
emitEventChildRestarted newThreadId newRState
+ Just (Supvsr supervisorEpoch rState rPolicy (Supervisor deathSup ctx')) -> do
+ ignoringStaleLetters newDeath deadLetterEpoch supervisorEpoch $ do
+ runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
+ e <- getEpoch
+ restartedSup <- newSupervisor (_sc_strategy ctx) (_sc_eventStreamSize ctx')
+ let ch = Supvsr (ChildEpoch e) newRState rPolicy restartedSup
+ -- TODO: shutdown children?
+ let newThreadId = _sp_myTid restartedSup
+ writeIORef (_sc_children ctx) (Map.insert newThreadId ch $! Map.delete deathSup chMap)
+ emitEventChildRestarted newThreadId newRState
where
- emitEventChildRestarted newThreadId newRState = atomically $
- writeQueue myStream (ChildRestarted newDeath newThreadId newRState now)
- emitEventChildRestartLimitReached newRState = atomically $
- writeQueue myStream (ChildRestartLimitReached newDeath newRState now)
+ emitEventChildRestarted newThreadId newRState = do
+ return $ Restarted newDeath newThreadId newRState now
+ emitEventChildRestartLimitReached newRState = do
+ return $ RestartFailed (ChildRestartLimitReached newDeath newRState now)
runRetryPolicy :: RetryStatus
- -> RetryPolicyM IO
- -> (RetryStatus -> IO ())
- -> (RetryStatus -> IO ())
- -> IO Bool
+ -> RetryPolicyM IO
+ -> (RetryStatus -> IO RestartResult)
+ -> (RetryStatus -> IO RestartResult)
+ -> IO RestartResult
runRetryPolicy rState rPolicy ifAbort ifThrottle = do
maybeDelay <- getRetryPolicyM rPolicy rState
case maybeDelay of
- Nothing -> ifAbort rState >> return False
+ Nothing -> ifAbort rState
Just delay ->
let newRState = rState { rsIterNumber = rsIterNumber rState + 1
, rsCumulativeDelay = rsCumulativeDelay rState + delay
, rsPreviousDelay = Just (maybe 0 (const delay) (rsPreviousDelay rState))
}
- in threadDelay delay >> ifThrottle newRState >> return True
+ in threadDelay delay >> ifThrottle newRState
-restartOneForOne :: QueueLike q => SupervisorSpec0 q -> UTCTime -> ThreadId -> IO Bool
-restartOneForOne sup now newDeath = restartChild sup now newDeath
+--------------------------------------------------------------------------------
+restartOneForOne :: QueueLike q
+ => SupervisionCtx q
+ -> LetterEpoch
+ -> UTCTime
+ -> ThreadId
+ -> IO RestartResult
+restartOneForOne = restartChild
--------------------------------------------------------------------------------
-handleEvents :: QueueLike q => SupervisorSpec0 q -> IO ()
-handleEvents sup@(Supervisor_ _ myStrategy _ myMailbox myStream) = do
- (DeadLetter newDeath ex) <- atomically $ readTChan myMailbox
+handleEvents :: QueueLike q => SupervisionCtx q -> IO ()
+handleEvents ctx@SupervisionCtx{..} = do
+ (DeadLetter epoch newDeath ex) <- atomically $ readTChan _sc_mailbox
now <- getCurrentTime
- atomically $ writeQueue myStream (ChildDied newDeath ex now)
+ atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now)
-- If we catch an `AsyncException`, we have nothing but good
- -- reasons not to restart the thread.
- -- Note to the skeptical: It's perfectly fine do put `undefined` here,
- -- as `typeOf` does not inspect the content (try in GHCi!)
- case typeOf ex == (typeOf (undefined :: AsyncException)) of
- True -> handleEvents sup
- False -> do
- successful <- case myStrategy of
- OneForOne -> restartOneForOne sup now newDeath
- unless successful $ do
- -- TODO: shutdown supervisor?
- return ()
- handleEvents sup
+ -- reasons NOT to restart the thread.
+ case asyncExceptionFromException ex of
+ Just (_ :: AsyncException) -> do
+ -- Remove the `Child_` from the map, log what happenend.
+ atomicModifyIORef' _sc_children $ \chMap -> (Map.delete newDeath chMap, ())
+ atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now)
+ handleEvents ctx
+ Nothing -> do
+ restartResult <- case _sc_strategy of
+ OneForOne -> restartOneForOne ctx epoch now newDeath
+ -- TODO: shutdown supervisor?
+ atomically $ case restartResult of
+ StaleDeadLetter tid le we tm -> do
+ writeQueue _sc_eventStream (StaleDeadLetterReceived tid le we tm)
+ RestartFailed reason -> do
+ writeQueue _sc_eventStream reason
+ Restarted oldId newId rStatus tm ->
+ writeQueue _sc_eventStream (ChildRestarted oldId newId rStatus tm)
+ handleEvents ctx
-- $monitor
-newtype MonitorRequest = MonitoredSupervision ThreadId deriving (Show, Typeable)
-
-instance Exception MonitorRequest
-
--------------------------------------------------------------------------------
-- | Monitor another supervisor. To achieve these, we simulate a new 'DeadLetter',
-- so that the first supervisor will effectively restart the monitored one.
-- Thanks to the fact that for the supervisor the restart means we just copy over
-- its internal state, it should be perfectly fine to do so.
-monitor :: QueueLike q => Supervisor0 q -> Supervisor0 q -> IO ()
-monitor (Supervisor_ _ _ _ mbox _) (Supervisor_ mbId _ _ _ _) = do
- case mbId of
- Nothing -> return ()
- Just tid -> atomically $
- writeTChan mbox (DeadLetter tid (toException $ MonitoredSupervision tid))
+-- Returns the `ThreadId` of the monitored supervisor.
+monitorWith :: QueueLike q
+ => RetryPolicyM IO
+ -- ^ The retry policy to use
+ -> Supervisor q
+ -- ^ The supervisor
+ -> Supervisor q
+ -- ^ The 'supervised' supervisor
+ -> IO ThreadId
+monitorWith policy sup1 sup2 = do
+ let sup1Children = _sc_children (_sp_ctx sup1)
+ let sup1Mailbox = _sc_mailbox (_sp_ctx sup1)
+ let sup2Id = _sp_myTid sup2
+ let sup2ParentMailbox = _sc_parent_mailbox (_sp_ctx sup2)
+
+ readIORef sup2ParentMailbox >>= \mbox -> case mbox of
+ Just _ -> return sup2Id -- Do nothing, this supervisor is already being monitored.
+ Nothing -> do
+ e <- getEpoch
+ let sup2RetryStatus = defaultRetryStatus
+ let ch' = Supvsr (ChildEpoch e) sup2RetryStatus policy sup2
+ atomicModifyIORef' sup1Children $ \chMap -> (Map.insert sup2Id ch' chMap, ())
+ duped <- atomically $ dupTChan sup1Mailbox
+ atomicModifyIORef' sup2ParentMailbox $ const (Just duped, ())
+ return sup2Id
diff --git a/test/Main.hs b/test/Main.hs
index f1f7580..d7d16fd 100755..100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -25,7 +25,9 @@ allTests :: TestTree
allTests = testGroup "All Tests" [
withQuickCheckDepth "Control.Concurrent.Supervisor" 20 [
testProperty "1 supervised thread, no exceptions" (monadicIO test1SupThreadNoEx)
+ , testProperty "1 supervised thread, premature async exception" (monadicIO test1SupThreadPrematureAsyncDemise)
, testProperty "1 supervised thread, premature exception" (monadicIO test1SupThreadPrematureDemise)
+ , testProperty "1 supervised supervisor, premature exception" (monadicIO test1SupSpvrPrematureDemise)
, testProperty "killing spree" (monadicIO testKillingSpree)
, testProperty "cleanup" (monadicIO testSupCleanup)
, testCase "too many restarts" testTooManyRestarts
diff --git a/test/Tests.hs b/test/Tests.hs
index 6387098..e21bae1 100755..100644
--- a/test/Tests.hs
+++ b/test/Tests.hs
@@ -1,5 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Tests where
@@ -14,7 +13,7 @@ import Control.Monad.Trans.Class
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
-import Control.Concurrent.Supervisor
+import Control.Concurrent.Supervisor as Supervisor
--------------------------------------------------------------------------------
type IOProperty = PropertyM IO
@@ -95,31 +94,38 @@ assertContainsNMsg matcher !n (x:xs) = case matcher x of
False -> assertContainsNMsg matcher n xs
--------------------------------------------------------------------------------
+assertContainsNDiedMsg :: Int -> [SupervisionEvent] -> IOProperty ()
+assertContainsNDiedMsg n e = lift $ assertContainsNMsg matches n e
+ where
+ matches ChildDied{} = True
+ matches _ = False
+
+--------------------------------------------------------------------------------
assertContainsNRestartMsg :: Int -> [SupervisionEvent] -> IOProperty ()
assertContainsNRestartMsg n e = lift $ assertContainsNMsg matches n e
where
- matches (ChildRestarted{}) = True
+ matches ChildRestarted{} = True
matches _ = False
--------------------------------------------------------------------------------
assertContainsNFinishedMsg :: Int -> [SupervisionEvent] -> IOProperty ()
assertContainsNFinishedMsg n e = lift $ assertContainsNMsg matches n e
where
- matches (ChildFinished{}) = True
+ matches ChildFinished{} = True
matches _ = False
--------------------------------------------------------------------------------
assertContainsNLimitReached :: Int -> [SupervisionEvent] -> IO ()
assertContainsNLimitReached = assertContainsNMsg matches
where
- matches (ChildRestartLimitReached{}) = True
+ matches ChildRestartLimitReached{} = True
matches _ = False
--------------------------------------------------------------------------------
assertContainsRestartMsg :: [SupervisionEvent] -> ThreadId -> IOProperty ()
assertContainsRestartMsg [] _ = QM.assert False
assertContainsRestartMsg (x:xs) tid = case x of
- ((ChildRestarted old _ _ _)) ->
+ (ChildRestarted old _ _ _) ->
if old == tid then QM.assert True else assertContainsRestartMsg xs tid
_ -> assertContainsRestartMsg xs tid
@@ -127,17 +133,31 @@ assertContainsRestartMsg (x:xs) tid = case x of
-- Control.Concurrent.Supervisor tests
test1SupThreadNoEx :: IOProperty ()
test1SupThreadNoEx = forAllM randomLiveTime $ \ttl -> do
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne
_ <- lift (forkSupervised sup fibonacciRetryPolicy (forever $ threadDelay ttl))
assertActiveThreads sup (== 1)
lift $ shutdownSupervisor sup
--------------------------------------------------------------------------------
+test1SupThreadPrematureAsyncDemise :: IOProperty ()
+test1SupThreadPrematureAsyncDemise = forAllM randomLiveTime $ \ttl -> do
+ sup <- lift $ newSupervisor OneForOne
+ tid <- lift (forkSupervised sup fibonacciRetryPolicy (forever $ threadDelay ttl))
+ lift $ do
+ throwTo tid ThreadKilled
+ threadDelay ttl
+ -- Due to the fact an `AsyncException` was thrown, the thread shouldn't have been
+ -- restarted.
+ assertActiveThreads sup (== 0)
+ q <- lift $ qToList (eventStream sup)
+ assertContainsNRestartMsg 0 q
+ assertContainsNDiedMsg 1 q
+ lift $ shutdownSupervisor sup
+
+--------------------------------------------------------------------------------
test1SupThreadPrematureDemise :: IOProperty ()
test1SupThreadPrematureDemise = forAllM randomLiveTime $ \ttl -> do
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne
tid <- lift (forkSupervised sup fibonacciRetryPolicy (forever $ threadDelay ttl))
lift $ do
throwTo tid (AssertionFailed "You must die")
@@ -148,6 +168,21 @@ test1SupThreadPrematureDemise = forAllM randomLiveTime $ \ttl -> do
lift $ shutdownSupervisor sup
--------------------------------------------------------------------------------
+test1SupSpvrPrematureDemise :: IOProperty ()
+test1SupSpvrPrematureDemise = forAllM randomLiveTime $ \ttl -> do
+ sup1 <- lift $ newSupervisor OneForOne
+ sup2 <- lift $ newSupervisor OneForOne
+ tid <- lift $ Supervisor.monitorWith fibonacciRetryPolicy sup1 sup2
+ lift $ do
+ throwTo tid (AssertionFailed "You must die")
+ threadDelay ttl --give time to restart the thread
+ assertActiveThreads sup1 (== 1)
+ q <- lift $ qToList (eventStream sup1)
+ assertContainsNRestartMsg 1 q
+ lift $ shutdownSupervisor sup1
+ -- TODO: Assert sup2 has been shutdown as result.
+
+--------------------------------------------------------------------------------
fromAction :: Supervisor -> ThreadAction -> IO ThreadId
fromAction s Live = forkSupervised s fibonacciRetryPolicy (forever $ threadDelay 100000000)
fromAction s (DieAfter (TTL ttl)) = forkSupervised s fibonacciRetryPolicy (threadDelay ttl)
@@ -171,8 +206,7 @@ maxWait ta = go ta []
-- the side effects strikes.
testKillingSpree :: IOProperty ()
testKillingSpree = forAllM arbitrary $ \ep@(ExecutionPlan _ acts) -> do
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne
_ <- forM acts $ lift . fromAction sup
lift (threadDelay $ maxWait acts * 2)
q <- lift $ qToList (eventStream sup)
@@ -186,8 +220,7 @@ testKillingSpree = forAllM arbitrary $ \ep@(ExecutionPlan _ acts) -> do
testSupCleanup :: IOProperty ()
testSupCleanup = forAllM (vectorOf 100 arbitrary) $ \ttls -> do
let acts = map DieAfter ttls
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne
_ <- forM acts $ lift . fromAction sup
lift (threadDelay $ maxWait acts * 2)
q <- lift $ qToList (eventStream sup)
@@ -197,8 +230,7 @@ testSupCleanup = forAllM (vectorOf 100 arbitrary) $ \ttls -> do
testTooManyRestarts :: Assertion
testTooManyRestarts = do
- supSpec <- newSupervisorSpec OneForOne
- sup <- newSupervisor supSpec
+ sup <- newSupervisor OneForOne
_ <- forkSupervised sup (limitRetries 5) $ error "die"
threadDelay 2000000
q <- qToList (eventStream sup)
diff --git a/test/Tests/Bounded.hs b/test/Tests/Bounded.hs
index a7f7bb2..624704f 100755..100644
--- a/test/Tests/Bounded.hs
+++ b/test/Tests/Bounded.hs
@@ -127,8 +127,7 @@ assertContainsRestartMsg (x:xs) tid = case x of
-- Control.Concurrent.Supervisor tests
test1SupThreadNoEx :: IOProperty ()
test1SupThreadNoEx = forAllM randomLiveTime $ \ttl -> do
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne 1000
_ <- lift (forkSupervised sup fibonacciRetryPolicy (forever $ threadDelay ttl))
assertActiveThreads sup (== 1)
lift $ shutdownSupervisor sup
@@ -136,8 +135,7 @@ test1SupThreadNoEx = forAllM randomLiveTime $ \ttl -> do
--------------------------------------------------------------------------------
test1SupThreadPrematureDemise :: IOProperty ()
test1SupThreadPrematureDemise = forAllM randomLiveTime $ \ttl -> do
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne 1000
tid <- lift (forkSupervised sup fibonacciRetryPolicy (forever $ threadDelay ttl))
lift $ do
throwTo tid (AssertionFailed "You must die")
@@ -171,8 +169,7 @@ maxWait ta = go ta []
-- the side effects strikes.
testKillingSpree :: IOProperty ()
testKillingSpree = forAllM arbitrary $ \ep@(ExecutionPlan _ acts) -> do
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne 1000
_ <- forM acts $ lift . fromAction sup
lift (threadDelay $ maxWait acts * 2)
q <- lift $ qToList (eventStream sup)
@@ -186,8 +183,7 @@ testKillingSpree = forAllM arbitrary $ \ep@(ExecutionPlan _ acts) -> do
testSupCleanup :: IOProperty ()
testSupCleanup = forAllM (vectorOf 100 arbitrary) $ \ttls -> do
let acts = map DieAfter ttls
- supSpec <- lift $ newSupervisorSpec OneForOne
- sup <- lift $ newSupervisor supSpec
+ sup <- lift $ newSupervisor OneForOne 1000
_ <- forM acts $ lift . fromAction sup
lift (threadDelay $ maxWait acts * 2)
q <- lift $ qToList (eventStream sup)
@@ -197,8 +193,7 @@ testSupCleanup = forAllM (vectorOf 100 arbitrary) $ \ttls -> do
testTooManyRestarts :: Assertion
testTooManyRestarts = do
- supSpec <- newSupervisorSpec OneForOne
- sup <- newSupervisor supSpec
+ sup <- newSupervisor OneForOne defaultEventQueueSize
_ <- forkSupervised sup (limitRetries 5) $ error "die"
threadDelay 2000000
q <- qToList (eventStream sup)
diff --git a/threads-supervisor.cabal b/threads-supervisor.cabal
index dc9e7fc..df1a0e5 100755..100644
--- a/threads-supervisor.cabal
+++ b/threads-supervisor.cabal
@@ -1,5 +1,5 @@
name: threads-supervisor
-version: 1.1.0.0
+version: 1.2.0.1
synopsis: Simple, IO-based library for Erlang-style thread supervision
description: Simple, IO-based library for Erlang-style thread supervision
license: MIT
@@ -26,13 +26,15 @@ library
Control.Concurrent.Supervisor.Tutorial
build-depends:
base >= 4.6 && < 6,
+ clock >= 0.6,
unordered-containers >= 0.2.0.0 && < 0.5.0.0,
retry >= 0.7 && < 0.10,
- stm >= 2.4,
+ transformers >= 0.4 && < 0.6,
+ stm >= 2.5,
time >= 1.2
hs-source-dirs: src
if flag(prof)
- ghc-options: -fprof-auto -rtsopts -auto-all -caf-all
+ ghc-options: -fprof-auto -auto-all -caf-all
default-language: Haskell2010
ghc-options:
-Wall