summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlfredoDiNapoli <>2015-03-10 12:02:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2015-03-10 12:02:00 (GMT)
commitd4d4c20525bd8c6bcbc6ad9c55cb5d07f13fa4eb (patch)
tree082f06d6017c87a4449fe65ebefe1ef656b9ba14
parentb10c443c6c3b336997197152152ada20e5d8d748 (diff)
version 1.0.3.01.0.3.0
-rw-r--r--examples/Main.hs52
-rw-r--r--src/Control/Concurrent/Supervisor.hs56
-rw-r--r--src/Control/Concurrent/Supervisor/Tutorial.hs20
-rw-r--r--test/Main.hs2
-rw-r--r--test/Tests.hs38
-rw-r--r--threads-supervisor.cabal29
6 files changed, 165 insertions, 32 deletions
diff --git a/examples/Main.hs b/examples/Main.hs
new file mode 100644
index 0000000..470e246
--- /dev/null
+++ b/examples/Main.hs
@@ -0,0 +1,52 @@
+{-# LANGUAGE ScopedTypeVariables #-}
+
+module Main where
+
+import Control.Concurrent.Supervisor
+import Control.Concurrent
+import Control.Exception
+import Control.Concurrent.STM
+
+job1 :: IO ()
+job1 = do
+ threadDelay 5000000
+ fail "Dead"
+
+job2 :: ThreadId -> IO ()
+job2 tid = do
+ threadDelay 3000000
+ killThread tid
+
+job3 :: IO ()
+job3 = do
+ threadDelay 5000000
+ error "Oh boy, I'm good as dead"
+
+job4 :: IO ()
+job4 = threadDelay 7000000
+
+job5 :: IO ()
+job5 = threadDelay 100 >> error "dead"
+
+main :: IO ()
+main = bracketOnError (do
+ supSpec <- newSupervisorSpec
+
+ sup1 <- newSupervisor supSpec
+ sup2 <- newSupervisor supSpec
+
+ sup1 `monitor` sup2
+
+ _ <- forkSupervised sup2 oneForOne job3
+
+ j1 <- forkSupervised sup1 oneForOne job1
+ _ <- forkSupervised sup1 oneForOne (job2 j1)
+ _ <- forkSupervised sup1 oneForOne job4
+ _ <- forkSupervised sup1 oneForOne job5
+ _ <- forkIO (go (eventStream sup1))
+ return sup1) shutdownSupervisor (\_ -> threadDelay 10000000000)
+ where
+ go eS = do
+ newE <- atomically $ readTBQueue eS
+ print newE
+ go eS
diff --git a/src/Control/Concurrent/Supervisor.hs b/src/Control/Concurrent/Supervisor.hs
index 24ce060..e6e1f84 100644
--- a/src/Control/Concurrent/Supervisor.hs
+++ b/src/Control/Concurrent/Supervisor.hs
@@ -6,7 +6,6 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveDataTypeable #-}
@@ -24,6 +23,8 @@ module Control.Concurrent.Supervisor
-- * Creating a new supervisor
-- $sup
, newSupervisor
+ -- * Restart Strategies
+ , oneForOne
-- * Stopping a supervisor
-- $shutdown
, shutdownSupervisor
@@ -43,9 +44,10 @@ import qualified Data.HashMap.Strict as Map
import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
-import Data.Typeable
import Control.Exception
+import Data.Typeable
import Control.Monad
+import Control.Retry
import Data.Time
--------------------------------------------------------------------------------
@@ -78,6 +80,7 @@ data SupervisionEvent =
ChildBorn !ThreadId !UTCTime
| ChildDied !ThreadId !SomeException !UTCTime
| ChildRestarted !ThreadId !ThreadId !RestartStrategy !UTCTime
+ | ChildRestartLimitReached !ThreadId !RestartStrategy !UTCTime
| ChildFinished !ThreadId !UTCTime
deriving Show
@@ -85,8 +88,16 @@ data SupervisionEvent =
-- | Erlang inspired strategies. At the moment only the 'OneForOne' is
-- implemented.
data RestartStrategy =
- OneForOne
- deriving Show
+ OneForOne !Int RetryPolicy
+
+instance Show RestartStrategy where
+ show (OneForOne r _) = "OneForOne (Restarted " <> show r <> " times)"
+
+--------------------------------------------------------------------------------
+-- | Smart constructor which offers a default throttling based on
+-- fibonacci numbers.
+oneForOne :: RestartStrategy
+oneForOne = OneForOne 0 $ fibonacciBackoff 100
-- $new
-- In order to create a new supervisor, you need a `SupervisorSpec`,
@@ -194,25 +205,38 @@ handleEvents sp@(Supervisor_ myId myChildren myMailbox myStream) = do
(DeadLetter newDeath ex) <- atomically $ readTChan myMailbox
now <- getCurrentTime
writeIfNotFull myStream (ChildDied newDeath ex now)
- case asyncExceptionFromException ex of
- Just ThreadKilled -> handleEvents sp
- _ -> do
+ -- If we catch an `AsyncException`, we have nothing but good
+ -- reasons not to restart the thread.
+ case typeOf ex == (typeOf (undefined :: AsyncException)) of
+ True -> handleEvents sp
+ False -> do
chMap <- readIORef myChildren
case Map.lookup newDeath chMap of
- Nothing -> handleEvents sp
- Just ch@(Worker str act) -> case str of
- OneForOne -> do
+ Nothing -> return ()
+ Just (Worker str act) ->
+ applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
+ let ch = Worker newStr act
newThreadId <- act newDeath
writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
- writeIfNotFull myStream (ChildRestarted newDeath newThreadId str now)
- handleEvents sp
- Just ch@(Supvsr str (Supervisor_ _ mbx cld es)) -> case str of
- OneForOne -> do
+ writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now)
+ Just (Supvsr str s@(Supervisor_ _ mbx cld es)) ->
+ applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
let node = Supervisor_ myId myChildren myMailbox myStream
+ let ch = (Supvsr newStr s)
newThreadId <- supervised node (handleEvents $ Supervisor_ Nothing mbx cld es)
writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
- writeIfNotFull myStream (ChildRestarted newDeath newThreadId str now)
- handleEvents sp
+ writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now)
+ handleEvents sp
+ where
+ applyStrategy :: RestartStrategy
+ -> (RestartStrategy -> IO ())
+ -> (RestartStrategy -> IO ())
+ -> IO ()
+ applyStrategy (OneForOne currentRestarts retryPol) ifAbort ifThrottle = do
+ let newStr = OneForOne (currentRestarts + 1) retryPol
+ case getRetryPolicy retryPol (currentRestarts + 1) of
+ Nothing -> ifAbort newStr
+ Just delay -> threadDelay delay >> ifThrottle newStr
-- $monitor
diff --git a/src/Control/Concurrent/Supervisor/Tutorial.hs b/src/Control/Concurrent/Supervisor/Tutorial.hs
index 86aa37a..c02b72f 100644
--- a/src/Control/Concurrent/Supervisor/Tutorial.hs
+++ b/src/Control/Concurrent/Supervisor/Tutorial.hs
@@ -93,7 +93,7 @@ module Control.Concurrent.Supervisor.Tutorial
-- > type Supervisor = Supervisor_ Initialised
-- The important difference though, is that the `SupervisorSpec` does not imply the creation
-- of an asynchronous thread, which the latter does. To keep separated the initialisation
--- of the data structure from the logic of supervising, we use GADTs and type synonyms to
+-- of the data structure from the logic of supervising, we use phantom types to
-- force you create a spec first.
-- Creating a spec it just a matter of calling `newSupervisorSpec`.
@@ -114,11 +114,11 @@ module Control.Concurrent.Supervisor.Tutorial
-- >
-- > sup1 `monitor` sup2
-- >
--- > _ <- forkSupervised sup2 OneForOne job3
+-- > _ <- forkSupervised sup2 oneForOne job3
-- >
--- > j1 <- forkSupervised sup1 OneForOne job1
--- > _ <- forkSupervised sup1 OneForOne (job2 j1)
--- > _ <- forkSupervised sup1 OneForOne job4
+-- > j1 <- forkSupervised sup1 oneForOne job1
+-- > _ <- forkSupervised sup1 oneForOne (job2 j1)
+-- > _ <- forkSupervised sup1 oneForOne job4
-- > _ <- forkIO (go (eventStream sup1))
-- > return sup1) shutdownSupervisor (\_ -> threadDelay 10000000000)
-- > where
@@ -136,6 +136,16 @@ module Control.Concurrent.Supervisor.Tutorial
-- (from the same spec, but you can create a separate one as well)
-- and we ask the first supervisor to monitor the second one.
--
+-- `oneForOne` is a smart constructor for our `RestartStrategy`,which creates
+-- under the hood a `OneForOne` strategy which is using the `fibonacciBackoff`
+-- as `RetryPolicy` from the "retry" package. The clear advantage is that
+-- you are not obliged to use it if you don't like this sensible default;
+-- `RetryPolicy` is an monoid, so you can compose retry policies as you wish.
+--
+-- The `RetryPolicy` will also be responsible for determining whether a thread can be
+-- restarted or not; in the latter case you will find a `ChildRestartedLimitReached`
+-- in your event log.
+--
-- If you run this program, hopefully you should see on stdout
-- something like this:
--
diff --git a/test/Main.hs b/test/Main.hs
index 359bd7d..1bf24af 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -2,6 +2,7 @@
module Main where
import Test.Tasty
+import Test.Tasty.HUnit
import Test.Tasty.QuickCheck
import Test.QuickCheck.Monadic
import Tests
@@ -26,5 +27,6 @@ allTests = testGroup "All Tests" [
, testProperty "1 supervised thread, premature exception" (monadicIO test1SupThreadPrematureDemise)
, testProperty "killing spree" (monadicIO testKillingSpree)
, testProperty "cleanup" (monadicIO testSupCleanup)
+ , testCase "too many restarts" testTooManyRestarts
]
]
diff --git a/test/Tests.hs b/test/Tests.hs
index bbd26a6..09b3300 100644
--- a/test/Tests.hs
+++ b/test/Tests.hs
@@ -9,6 +9,7 @@ import Test.Tasty.QuickCheck
import Test.QuickCheck.Monadic as QM
import qualified Data.List as List
import Control.Monad
+import Control.Retry
import Control.Monad.Trans.Class
import Control.Applicative
import Control.Concurrent
@@ -86,9 +87,9 @@ qToList q = do
assertContainsNMsg :: (SupervisionEvent -> Bool)
-> Int
-> [SupervisionEvent]
- -> IOProperty ()
-assertContainsNMsg _ 0 _ = QM.assert True
-assertContainsNMsg _ x [] = lift $
+ -> IO ()
+assertContainsNMsg _ 0 _ = HUnit.assertBool "" True
+assertContainsNMsg _ x [] = do
HUnit.assertBool ("assertContainsNMsg: list exhausted and " ++ show x ++ " left.") False
assertContainsNMsg matcher !n (x:xs) = case matcher x of
True -> assertContainsNMsg matcher (n - 1) xs
@@ -96,19 +97,26 @@ assertContainsNMsg matcher !n (x:xs) = case matcher x of
--------------------------------------------------------------------------------
assertContainsNRestartMsg :: Int -> [SupervisionEvent] -> IOProperty ()
-assertContainsNRestartMsg = assertContainsNMsg matches
+assertContainsNRestartMsg n e = lift $ assertContainsNMsg matches n e
where
matches (ChildRestarted{}) = True
matches _ = False
--------------------------------------------------------------------------------
assertContainsNFinishedMsg :: Int -> [SupervisionEvent] -> IOProperty ()
-assertContainsNFinishedMsg = assertContainsNMsg matches
+assertContainsNFinishedMsg n e = lift $ assertContainsNMsg matches n e
where
matches (ChildFinished{}) = True
matches _ = False
--------------------------------------------------------------------------------
+assertContainsNLimitReached :: Int -> [SupervisionEvent] -> IO ()
+assertContainsNLimitReached = assertContainsNMsg matches
+ where
+ matches (ChildRestartLimitReached{}) = True
+ matches _ = False
+
+--------------------------------------------------------------------------------
assertContainsRestartMsg :: [SupervisionEvent] -> ThreadId -> IOProperty ()
assertContainsRestartMsg [] _ = QM.assert False
assertContainsRestartMsg (x:xs) tid = case x of
@@ -122,7 +130,7 @@ test1SupThreadNoEx :: IOProperty ()
test1SupThreadNoEx = forAllM randomLiveTime $ \ttl -> do
supSpec <- lift newSupervisorSpec
sup <- lift $ newSupervisor supSpec
- _ <- lift (forkSupervised sup OneForOne (forever $ threadDelay ttl))
+ _ <- lift (forkSupervised sup oneForOne (forever $ threadDelay ttl))
assertActiveThreads sup (== 1)
lift $ shutdownSupervisor sup
@@ -131,7 +139,7 @@ test1SupThreadPrematureDemise :: IOProperty ()
test1SupThreadPrematureDemise = forAllM randomLiveTime $ \ttl -> do
supSpec <- lift newSupervisorSpec
sup <- lift $ newSupervisor supSpec
- tid <- lift (forkSupervised sup OneForOne (forever $ threadDelay ttl))
+ tid <- lift (forkSupervised sup oneForOne (forever $ threadDelay ttl))
lift $ do
throwTo tid (AssertionFailed "You must die")
threadDelay ttl --give time to restart the thread
@@ -142,9 +150,9 @@ test1SupThreadPrematureDemise = forAllM randomLiveTime $ \ttl -> do
--------------------------------------------------------------------------------
fromAction :: Supervisor -> ThreadAction -> IO ThreadId
-fromAction s Live = forkSupervised s OneForOne (forever $ threadDelay 100000000)
-fromAction s (DieAfter (TTL ttl)) = forkSupervised s OneForOne (threadDelay ttl)
-fromAction s (ThrowAfter (TTL ttl)) = forkSupervised s OneForOne (do
+fromAction s Live = forkSupervised s oneForOne (forever $ threadDelay 100000000)
+fromAction s (DieAfter (TTL ttl)) = forkSupervised s oneForOne (threadDelay ttl)
+fromAction s (ThrowAfter (TTL ttl)) = forkSupervised s oneForOne (do
threadDelay ttl
throwIO $ AssertionFailed "die")
@@ -187,3 +195,13 @@ testSupCleanup = forAllM (vectorOf 100 arbitrary) $ \ttls -> do
assertActiveThreads sup (== 0)
assertContainsNFinishedMsg (length acts) q
lift $ shutdownSupervisor sup
+
+testTooManyRestarts :: Assertion
+testTooManyRestarts = do
+ supSpec <- newSupervisorSpec
+ sup <- newSupervisor supSpec
+ _ <- forkSupervised sup (OneForOne 0 $ limitRetries 5) $ error "die"
+ threadDelay 2000000
+ q <- qToList (eventStream sup)
+ assertContainsNLimitReached 1 q
+ shutdownSupervisor sup
diff --git a/threads-supervisor.cabal b/threads-supervisor.cabal
index 6fbaacc..6fbe01d 100644
--- a/threads-supervisor.cabal
+++ b/threads-supervisor.cabal
@@ -1,5 +1,5 @@
name: threads-supervisor
-version: 1.0.2.0
+version: 1.0.3.0
synopsis: Simple, IO-based library for Erlang-style thread supervision
description: Simple, IO-based library for Erlang-style thread supervision
license: MIT
@@ -15,6 +15,9 @@ source-repository head
type: git
location: https://github.com/adinapoli/threads-supervisor
+flag prof
+ default: False
+
library
exposed-modules:
Control.Concurrent.Supervisor
@@ -22,13 +25,36 @@ library
build-depends:
base >= 4.6 && < 5,
unordered-containers >= 0.2.0.0 && < 0.3.0.0,
+ retry >= 0.5 && < 0.8,
stm >= 2.4,
time >= 1.2
hs-source-dirs:
src
+ if flag(prof)
+ ghc-options: -fprof-auto -rtsopts
+ default-language: Haskell2010
+ ghc-options:
+ -Wall
+ -funbox-strict-fields
+
+executable threads-supervisor-example
+ build-depends:
+ base >= 4.6 && < 5,
+ threads-supervisor -any,
+ unordered-containers >= 0.2.0.0 && < 0.3.0.0,
+ stm >= 2.4,
+ time >= 1.2
+ hs-source-dirs:
+ examples
+ main-is:
+ Main.hs
+ if flag(prof)
+ ghc-options: -fprof-auto -rtsopts
default-language: Haskell2010
ghc-options:
-Wall
+ -threaded
+ "-with-rtsopts=-N"
-funbox-strict-fields
test-suite threads-supervisor-tests
@@ -49,6 +75,7 @@ test-suite threads-supervisor-tests
threads-supervisor -any
, base
, bytestring
+ , retry
, QuickCheck
, tasty >= 0.9.0.1
, tasty-quickcheck