summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchessai <>2018-12-05 20:33:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-12-05 20:33:00 (GMT)
commitd4e8ee149e171bb7ba1bd24a0e35009d14b778ae (patch)
tree69bf23ba82a124eed3a81140851def139488ae8a
version 0.1.0.0HEAD0.1.0.0master
-rwxr-xr-xCHANGELOG.md5
-rw-r--r--LICENSE30
-rw-r--r--Setup.hs2
-rw-r--r--qsem.cabal25
-rw-r--r--src/QSem.hs134
-rw-r--r--src/QSemN.hs123
6 files changed, 319 insertions, 0 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100755
index 0000000..ba8d61a
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,5 @@
+# Revision history for qsem
+
+## 0.1.0.0 -- YYYY-mm-dd
+
+* First version. Released on an unsuspecting world.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8f3f232
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright (c) 2018, chessai
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * Neither the name of chessai nor the names of other
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Setup.hs b/Setup.hs
new file mode 100644
index 0000000..9a994af
--- /dev/null
+++ b/Setup.hs
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
diff --git a/qsem.cabal b/qsem.cabal
new file mode 100644
index 0000000..a197fab
--- /dev/null
+++ b/qsem.cabal
@@ -0,0 +1,25 @@
+name: qsem
+version: 0.1.0.0
+synopsis: quantity semaphores
+description: simple and general quantity semaphores
+homepage: https://github.com/chessai/qsem
+license: BSD3
+license-file: LICENSE
+author: chessai
+maintainer: chessai1996@gmail.com
+copyright: (c) The University of Glasgow 2001
+ (c) chessai 2018
+category: Concurrency
+build-type: Simple
+extra-source-files: CHANGELOG.md
+cabal-version: >=1.10
+
+library
+ exposed-modules:
+ QSem
+ QSemN
+ build-depends:
+ base >=4.6 && <4.13
+ , ghc-prim
+ hs-source-dirs: src
+ default-language: Haskell2010
diff --git a/src/QSem.hs b/src/QSem.hs
new file mode 100644
index 0000000..707aa27
--- /dev/null
+++ b/src/QSem.hs
@@ -0,0 +1,134 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE GADTSyntax #-}
+{-# LANGUAGE MagicHash #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- | Simple quantity semaphores.
+module QSem
+ ( -- * Simple Quantity Semaphores
+ QSem -- abstract
+ , newQSem -- :: Int -> IO QSem
+ , waitQSem -- :: QSem -> IO ()
+ , signalQSem -- :: QSem -> IO ()
+ ) where
+
+import GHC.Prim (MVar#, RealWorld)
+import Control.Concurrent.MVar
+import GHC.MVar
+import GHC.IO
+import Prelude hiding (reverse)
+
+data MVarList where
+ MNil :: MVarList
+ MCons :: !(MVar# RealWorld ()) -> MVarList -> MVarList
+
+reverse :: MVarList -> MVarList
+reverse l = rev l MNil
+ where
+ rev MNil a = a
+ rev (MCons x xs) a = rev xs (MCons x a)
+
+-- The semaphore state (i, xs, ys):
+--
+-- i is the current resource value
+--
+-- (xs,ys) is the queue of blocked threads, where the queue is
+-- given by xs ++ reverse ys. We can enqueue new blocked threads
+-- by consing onto ys, and dequeue by removing from the head of xs.
+--
+data SemaphoreState = SS
+ { _currentResourceValue :: {-# UNPACK #-} !Int
+ , _queueForward :: !(MVarList)
+ , _queueReverse :: !(MVarList)
+ }
+
+-- | 'QSem' is a quantity semaphore in which the resource is acquired
+-- and released in units of one. It provides guaranteed FIFO ordering
+-- for satisfying blocked `waitQSem` calls.
+--
+-- The pattern
+--
+-- > bracket_ waitQSem signalQSem (...)
+--
+-- is safe; it never loses a unit of the resource.
+newtype QSem = QSem (MVar SemaphoreState)
+
+-- A blocked thread is represented by an empty (MVar ()). To unblock
+-- the thread, we put () into the MVar.
+--
+-- A thread can dequeue itself by also putting () into the MVar, which
+-- it must do if it receives an exception while blocked in waitQSem.
+-- This means that when unblocking a thread in signalQSem we must
+-- first check whether the MVar is already full; the MVar lock on the
+-- semaphore itself resolves race conditions between signalQSem and a
+-- thread attempting to dequeue itself.
+
+-- | Build a new 'QSem' with a supplied initial quantity.
+-- The initial quantity must be at least 0.
+newQSem :: Int -> IO QSem
+newQSem !initial
+ | initial < 0 = fail "newQSem: Initial quantity must be non-negative"
+ | otherwise = do
+ sem <- newMVar $ SS initial MNil MNil
+ return (QSem sem)
+
+-- | Wait for a unit to become available.
+waitQSem :: QSem -> IO ()
+waitQSem (QSem !m) = mask_ $ do
+ (SS i b1 b2) <- takeMVar m
+ if i == 0
+ then do
+ (MVar b) <- newEmptyMVar
+ putMVar m (SS i b1 (MCons b b2))
+ wait b
+ else do
+ let !z = i - 1
+ putMVar m (SS z b1 b2)
+ return ()
+ where
+ wait :: MVar# RealWorld () -> IO ()
+ wait b = takeMVar# b `onException` do
+ uninterruptibleMask_ $ do
+ (SS i b1 b2) <- takeMVar m
+ r <- tryTakeMVar (MVar b)
+ r' <- case r of { Just _ -> signal (SS i b1 b2); Nothing -> do { putMVar (MVar b) (); return (SS i b1 b2) } }
+ putMVar m r'
+
+-- | Signal that a unit of the 'QSem' is available
+signalQSem :: QSem -> IO ()
+signalQSem (QSem !m) = uninterruptibleMask_ $ do
+ r <- takeMVar m
+ r' <- signal r
+ putMVar m r'
+
+-- Note [signal uninterruptible]
+--
+-- If we have
+--
+-- bracket waitQSem signalQSem (...)
+--
+-- and an exception arrives at the signalQSem, then we must not lose
+-- the resource. The signalQSem is masked by bracket, but taking
+-- the MVar might block, and so it would be interruptible. Hence we
+-- need an uninterruptibleMask here.
+--
+-- This isn't ideal: during high contention, some threads won't be
+-- interruptible. The QSemSTM implementation has better behaviour
+-- here, but it performs much worse than this one in some
+-- benchmarks.
+signal :: SemaphoreState -> IO SemaphoreState
+signal (SS i a1 a2) =
+ if i == 0
+ then loop a1 a2
+ else let !z = i + 1 in return (SS z a2 a2)
+ where
+ loop MNil MNil = return (SS 1 MNil MNil)
+ loop MNil b2 = loop (reverse b2) MNil
+ loop (MCons b bs) b2 = do
+ r <- tryPutMVar (MVar b) ()
+ if r then return (SS 0 bs b2)
+ else loop bs b2
+
+takeMVar# :: MVar# RealWorld a -> IO a
+{-# INLINE takeMVar# #-}
+takeMVar# m = takeMVar (MVar m) \ No newline at end of file
diff --git a/src/QSemN.hs b/src/QSemN.hs
new file mode 100644
index 0000000..98fa2d6
--- /dev/null
+++ b/src/QSemN.hs
@@ -0,0 +1,123 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE MagicHash #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE UnboxedTuples #-}
+{-# LANGUAGE GADTSyntax #-}
+
+-- | Quantity semaphores in which each thread may wait for an arbitrary \"amount\".
+module QSemN
+ ( -- * General Quantity Semaphores
+ QSemN -- abstract
+ , newQSemN -- :: Int -> IO QSemN
+ , waitQSemN -- :: QSemN -> Int -> IO ()
+ , signalQSemN -- :: QSemN -> Int -> IO ()
+ ) where
+
+import GHC.Prim (MVar#, RealWorld)
+import Control.Concurrent.MVar
+import GHC.MVar
+import GHC.IO
+import Prelude hiding (reverse)
+
+type QuantityMVar = (# Int, MVar# RealWorld () #)
+
+data MVarList where
+ MNil :: MVarList
+ MCons :: !QuantityMVar -> MVarList -> MVarList
+
+reverse :: MVarList -> MVarList
+reverse l = rev l MNil
+ where
+ rev MNil a = a
+ rev (MCons x xs) a = rev xs (MCons x a)
+
+
+-- The semaphore state (i, xs, ys):
+--
+-- i is the current resource value
+--
+-- (xs,ys) is the queue of blocked threads, where the queue is
+-- given by xs ++ reverse ys. We can enqueue new blocked threads
+-- by consing onto ys, and dequeue by removing from the head of xs.
+--
+data SemaphoreState = SS
+ { _currentResourceValue :: {-# UNPACK #-} !Int
+ , _queueForward :: !(MVarList)
+ , _queueReverse :: !(MVarList)
+ }
+
+-- | 'QSemN' is a quantity semaphore in which the resource is acquired
+-- and released in units of one. It provides guaranteed FIFO ordering
+-- for satisfying blocked `waitQSemN` calls.
+--
+-- The pattern
+--
+-- > bracket_ (waitQSemN n) (signalQSemN n) (...)
+--
+-- is safe; it never loses any of the resource.
+newtype QSemN = QSemN (MVar SemaphoreState)
+
+-- A blocked thread is represented by an empty (MVar ()). To unblock
+-- the thread, we put () into the MVar.
+--
+-- A thread can dequeue itself by also putting () into the MVar, which
+-- it must do if it receives an exception while blocked in waitQSemN.
+-- This means that when unblocking a thread in signalQSemN we must
+-- first check whether the MVar is already full; the MVar lock on the
+-- semaphore itself resolves race conditions between signalQSemN and a
+-- thread attempting to dequeue itself.
+
+-- | Build a new 'QSemN' with a supplied initial quantity.
+-- The initial quantity must be at least 0.
+newQSemN :: Int -> IO QSemN
+newQSemN !initial
+ | initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
+ | otherwise = do
+ sem <- newMVar (SS initial MNil MNil)
+ return (QSemN sem)
+
+-- | Wait for the specified quantity to become available.
+waitQSemN :: QSemN -> Int -> IO ()
+waitQSemN (QSemN !m) !sz = mask_ $ do
+ (SS i b1 b2) <- takeMVar m
+ let !z = i - sz
+ if z < 0
+ then do
+ bl@(MVar b) <- newEmptyMVar
+ putMVar m (SS i b1 (MCons (# sz, b #) b2))
+ wait bl
+ else do
+ putMVar m (SS z b1 b2)
+ return ()
+ where
+ wait :: MVar () -> IO ()
+ wait b = takeMVar b `onException`
+ ( uninterruptibleMask_ $ do
+ ss <- takeMVar m
+ r <- tryTakeMVar b
+ r' <- case r of { Just _ -> signal sz ss; Nothing -> do { putMVar b (); return ss; } }
+ putMVar m r'
+ )
+
+-- | Signal that a given quantity is now available from the 'QSemN'.
+signalQSemN :: QSemN -> Int -> IO ()
+signalQSemN (QSemN !m) !sz = uninterruptibleMask_ $ do
+ r <- takeMVar m
+ r' <- signal sz r
+ putMVar m r'
+
+signal :: Int -> SemaphoreState -> IO SemaphoreState
+signal !sz0 (SS i a1 a2) = loop (sz0 + i) a1 a2
+ where
+ loop 0 bs b2 = return (SS 0 bs b2)
+ loop sz MNil MNil = return (SS sz MNil MNil)
+ loop sz MNil b2 = loop sz (reverse b2) MNil
+ loop sz jbbs@(MCons (# j, b #) bs) b2
+ | j > sz = do
+ r <- isEmptyMVar (MVar b)
+ if r then return (SS sz jbbs b2)
+ else loop sz bs b2
+ | otherwise = do
+ r <- tryPutMVar (MVar b) ()
+ if r then loop (sz - j) bs b2
+ else loop sz bs b2