author    harendra <>                     2018-09-04 11:51:00 (GMT)
committer hdiff <hdiff@hdiff.luite.com>   2018-09-04 11:51:00 (GMT)
commit    1bfd478bd0a77b3db0b0c36d98a62e7a4c8a01e5 (patch)
tree      ecaa94ba3710f171d6a6f089f2fe715fede9e679
parent    8d94fbe7755c526f9858d54e628d4ad4cdcdb2a0 (diff)

version 0.5.0
 Changelog.md                  |   28
 README.md                     |   63
 benchmark/BaseStreams.hs      |   29
 benchmark/Linear.hs           |   41
 benchmark/LinearOps.hs        |  200
 benchmark/StreamDOps.hs       |   35
 benchmark/StreamKOps.hs       |   20
 examples/AcidRain.hs          |   11
 examples/CirclingSquare.hs    |   47
 src/Streamly.hs               |   33
 src/Streamly/Prelude.hs       |  167
 src/Streamly/SVar.hs          | 1573
 src/Streamly/Streams/Ahead.hs |  458
 src/Streamly/Streams/Async.hs |  416
 src/Streamly/Streams/Parallel.hs | 36
 src/Streamly/Streams/SVar.hs  |  189
 src/Streamly/Streams/Serial.hs |  11
 src/Streamly/Streams/StreamD.hs |   8
 src/Streamly/Streams/StreamK.hs | 107
 src/Streamly/Time.hs          |    5
 src/Streamly/Tutorial.hs      |   11
 stack.yaml                    |    2
 streamly.cabal                |   34
 test/Main.hs                  |  224
 test/MaxRate.hs               |  128
 test/Prop.hs                  |  634
26 files changed, 3555 insertions(+), 955 deletions(-)
diff --git a/Changelog.md b/Changelog.md
index b2bdf17..cfda9b7 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,3 +1,29 @@
+## 0.5.0
+
+### Bug Fixes
+
+* Leftover threads are now cleaned up as soon as the consumer is garbage
+ collected.
+* Fix a bug in concurrent function application that in certain cases would
+  unnecessarily share the concurrency state, resulting in an incorrect output
+  stream.
+* Fix passing of state across the `parallel`, `async`, `wAsync`, `ahead`,
+  `serial`, `wSerial` combinators. Without this fix, combinators that rely on
+  state passing, e.g. `maxThreads` and `maxBuffer`, would not work across
+  these combinators.
+
+### Enhancements
+
+* Add rate limiting combinators `rate`, `avgRate`, `minRate`, `maxRate` and
+  `constRate` to control the yield rate of a stream.
+* Add `foldl1'`, `foldr1`, `intersperseM`, `find`, `lookup`, `and`, `or`,
+  `findIndices`, `findIndex`, `elemIndices`, `elemIndex`, `init` to Prelude.
+
+### Deprecations
+
+* The `Streamly.Time` module is now deprecated; its functionality is subsumed
+  by the new rate limiting combinators.
+
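Several of the new Prelude additions mirror their `Data.List` namesakes. A rough, self-contained sketch of the behaviour using plain lists (the stream versions in `Streamly.Prelude` are analogous but run in a monad, so this is an approximation, not the library API):

``` haskell
import Data.List (foldl1', find, findIndex, elemIndices)

-- foldl1' folds a non-empty list strictly from the left.
total :: Int
total = foldl1' (+) [1, 2, 3, 4]          -- 10

-- find returns the first element matching a predicate.
firstEven :: Maybe Int
firstEven = find even [1, 3, 4, 6]        -- Just 4

-- findIndex returns the position of the first match.
idx :: Maybe Int
idx = findIndex (> 3) [1, 3, 4, 6]        -- Just 2

-- elemIndices returns every position at which a value occurs.
positions :: [Int]
positions = elemIndices 2 [2, 1, 2, 3]    -- [0, 2]
```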
## 0.4.1
### Bug Fixes
@@ -20,7 +46,7 @@
* Add concurrency control primitives `maxThreads` and `maxBuffer`.
* Concurrency of a stream with bounded concurrency when used with `take` is now
- limited by the number elements demanded by `take`.
+ limited by the number of elements demanded by `take`.
* Significant performance improvements utilizing stream fusion optimizations.
* Add `yield` to construct a singleton stream from a pure value
* Add `repeat` to generate an infinite stream by repeating a pure value
diff --git a/README.md b/README.md
index 1039faa..a2da056 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,5 @@
# Streamly
-[![Hackage](https://img.shields.io/hackage/v/streamly.svg?style=flat)](https://hackage.haskell.org/package/streamly)
-[![Gitter chat](https://badges.gitter.im/composewell/gitter.svg)](https://gitter.im/composewell/streamly)
-[![Build Status](https://travis-ci.org/composewell/streamly.svg?branch=master)](https://travis-ci.org/composewell/streamly)
-[![Windows Build status](https://ci.appveyor.com/api/projects/status/ajxg0c79raou9ned?svg=true)](https://ci.appveyor.com/project/harendra-kumar/streamly)
-[![Coverage Status](https://coveralls.io/repos/composewell/streamly/badge.svg?branch=master&service=github)](https://coveralls.io/github/composewell/streamly?branch=master)
-
## Stream`ing` `Concurrent`ly
Streamly, short for streaming concurrently, provides monadic streams, with a
@@ -52,6 +46,12 @@ Why use streamly?
[streaming-benchmarks](https://github.com/composewell/streaming-benchmarks)
for a comparison of popular streaming libraries on micro-benchmarks.
+The following chart shows a summary of the cost of key streaming operations
+processing a million elements. The timings for streamly and vector are in the
+600-700 microsecond range and therefore can barely be seen in the graph.
+
+![Streaming Operations at a Glance](charts-0/KeyOperations-time.svg)
+
For more details on streaming library ecosystem and where streamly fits in,
please see
[streaming libraries](https://github.com/composewell/streaming-benchmarks#streaming-libraries).
@@ -78,7 +78,7 @@ The following snippet provides a simple stream composition example that reads
numbers from stdin, prints the squares of even numbers and exits if an even
number greater than 9 is entered.
-```haskell
+``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Data.Function ((&))
@@ -101,7 +101,7 @@ when used with appropriate stream type combinator (e.g. `asyncly`, `aheadly` or
The following code finishes in 3 seconds (6 seconds when serial):
-```
+``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
[3,2,1]
@@ -112,7 +112,7 @@ The following code finishes in 3 seconds (6 seconds when serial):
The following finishes in 10 seconds (100 seconds when serial):
-```
+``` haskell
runStream $ asyncly $ S.replicateM 10 $ p 10
```
@@ -123,7 +123,7 @@ following example prints a "hello" every second; if you use `&` instead of
`|&` you will see that the delay doubles to 2 seconds because of serial
application.
-```
+``` haskell
main = runStream $
S.repeatM (threadDelay 1000000 >> return "hello")
|& S.mapM (\x -> threadDelay 1000000 >> putStrLn x)
@@ -133,7 +133,7 @@ main = runStream $
We can use `mapM` or `sequence` functions concurrently on a stream.
-```
+``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> runStream $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ repeatM (p 1))
```
@@ -170,7 +170,7 @@ delay n = S.yieldM $ do
```
### Serial
-```haskell
+``` haskell
main = runStream $ delay 3 <> delay 2 <> delay 1
```
```
@@ -181,7 +181,7 @@ ThreadId 36: Delay 1
### Parallel
-```haskell
+``` haskell
main = runStream . parallely $ delay 3 <> delay 2 <> delay 1
```
```
@@ -300,6 +300,28 @@ The concurrency facilities provided by streamly can be compared with
[Cilk](https://en.wikipedia.org/wiki/Cilk) but with a more declarative
expression.
+## Rate Limiting
+
+For bounded concurrent streams, a stream yield rate can be specified. For
+example, to print "hello" once every second you can simply write:
+
+``` haskell
+import Streamly
+import Streamly.Prelude as S
+
+main = runStream $ asyncly $ avgRate 1 $ S.repeatM $ putStrLn "hello"
+```
+
+For some practical uses of rate control, see
+[AcidRain.hs](https://github.com/composewell/streamly/tree/master/examples/AcidRain.hs)
+and
+[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs).
+
+Concurrency of the stream is automatically controlled to match the specified
+rate. Rate control works precisely even at throughputs as high as millions of
+yields per second. For more sophisticated rate control, see the Haddock
+documentation.
+
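At its core, rate control amounts to pacing the producers. A minimal, single-threaded sketch of the idea using only `base` (streamly's actual implementation is adaptive and distributes work across worker threads; `pacedReplicate` is a hypothetical helper for illustration, not part of the library):

``` haskell
import Control.Concurrent (threadDelay)
import Control.Monad (replicateM)

-- Run an action n times, sleeping 1/r seconds between yields so the
-- average yield rate is roughly r per second. This only crudely
-- approximates what avgRate does.
pacedReplicate :: Int -> Int -> IO a -> IO [a]
pacedReplicate r n action = replicateM n $ do
    x <- action
    threadDelay (1000000 `div` r)  -- sleep for 1/r seconds
    return x
```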
## Reactive Programming (FRP)
Streamly is a foundation for first class reactive programming as well by virtue
@@ -309,21 +331,6 @@ for a console based FRP game example and
[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs)
for an SDL based animation example.
-## Performance
-
-`Streamly` has best in class performance even though it generalizes streaming
-to concurrent composition that does not mean it sacrifices non-concurrent
-performance. See
-[streaming-benchmarks](https://github.com/composewell/streaming-benchmarks) for
-detailed performance comparison with regular streaming libraries and the
-explanation of the benchmarks. The following graphs show a summary, the first
-one measures how four pipeline stages in a series perform, the second one
-measures the performance of individual stream operations; in both cases the
-stream processes a million elements:
-
-![Composing Pipeline Stages](charts/comparative/ComposingPipelineStages.svg)
-![All Operations at a Glance](charts/comparative/AllOperationsataGlance.svg)
-
## Contributing
The code is available under BSD-3 license
diff --git a/benchmark/BaseStreams.hs b/benchmark/BaseStreams.hs
index 6a17935..3fce088 100644
--- a/benchmark/BaseStreams.hs
+++ b/benchmark/BaseStreams.hs
@@ -5,6 +5,8 @@
-- License : BSD3
-- Maintainer : harendra.kumar@gmail.com
+{-# LANGUAGE CPP #-}
+
import Control.DeepSeq (NFData)
-- import Data.Functor.Identity (Identity, runIdentity)
import System.Random (randomRIO)
@@ -46,8 +48,8 @@ main = do
, benchIO "nullHeadTail" D.nullHeadTail D.sourceUnfoldrM
]
, bgroup "transformation"
- [ -- benchIO "scan" D.scan D.sourceUnfoldrM
- benchIO "map" D.map D.sourceUnfoldrM
+ [ benchIO "scanlM'" D.scan D.sourceUnfoldrM
+ , benchIO "map" D.map D.sourceUnfoldrM
, benchIO "mapM" D.mapM D.sourceUnfoldrM
]
, bgroup "filtering"
@@ -55,6 +57,25 @@ main = do
, benchIO "filter-all-out" D.filterAllOut D.sourceUnfoldrM
, benchIO "filter-all-in" D.filterAllIn D.sourceUnfoldrM
, benchIO "take-all" D.takeAll D.sourceUnfoldrM
+ , benchIO "takeWhile-true" D.takeWhileTrue D.sourceUnfoldrM
+ , benchIO "drop-all" D.dropAll D.sourceUnfoldrM
+ , benchIO "dropWhile-true" D.dropWhileTrue D.sourceUnfoldrM
+ ]
+ , benchIO "zip" D.zip D.sourceUnfoldrM
+ , bgroup "compose"
+ [ benchIO "mapM" D.composeMapM D.sourceUnfoldrM
+#if __GLASGOW_HASKELL__ != 802
+ , benchIO "map-with-all-in-filter" D.composeMapAllInFilter D.sourceUnfoldrM
+ , benchIO "all-in-filters" D.composeAllInFilters D.sourceUnfoldrM
+ , benchIO "all-out-filters" D.composeAllOutFilters D.sourceUnfoldrM
+#endif
+ ]
+ -- Scaling with same operation in sequence
+ , bgroup "compose-scaling"
+ [ benchIO "1" (D.composeScaling 1) D.sourceUnfoldrM
+ , benchIO "2" (D.composeScaling 2) D.sourceUnfoldrM
+ , benchIO "3" (D.composeScaling 3) D.sourceUnfoldrM
+ , benchIO "4" (D.composeScaling 4) D.sourceUnfoldrM
]
]
, bgroup "streamK"
@@ -73,6 +94,8 @@ main = do
, bgroup "elimination"
[ benchIO "toNull" K.toNull K.sourceUnfoldrM
, benchIO "uncons" K.uncons K.sourceUnfoldrM
+ , benchFold "init" K.init K.sourceUnfoldrM
+ , benchFold "tail" K.tail K.sourceUnfoldrM
, benchIO "nullHeadTail" K.nullHeadTail K.sourceUnfoldrM
, benchFold "toList" K.toList K.sourceUnfoldrM
, benchFold "fold" K.foldl K.sourceUnfoldrM
@@ -82,7 +105,7 @@ main = do
[ benchIO "scan" K.scan K.sourceUnfoldrM
, benchIO "map" K.map K.sourceUnfoldrM
, benchIO "mapM" K.mapM K.sourceUnfoldrM
- , benchIO "concat" K.concat K.sourceUnfoldrM
+ -- , benchIO "concat" K.concat K.sourceUnfoldrM
]
, bgroup "filtering"
[ benchIO "filter-even" K.filterEven K.sourceUnfoldrM
diff --git a/benchmark/Linear.hs b/benchmark/Linear.hs
index 0aace62..22e785d 100644
--- a/benchmark/Linear.hs
+++ b/benchmark/Linear.hs
@@ -56,12 +56,16 @@ main = do
, bgroup "elimination"
[ benchIO "toNull" $ Ops.toNull serially
, benchIO "uncons" Ops.uncons
+ , benchIO "init" Ops.init
+ , benchIO "tail" Ops.tail
, benchIO "nullHeadTail" Ops.nullHeadTail
, benchIO "mapM_" Ops.mapM_
, benchIO "toList" Ops.toList
, benchIO "foldr" Ops.foldr
+ , benchIO "foldr1" Ops.foldr1
, benchIO "foldrM" Ops.foldrM
- , benchIO "foldl'" Ops.foldl
+ , benchIO "foldl'" Ops.foldl'
+ , benchIO "foldl1'" Ops.foldl1'
, benchIO "last" Ops.last
, benchIO "length" Ops.length
@@ -69,6 +73,11 @@ main = do
, benchIO "notElem" Ops.notElem
, benchIO "all" Ops.all
, benchIO "any" Ops.any
+ , benchIO "and" Ops.and
+ , benchIO "or" Ops.or
+ , benchIO "find" Ops.find
+ , benchIO "findIndex" Ops.findIndex
+ , benchIO "elemIndex" Ops.elemIndex
, benchIO "maximum" Ops.maximum
, benchIO "minimum" Ops.minimum
, benchIO "sum" Ops.sum
@@ -83,6 +92,8 @@ main = do
, benchIO "mapMaybeM" Ops.mapMaybeM
, bench "sequence" $ nfIO $ randomRIO (1,1000) >>= \n ->
(Ops.sequence serially) (Ops.sourceUnfoldrMAction n)
+ , benchIO "findIndices" Ops.findIndices
+ , benchIO "elemIndices" Ops.elemIndices
, benchIO "concat" Ops.concat
]
, bgroup "filtering"
@@ -120,6 +131,27 @@ main = do
-- , benchSrcIO asyncly "foldMapWith" Ops.sourceFoldMapWith
, benchSrcIO asyncly "foldMapWithM" Ops.sourceFoldMapWithM
, benchIO "mapM" $ Ops.mapM asyncly
+ , benchSrcIO asyncly "unfoldrM maxThreads 1"
+ (maxThreads 1 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM maxBuffer 1 (1000 ops)"
+ (maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
+ ]
+ , bgroup "asyncly/rate"
+ [ -- benchIO "unfoldr" $ Ops.toNull asyncly
+ benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO asyncly "unfoldrM/Nothing"
+ (rate Nothing . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/AvgRate/1,000,000"
+ (avgRate 1000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/AvgRate/3,000,000"
+ (avgRate 3000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/AvgRate/10,000,000/maxThreads1"
+ (maxThreads 1 . avgRate 10000000 . Ops.sourceUnfoldrM)
+ -- XXX arbitrarily large rate should be the same as rate Nothing
+ , benchSrcIO asyncly "unfoldrM/AvgRate/10,000,000"
+ (avgRate 10000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/AvgRate/20,000,000"
+ (avgRate 20000000 . Ops.sourceUnfoldrM)
]
, bgroup "wAsyncly"
[ -- benchIO "unfoldr" $ Ops.toNull wAsyncly
@@ -135,6 +167,13 @@ main = do
, bgroup "aheadly"
[ -- benchIO "unfoldr" $ Ops.toNull aheadly
benchSrcIO aheadly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO aheadly "unfoldrM maxThreads 1"
+ (maxThreads 1 . Ops.sourceUnfoldrM)
+ -- XXX arbitrarily large maxRate should be the same as maxRate -1
+ , benchSrcIO aheadly "unfoldrM rate AvgRate 1000000"
+ (avgRate 1000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO aheadly "unfoldrM maxBuffer 1 (1000 ops)"
+ (maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
-- , benchSrcIO aheadly "fromFoldable" Ops.sourceFromFoldable
, benchSrcIO aheadly "fromFoldableM" Ops.sourceFromFoldableM
-- , benchSrcIO aheadly "foldMapWith" Ops.sourceFoldMapWith
diff --git a/benchmark/LinearOps.hs b/benchmark/LinearOps.hs
index 2701dad..98bf624 100644
--- a/benchmark/LinearOps.hs
+++ b/benchmark/LinearOps.hs
@@ -11,7 +11,7 @@ module LinearOps where
import Data.Maybe (fromJust)
import Prelude
- (Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=),
+ (Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=), (==),
subtract, undefined, Maybe(..), odd, Bool, not)
import qualified Streamly as S
@@ -25,76 +25,6 @@ maxValue = value + 1000
-- Benchmark ops
-------------------------------------------------------------------------------
-{-# INLINE uncons #-}
-{-# INLINE nullHeadTail #-}
-{-# INLINE scan #-}
-{-# INLINE mapM_ #-}
-{-# INLINE map #-}
-{-# INLINE fmap #-}
-{-# INLINE mapMaybe #-}
-{-# INLINE filterEven #-}
-{-# INLINE filterAllOut #-}
-{-# INLINE filterAllIn #-}
-{-# INLINE takeOne #-}
-{-# INLINE takeAll #-}
-{-# INLINE takeWhileTrue #-}
-{-# INLINE takeWhileMTrue #-}
-{-# INLINE dropAll #-}
-{-# INLINE dropWhileTrue #-}
-{-# INLINE dropWhileMTrue #-}
-{-# INLINE zip #-}
-{-# INLINE zipM #-}
-{-# INLINE concat #-}
-{-# INLINE composeAllInFilters #-}
-{-# INLINE composeAllOutFilters #-}
-{-# INLINE composeMapAllInFilter #-}
-uncons, nullHeadTail, scan, mapM_, map, fmap, mapMaybe, filterEven, filterAllOut,
- filterAllIn, takeOne, takeAll, takeWhileTrue, takeWhileMTrue, dropAll,
- dropWhileTrue, dropWhileMTrue, zip, zipM,
- concat, composeAllInFilters, composeAllOutFilters,
- composeMapAllInFilter
- :: Monad m
- => Stream m Int -> m ()
-
-{-# INLINE composeMapM #-}
-{-# INLINE zipAsync #-}
-{-# INLINE zipAsyncM #-}
-{-# INLINE mapMaybeM #-}
-composeMapM, zipAsync, zipAsyncM, mapMaybeM :: S.MonadAsync m => Stream m Int -> m ()
-
-{-# INLINE toList #-}
-{-# INLINE foldr #-}
-{-# INLINE foldrM #-}
-toList, foldr, foldrM :: Monad m => Stream m Int -> m [Int]
-
-{-# INLINE last #-}
-{-# INLINE maximum #-}
-{-# INLINE minimum #-}
-last, minimum, maximum :: Monad m => Stream m Int -> m (Maybe Int)
-
-{-# INLINE foldl #-}
-{-# INLINE length #-}
-{-# INLINE sum #-}
-{-# INLINE product #-}
-foldl, length, sum, product :: Monad m => Stream m Int -> m Int
-
-{-# INLINE all #-}
-{-# INLINE any #-}
-{-# INLINE elem #-}
-{-# INLINE notElem #-}
-elem, notElem, all, any :: Monad m => Stream m Int -> m Bool
-
-{-# INLINE toNull #-}
-toNull :: Monad m => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
-
-{-# INLINE mapM #-}
-mapM :: (S.IsStream t, S.MonadAsync m)
- => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
-
-{-# INLINE sequence #-}
-sequence :: (S.IsStream t, S.MonadAsync m)
- => (t m Int -> S.SerialT m Int) -> t m (m Int) -> m ()
-
-------------------------------------------------------------------------------
-- Stream generation and elimination
-------------------------------------------------------------------------------
@@ -150,6 +80,15 @@ sourceUnfoldrM n = S.unfoldrM step n
then return Nothing
else return (Just (cnt, cnt + 1))
+{-# INLINE sourceUnfoldrMN #-}
+sourceUnfoldrMN :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int
+sourceUnfoldrMN m n = S.unfoldrM step n
+ where
+ step cnt =
+ if cnt > n + m
+ then return Nothing
+ else return (Just (cnt, cnt + 1))
+
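The `sourceUnfoldrMN` source added above follows the classic unfold pattern. For reference, the same counting step expressed with the pure `unfoldr` from `Data.List` (a list sketch, not the monadic stream version used by the benchmark):

``` haskell
import Data.List (unfoldr)

-- Generate the integers n .. n + m and then stop, mirroring the
-- step function of sourceUnfoldrMN.
sourceUnfoldrList :: Int -> Int -> [Int]
sourceUnfoldrList m n = unfoldr step n
  where
    step cnt
        | cnt > n + m = Nothing
        | otherwise   = Just (cnt, cnt + 1)
```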
{-# INLINE sourceUnfoldrMAction #-}
sourceUnfoldrMAction :: (S.IsStream t, S.MonadAsync m) => Int -> t m (m Int)
sourceUnfoldrMAction n = S.serially $ S.unfoldrM step n
@@ -167,12 +106,65 @@ sourceUnfoldrMAction n = S.serially $ S.unfoldrM step n
runStream :: Monad m => Stream m a -> m ()
runStream = S.runStream
+{-# INLINE toList #-}
+{-# INLINE foldr #-}
+{-# INLINE foldrM #-}
+toList, foldr, foldrM :: Monad m => Stream m Int -> m [Int]
+
+{-# INLINE last #-}
+{-# INLINE maximum #-}
+{-# INLINE minimum #-}
+{-# INLINE find #-}
+{-# INLINE findIndex #-}
+{-# INLINE elemIndex #-}
+{-# INLINE foldl1' #-}
+{-# INLINE foldr1 #-}
+last, minimum, maximum, find, findIndex, elemIndex, foldl1', foldr1 :: Monad m => Stream m Int -> m (Maybe Int)
+
+{-# INLINE foldl' #-}
+{-# INLINE length #-}
+{-# INLINE sum #-}
+{-# INLINE product #-}
+foldl', length, sum, product :: Monad m => Stream m Int -> m Int
+
+{-# INLINE all #-}
+{-# INLINE any #-}
+{-# INLINE and #-}
+{-# INLINE or #-}
+{-# INLINE elem #-}
+{-# INLINE notElem #-}
+elem, notElem, all, any, and, or :: Monad m => Stream m Int -> m Bool
+
+{-# INLINE toNull #-}
+toNull :: Monad m => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
toNull t = runStream . t
+
+{-# INLINE uncons #-}
+uncons :: Monad m => Stream m Int -> m ()
uncons s = do
r <- S.uncons s
case r of
Nothing -> return ()
Just (_, t) -> uncons t
+
+{-# INLINE init #-}
+init :: Monad m => Stream m a -> m ()
+init s = do
+ r <- S.init s
+ case r of
+ Nothing -> return ()
+ Just x -> S.runStream x
+
+{-# INLINE tail #-}
+tail :: Monad m => Stream m a -> m ()
+tail s = do
+ r <- S.tail s
+ case r of
+ Nothing -> return ()
+ Just x -> tail x
+
+{-# INLINE nullHeadTail #-}
+nullHeadTail :: Monad m => Stream m Int -> m ()
nullHeadTail s = do
r <- S.null s
if not r
@@ -183,17 +175,25 @@ nullHeadTail s = do
Nothing -> return ()
Just x -> nullHeadTail x
else return ()
+
mapM_ = S.mapM_ (\_ -> return ())
toList = S.toList
foldr = S.foldr (:) []
+foldr1 = S.foldr1 (+)
foldrM = S.foldrM (\a xs -> return (a : xs)) []
-foldl = S.foldl' (+) 0
+foldl' = S.foldl' (+) 0
+foldl1' = S.foldl1' (+)
last = S.last
elem = S.elem maxValue
notElem = S.notElem maxValue
length = S.length
all = S.all (<= maxValue)
any = S.any (> maxValue)
+and = S.and . S.map (<= maxValue)
+or = S.or . S.map (> maxValue)
+find = S.find (== maxValue)
+findIndex = S.findIndex (== maxValue)
+elemIndex = S.elemIndex maxValue
maximum = S.maximum
minimum = S.minimum
sum = S.sum
@@ -207,6 +207,41 @@ product = S.product
transform :: Monad m => Stream m a -> m ()
transform = runStream
+{-# INLINE scan #-}
+{-# INLINE mapM_ #-}
+{-# INLINE map #-}
+{-# INLINE fmap #-}
+{-# INLINE mapMaybe #-}
+{-# INLINE filterEven #-}
+{-# INLINE filterAllOut #-}
+{-# INLINE filterAllIn #-}
+{-# INLINE takeOne #-}
+{-# INLINE takeAll #-}
+{-# INLINE takeWhileTrue #-}
+{-# INLINE takeWhileMTrue #-}
+{-# INLINE dropAll #-}
+{-# INLINE dropWhileTrue #-}
+{-# INLINE dropWhileMTrue #-}
+{-# INLINE findIndices #-}
+{-# INLINE elemIndices #-}
+scan, mapM_, map, fmap, mapMaybe, filterEven, filterAllOut,
+ filterAllIn, takeOne, takeAll, takeWhileTrue, takeWhileMTrue, dropAll,
+ dropWhileTrue, dropWhileMTrue,
+ findIndices, elemIndices
+ :: Monad m
+ => Stream m Int -> m ()
+
+{-# INLINE mapMaybeM #-}
+mapMaybeM :: S.MonadAsync m => Stream m Int -> m ()
+
+{-# INLINE mapM #-}
+mapM :: (S.IsStream t, S.MonadAsync m)
+ => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
+
+{-# INLINE sequence #-}
+sequence :: (S.IsStream t, S.MonadAsync m)
+ => (t m Int -> S.SerialT m Int) -> t m (m Int) -> m ()
+
scan = transform . S.scanl' (+) 0
fmap = transform . Prelude.fmap (+1)
map = transform . S.map (+1)
@@ -226,11 +261,22 @@ takeWhileMTrue = transform . S.takeWhileM (return . (<= maxValue))
dropAll = transform . S.drop maxValue
dropWhileTrue = transform . S.dropWhile (<= maxValue)
dropWhileMTrue = transform . S.dropWhileM (return . (<= maxValue))
+findIndices = transform . S.findIndices (== maxValue)
+elemIndices = transform . S.elemIndices maxValue
-------------------------------------------------------------------------------
-- Zipping and concat
-------------------------------------------------------------------------------
+{-# INLINE zip #-}
+{-# INLINE zipM #-}
+{-# INLINE concat #-}
+zip, zipM, concat :: Monad m => Stream m Int -> m ()
+
+{-# INLINE zipAsync #-}
+{-# INLINE zipAsyncM #-}
+zipAsync, zipAsyncM :: S.MonadAsync m => Stream m Int -> m ()
+
zip src = do
r <- S.tail src
let src1 = fromJust r
@@ -257,6 +303,16 @@ concat _n = return ()
compose :: Monad m => (Stream m Int -> Stream m Int) -> Stream m Int -> m ()
compose f = transform . f . f . f . f
+{-# INLINE composeMapM #-}
+{-# INLINE composeAllInFilters #-}
+{-# INLINE composeAllOutFilters #-}
+{-# INLINE composeMapAllInFilter #-}
+composeAllInFilters, composeAllOutFilters,
+ composeMapAllInFilter
+ :: Monad m
+ => Stream m Int -> m ()
+composeMapM :: S.MonadAsync m => Stream m Int -> m ()
+
composeMapM = compose (S.mapM return)
composeAllInFilters = compose (S.filter (<= maxValue))
composeAllOutFilters = compose (S.filter (> maxValue))
diff --git a/benchmark/StreamDOps.hs b/benchmark/StreamDOps.hs
index 4d16038..0486931 100644
--- a/benchmark/StreamDOps.hs
+++ b/benchmark/StreamDOps.hs
@@ -9,17 +9,14 @@
module StreamDOps where
--- import Prelude
- -- (Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=),
- -- subtract, undefined, Maybe(..))
import Prelude
- (Monad, Int, (+), (.), return, (>), even, (<=),
- Maybe(..), not)
+ (Monad, Int, (+), ($), (.), return, (>), even, (<=),
+ subtract, undefined, Maybe(..), not)
import qualified Streamly.Streams.StreamD as S
value, maxValue :: Int
-value = 1000000
+value = 100000
maxValue = value + 1000
-------------------------------------------------------------------------------
@@ -28,34 +25,32 @@ maxValue = value + 1000
{-# INLINE uncons #-}
{-# INLINE nullHeadTail #-}
--- {-# INLINE scan #-}
+{-# INLINE scan #-}
{-# INLINE map #-}
{-# INLINE filterEven #-}
{-# INLINE filterAllOut #-}
{-# INLINE filterAllIn #-}
{-# INLINE takeOne #-}
{-# INLINE takeAll #-}
-{-
{-# INLINE takeWhileTrue #-}
{-# INLINE dropAll #-}
{-# INLINE dropWhileTrue #-}
{-# INLINE zip #-}
+{-
{-# INLINE concat #-}
+-}
{-# INLINE composeAllInFilters #-}
{-# INLINE composeAllOutFilters #-}
{-# INLINE composeMapAllInFilter #-}
--}
-uncons, nullHeadTail, map, filterEven, filterAllOut,
- filterAllIn, takeOne, takeAll -- takeWhileTrue, dropAll, dropWhileTrue, zip,
- -- concat, composeAllInFilters, composeAllOutFilters,
- -- composeMapAllInFilter
+uncons, nullHeadTail, map, scan, filterEven, filterAllOut,
+ filterAllIn, takeOne, takeAll, takeWhileTrue, dropAll, dropWhileTrue, zip,
+ -- concat,
+ composeAllInFilters, composeAllOutFilters, composeMapAllInFilter
:: Monad m
=> Stream m Int -> m ()
-{-
{-# INLINE composeMapM #-}
-composeMapM :: S.MonadAsync m => Stream m Int -> m ()
--}
+composeMapM :: Monad m => Stream m Int -> m ()
{-# INLINE toList #-}
toList :: Monad m => Stream m Int -> m [Int]
@@ -140,7 +135,7 @@ last = S.last
transform :: Monad m => Stream m a -> m ()
transform = runStream
--- scan = transform . S.scanl' (+) 0
+scan = transform . S.scanlM' (\a b -> return (a + b)) 0
map = transform . S.map (+1)
mapM = transform . S.mapM return
filterEven = transform . S.filter even
@@ -148,7 +143,6 @@ filterAllOut = transform . S.filter (> maxValue)
filterAllIn = transform . S.filter (<= maxValue)
takeOne = transform . S.take 1
takeAll = transform . S.take maxValue
-{-
takeWhileTrue = transform . S.takeWhile (<= maxValue)
dropAll = transform . S.drop maxValue
dropWhileTrue = transform . S.dropWhile (<= maxValue)
@@ -158,7 +152,7 @@ dropWhileTrue = transform . S.dropWhile (<= maxValue)
-------------------------------------------------------------------------------
zip src = transform $ (S.zipWith (,) src src)
-concat _n = return ()
+-- concat _n = return ()
-------------------------------------------------------------------------------
-- Composition
@@ -171,7 +165,7 @@ compose f = transform . f . f . f . f
composeMapM = compose (S.mapM return)
composeAllInFilters = compose (S.filter (<= maxValue))
composeAllOutFilters = compose (S.filter (> maxValue))
-composeMapAllInFilter = compose (S.filter (<= maxValue) . fmap (subtract 1))
+composeMapAllInFilter = compose (S.filter (<= maxValue) . S.map (subtract 1))
{-# INLINABLE composeScaling #-}
composeScaling :: Monad m => Int -> Stream m Int -> m ()
@@ -183,4 +177,3 @@ composeScaling m =
4 -> transform . f . f . f . f
_ -> undefined
where f = S.filter (<= maxValue)
- -}
diff --git a/benchmark/StreamKOps.hs b/benchmark/StreamKOps.hs
index 3d0fb3e..ddb1b5a 100644
--- a/benchmark/StreamKOps.hs
+++ b/benchmark/StreamKOps.hs
@@ -18,7 +18,7 @@ import qualified Streamly.Streams.Prelude as S
import qualified Streamly.SVar as S
value, maxValue :: Int
-value = 1000000
+value = 100000
maxValue = value + 1000
-------------------------------------------------------------------------------
@@ -130,6 +130,24 @@ uncons s = do
Nothing -> return ()
Just (_, t) -> uncons t
+{-# INLINE init #-}
+init :: (Monad m, S.IsStream t) => t m a -> m ()
+init s = do
+ r <- S.init s
+ case r of
+ Nothing -> return ()
+ Just x -> S.runStream x
+
+{-# INLINE tail #-}
+tail :: (Monad m, S.IsStream t) => t m a -> m ()
+tail s = do
+ r <- S.tail s
+ case r of
+ Nothing -> return ()
+ Just x -> tail x
+
+-- | If the stream is not null, get its head and tail, and then do the same
+-- to the tail.
nullHeadTail s = do
r <- S.null s
if not r
diff --git a/examples/AcidRain.hs b/examples/AcidRain.hs
index 7ad9372..686bdd3 100644
--- a/examples/AcidRain.hs
+++ b/examples/AcidRain.hs
@@ -4,16 +4,15 @@
-- https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html
import Streamly
-import Control.Concurrent (threadDelay)
+import Streamly.Prelude as S
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.State (MonadState, get, modify, runStateT)
-import Data.Semigroup (cycle1)
data Event = Harm Int | Heal Int | Quit deriving (Show)
-userAction :: MonadIO m => SerialT m Event
-userAction = cycle1 $ liftIO askUser
+userAction :: MonadAsync m => SerialT m Event
+userAction = S.repeatM $ liftIO askUser
where
askUser = do
command <- getLine
@@ -22,8 +21,8 @@ userAction = cycle1 $ liftIO askUser
"quit" -> return Quit
_ -> putStrLn "What?" >> askUser
-acidRain :: MonadIO m => SerialT m Event
-acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1)
+acidRain :: MonadAsync m => SerialT m Event
+acidRain = asyncly $ constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1
game :: (MonadAsync m, MonadState Int m) => SerialT m ()
game = do
diff --git a/examples/CirclingSquare.hs b/examples/CirclingSquare.hs
index f2814f6..eb9add9 100644
--- a/examples/CirclingSquare.hs
+++ b/examples/CirclingSquare.hs
@@ -9,8 +9,7 @@
import Data.IORef
import Graphics.UI.SDL as SDL
import Streamly
-import Streamly.Prelude (yieldM)
-import Streamly.Time
+import Streamly.Prelude as S
------------------------------------------------------------------------------
-- SDL Graphics Init
@@ -40,7 +39,7 @@ display (playerX, playerY) = do
-- Paint small red square, at an angle 'angle' with respect to the center
foreC <- mapRGB format 212 108 73
- let side = 10
+ let side = 20
x = round playerX
y = round playerY
_ <- fillRect screen (Just (Rect x y side side)) foreC
@@ -52,40 +51,34 @@ display (playerX, playerY) = do
-- Wait and update Controller Position if it changes
------------------------------------------------------------------------------
-refreshRate :: Int
-refreshRate = 40
-
updateController :: IORef (Double, Double) -> IO ()
-updateController ref = periodic refreshRate $ do
- e <- pollEvent
- case e of
- MouseMotion x y _ _ -> do
- writeIORef ref (fromIntegral x, fromIntegral y)
- _ -> return ()
+updateController ref = do
+ e <- pollEvent
+ case e of
+ MouseMotion x y _ _ -> do
+ writeIORef ref (fromIntegral x, fromIntegral y)
+ _ -> return ()
------------------------------------------------------------------------------
-- Periodically refresh the output display
------------------------------------------------------------------------------
updateDisplay :: IORef (Double, Double) -> IO ()
-updateDisplay cref = withClock clock refreshRate displaySquare
+updateDisplay cref = do
+ time <- SDL.getTicks
+ (x, y) <- readIORef cref
+ let t = (fromIntegral time) * speed / 1000
+ in display (x + cos t * radius, y + sin t * radius)
where
- clock = do
- t <- SDL.getTicks
- return ((fromIntegral t) * 1000)
-
- speed = 8
- radius = 30
- displaySquare time = do
- (x, y) <- readIORef cref
- let t = (fromIntegral time) * speed / 1000000
- in display (x + cos t * radius, y + sin t * radius)
+ speed = 6
+ radius = 60
main :: IO ()
main = do
- sdlInit
- cref <- newIORef (0,0)
- runStream $ yieldM (updateController cref)
- `parallel` yieldM (updateDisplay cref)
+ sdlInit
+ cref <- newIORef (0,0)
+ runStream $ asyncly $ constRate 40
+ $ S.repeatM (updateController cref)
+ `parallel` S.repeatM (updateDisplay cref)
diff --git a/src/Streamly.hs b/src/Streamly.hs
index 37795cb..d3c8367 100644
--- a/src/Streamly.hs
+++ b/src/Streamly.hs
@@ -113,6 +113,14 @@ module Streamly
, maxThreads
, maxBuffer
+ -- * Rate Limiting
+ , Rate (..)
+ , rate
+ , avgRate
+ , minRate
+ , maxRate
+ , constRate
+
-- * Folding Containers of Streams
-- $foldutils
, foldWith
@@ -172,8 +180,8 @@ import Streamly.Streams.Ahead
import Streamly.Streams.Parallel
import Streamly.Streams.Zip
import Streamly.Streams.Prelude
-import Streamly.Streams.SVar (maxThreads, maxBuffer)
-import Streamly.SVar (MonadAsync)
+import Streamly.Streams.SVar
+import Streamly.SVar (MonadAsync, Rate (..))
import Data.Semigroup (Semigroup(..))
import qualified Streamly.Streams.StreamD as D
@@ -296,16 +304,21 @@ runZipAsync = runStream . K.adapt
-- which can be used to combine two streams in a predetermined way irrespective
-- of the type.
+-- XXX An alternative design choice would be to let a control parameter affect
+-- the nearest SVar only and then be cleared. The benefit of the current
+-- choice is that it behaves like global configuration, much like state, and
+-- so should be easy to comprehend. But it has the downside of leaking into
+-- undesired scopes, that is, we can forget to reset it.
+--
-- $concurrency
--
--- These combinators can be used at any point in a stream composition to
--- control the concurrency of the enclosed stream. When the combinators are
--- used in a nested manner, the nearest enclosing combinator overrides the
--- outer ones. These combinators have no effect on 'Parallel' streams,
--- concurrency for 'Parallel' streams is always unbounded.
--- Note that the use of these combinators does not enable concurrency, to
--- enable concurrency you have to use one of the concurrent stream type
--- combinators.
+-- These combinators can be used at any point in a stream composition to set
+-- parameters to control the concurrency of the enclosed stream. A parameter
+-- set at any point remains effective for any concurrent combinators used
+-- downstream until it is reset. These control parameters have no effect on
+-- non-concurrent combinators in the stream, or on non-concurrent streams. They
+-- also do not affect 'Parallel' streams, as concurrency for 'Parallel' streams
+-- is always unbounded.
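The scoping rule documented above can be sketched with combinators this patch exports from `Streamly` (`maxThreads`, `asyncly`, `runStream`); the worker action below is hypothetical:

```haskell
import Control.Concurrent (myThreadId)
import Streamly
import qualified Streamly.Prelude as S

-- A sketch, assuming the streamly 0.5.0 API: maxThreads applies to every
-- concurrent combinator downstream of it until set again; it would have no
-- effect on a parallely stream, whose concurrency is always unbounded.
main :: IO ()
main = runStream
     $ maxThreads 4                      -- caps concurrency for the enclosed stream
     $ asyncly
     $ S.take 10
     $ S.repeatM (myThreadId >>= print)  -- hypothetical worker action
```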
-- $adapters
--
diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs
index 01abb44..a245efa 100644
--- a/src/Streamly/Prelude.hs
+++ b/src/Streamly/Prelude.hs
@@ -82,22 +82,37 @@ module Streamly.Prelude
-- * Elimination
-- ** General Folds
, foldr
+ , foldr1
, foldrM
, foldl'
+ , foldl1'
, foldlM'
, foldx
, foldxM
-- ** Specialized Folds
- , null
+
+ -- Filtering folds: extract parts of the stream
, head
, tail
, last
+ , init
+
+ -- Conditional folds: may terminate early based on a condition
+ , null
, elem
+ , elemIndex
, notElem
- , length
+ , lookup
+ , find
+ , findIndex
, all
, any
+ , and
+ , or
+
+ -- Full folds - need to go through all elements
+ , length
, maximum
, minimum
, sum
@@ -112,7 +127,14 @@ module Streamly.Prelude
, toHandle
-- * Transformation
- -- ** By folding (scans)
+ -- ** Mapping
+ , Serial.map
+ , mapM
+ , sequence
+
+ -- ** Scanning
+ -- | Scan is a transformation by continuously folding the result with the
+ -- next element of the stream.
, scanl'
, scanlM'
, scanx
@@ -127,18 +149,20 @@ module Streamly.Prelude
, dropWhile
, dropWhileM
- -- ** Mapping
- , Serial.map
- , mapM
- , sequence
+ -- ** Inserting
+ , intersperseM
+
+ -- ** Reordering
+ , reverse
+
+ -- ** Indices
+ , findIndices
+ , elemIndices
-- ** Map and Filter
, mapMaybe
, mapMaybeM
- -- ** Reordering
- , reverse
-
-- * Zipping
, zipWith
, zipWithM
@@ -160,7 +184,7 @@ import Prelude
hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem,
notElem, maximum, minimum, head, last, tail, length, null,
- reverse, iterate)
+ reverse, iterate, init, and, or, lookup, foldr1)
import qualified Prelude
import qualified System.IO as IO
@@ -286,10 +310,29 @@ unfoldrMSerial step seed = fromStreamS (S.unfoldrM step seed)
-- Specialized Generation
------------------------------------------------------------------------------
+-- Faster than yieldM because there is no bind. Usually we can construct a
+-- stream from a pure value using "pure" of an Applicative; however, in the
+-- case of Zip streams "pure" creates an infinite stream.
+--
+-- | Create a singleton stream from a pure value. In monadic streams, 'pure' or
+-- 'return' can be used in place of 'yield'; however, in Zip applicative
+-- streams 'pure' is equivalent to 'repeat'.
+--
+-- @since 0.4.0
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield a = K.yield a
+-- | Create a singleton stream from a monadic action. Same as @m \`consM` nil@
+-- but more efficient.
+--
+-- @
+-- > toList $ yieldM getLine
+-- hello
+-- ["hello"]
+-- @
+--
+-- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM m = K.yieldM m
@@ -450,6 +493,14 @@ foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
-- foldr step acc m = S.foldr step acc $ S.fromStreamK (toStream m)
foldr f = foldrM (\a b -> return (f a b))
+-- | Right fold, for non-empty streams, using the first element as the
+-- starting value. Returns 'Nothing' if the stream is empty.
+--
+-- @since 0.5.0
+{-# INLINE foldr1 #-}
+foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
+foldr1 = K.foldr1
+
-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
@@ -473,6 +524,19 @@ foldl = foldx
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
foldl' step begin m = S.foldl' step begin $ toStreamS m
+-- | Strict left fold, for non-empty streams, using the first element as the
+-- starting value. Returns 'Nothing' if the stream is empty.
+--
+-- @since 0.5.0
+foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
+foldl1' step m = do
+ r <- uncons m
+ case r of
+ Nothing -> return Nothing
+ Just (h, t) -> do
+ res <- foldl' step h t
+ return $ Just res
+
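A minimal sketch of how `foldr1`/`foldl1'` behave on non-empty and empty streams, built with `yield`, `nil`, and the stream Semigroup instance (all assumed to be exported by this version of the library):

```haskell
import Data.Semigroup ((<>))
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- A sketch, assuming the streamly 0.5.0 Streamly.Prelude exports.
main :: IO ()
main = do
    r1 <- S.foldl1' max (S.yield 3 <> S.yield 1 <> S.yield 2)
    print r1                                   -- Just 3
    r2 <- S.foldl1' max (S.nil :: SerialT IO Int)
    print r2                                   -- Nothing: empty stream
```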
-- XXX replace the recursive "go" with explicit continuations.
-- | Like 'foldx', but with a monadic step function.
--
@@ -517,6 +581,13 @@ head m = K.head m
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
tail m = K.tail (K.adapt m)
+-- | Extract all but the last element of the stream, if any.
+--
+-- @since 0.5.0
+{-# INLINE init #-}
+init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
+init m = K.init (K.adapt m)
+
-- | Extract the last element of the stream, if any.
--
-- @since 0.1.1
@@ -559,6 +630,20 @@ all p m = S.all p (toStreamS m)
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
any p m = S.any p (toStreamS m)
+-- | Determines if all elements of a boolean stream are True.
+--
+-- @since 0.5.0
+{-# INLINE and #-}
+and :: Monad m => SerialT m Bool -> m Bool
+and = all (==True)
+
+-- | Determines whether at least one element of a boolean stream is True.
+--
+-- @since 0.5.0
+{-# INLINE or #-}
+or :: Monad m => SerialT m Bool -> m Bool
+or = any (==True)
+
-- | Determine the sum of all elements of a stream of numbers
--
-- @since 0.1.0
@@ -587,6 +672,51 @@ minimum m = S.minimum (toStreamS m)
maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
maximum m = S.maximum (toStreamS m)
+-- | Looks up the given key, treating the given stream as an association list.
+--
+-- @since 0.5.0
+{-# INLINE lookup #-}
+lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)
+lookup = K.lookup
+
+-- | Returns the first element of the stream satisfying the given predicate,
+-- if any.
+--
+-- @since 0.5.0
+{-# INLINE find #-}
+find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
+find = K.find
+
+-- | Finds all the indices of elements satisfying the given predicate.
+--
+-- @since 0.5.0
+{-# INLINE findIndices #-}
+findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int
+findIndices = K.findIndices
+
+-- | Gives the index of the first stream element satisfying the given
+-- predicate.
+--
+-- @since 0.5.0
+{-# INLINE findIndex #-}
+findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
+findIndex p = head . findIndices p
+
+-- | Finds the indices of all elements in the stream that are equal to the
+-- given value.
+--
+-- @since 0.5.0
+{-# INLINE elemIndices #-}
+elemIndices :: (IsStream t, Eq a) => a -> t m a -> t m Int
+elemIndices a = findIndices (==a)
+
+-- | Gives the index of the first element in the stream that equals the given
+-- value.
+--
+-- @since 0.5.0
+{-# INLINE elemIndex #-}
+elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
+elemIndex a = findIndex (==a)
+
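The index folds added above relate as documented: the `*Index` variants are the head of the corresponding `*Indices` stream. A sketch, assuming the streamly 0.5.0 API:

```haskell
import Data.Semigroup ((<>))
import qualified Streamly.Prelude as S

-- findIndices/elemIndices yield a stream of matching indices;
-- findIndex/elemIndex return only the first one, if any.
main :: IO ()
main = do
    let s = S.yield 'a' <> S.yield 'b' <> S.yield 'a'
    ixs <- S.toList (S.elemIndices 'a' s)
    print ixs                                  -- [0,2]
    mi <- S.elemIndex 'b' s
    print mi                                   -- Just 1
```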
------------------------------------------------------------------------------
-- Map and Fold
------------------------------------------------------------------------------
@@ -689,7 +819,8 @@ filterM p m = fromStreamD $ D.filterM p $ toStreamD m
-- @since 0.1.0
{-# INLINE take #-}
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
-take n m = fromStreamS $ S.take n $ toStreamS (maxYields (Just n) m)
+take n m = fromStreamS $ S.take n $ toStreamS
+ (maxYields (Just (fromIntegral n)) m)
-- | End the stream as soon as the predicate fails on an element.
--
@@ -813,6 +944,18 @@ reverse m = fromStream $ go K.nil (toStream m)
in K.unStream rest (rstState st) stop single yieldk
------------------------------------------------------------------------------
+-- Transformation by Inserting
+------------------------------------------------------------------------------
+
+-- | Generate a stream by performing the monadic action between consecutive
+-- elements of the given stream.
+--
+-- @since 0.5.0
+{-# INLINE intersperseM #-}
+intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
+intersperseM = K.intersperseM
+
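A sketch of `intersperseM`, assuming the streamly 0.5.0 API: the action runs between consecutive elements and its results appear in the output stream.

```haskell
import Data.Semigroup ((<>))
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    xs <- S.toList $ S.intersperseM (return 0)
                   $ S.yield 1 <> S.yield 2 <> S.yield 3
    print xs    -- [1,0,2,0,3]
```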
+------------------------------------------------------------------------------
-- Zipping
------------------------------------------------------------------------------
diff --git a/src/Streamly/SVar.hs b/src/Streamly/SVar.hs
index 4bfcfdf..01d1632 100644
--- a/src/Streamly/SVar.hs
+++ b/src/Streamly/SVar.hs
@@ -1,12 +1,15 @@
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE KindSignatures #-}
-{-# LANGUAGE ConstraintKinds #-}
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MagicHash #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE UnboxedTuples #-}
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE KindSignatures #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MagicHash #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE UnboxedTuples #-}
-- |
-- Module : Streamly.SVar
@@ -18,50 +21,89 @@
-- Portability : GHC
--
--
+#ifdef DIAGNOSTICS_VERBOSE
+#define DIAGNOSTICS
+#endif
+
module Streamly.SVar
(
MonadAsync
- , SVar (..)
, SVarStyle (..)
- , defaultMaxBuffer
- , defaultMaxThreads
- , State (..)
+ , SVar (..)
+
+ -- State threaded around the stream
+ , Limit (..)
+ , State (streamVar)
, defState
, rstState
-
+ , getMaxThreads
+ , setMaxThreads
+ , getMaxBuffer
+ , setMaxBuffer
+ , getStreamRate
+ , setStreamRate
+ , setStreamLatency
+ , getYieldLimit
+ , setYieldLimit
+
+ , cleanupSVar
+ , cleanupSVarFromWorker
+
+ -- SVar related
, newAheadVar
, newParallelVar
- , toStreamVar
-
, atomicModifyIORefCAS
+ , WorkerInfo (..)
+ , YieldRateInfo (..)
+ , ThreadAbort (..)
, ChildEvent (..)
, AheadHeapEntry (..)
+ , send
, sendYield
, sendStop
, enqueueLIFO
- , workLoopLIFO
- , workLoopFIFO
, enqueueFIFO
, enqueueAhead
+ , reEnqueueAhead
, pushWorkerPar
, queueEmptyAhead
, dequeueAhead
, dequeueFromHeap
+ , Rate (..)
+ , getYieldRateInfo
+ , collectLatency
+ , workerUpdateLatency
+ , isBeyondMaxRate
+ , workerRateControl
+ , updateYieldCount
+ , decrementYieldLimit
+ , decrementYieldLimitPost
+ , incrementYieldLimit
, postProcessBounded
+ , postProcessPaced
, readOutputQBounded
- , sendWorker
+ , readOutputQPaced
+ , dispatchWorkerPaced
+ , sendFirstWorker
, delThread
+
+ , toStreamVar
+ , SVarStats (..)
+ , NanoSecs (..)
+#ifdef DIAGNOSTICS
+ , dumpSVar
+#endif
)
where
import Control.Concurrent
- (ThreadId, myThreadId, threadDelay, getNumCapabilities)
+ (ThreadId, myThreadId, threadDelay, getNumCapabilities, throwTo)
import Control.Concurrent.MVar
- (MVar, newEmptyMVar, tryPutMVar, takeMVar)
-import Control.Exception (SomeException(..), catch, mask)
+ (MVar, newEmptyMVar, tryPutMVar, takeMVar, newMVar)
+import Control.Exception (SomeException(..), catch, mask, assert, Exception)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
@@ -69,17 +111,19 @@ import Control.Monad.Trans.Control (MonadBaseControl, control)
import Data.Atomics
(casIORef, readForCAS, peekTicket, atomicModifyIORefCAS_,
writeBarrier, storeLoadBarrier)
-import Data.Concurrent.Queue.MichaelScott
- (LinkedQueue, pushL, tryPopR)
+import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
+import Data.Int (Int64)
import Data.IORef
- (IORef, modifyIORef, newIORef, readIORef, atomicModifyIORef)
+ (IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
+import Data.List ((\\))
import Data.Maybe (fromJust)
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
+import System.Clock (TimeSpec, Clock(Monotonic), getTime, toNanoSecs)
import qualified Data.Heap as H
import qualified Data.Set as S
@@ -94,14 +138,44 @@ import Control.Concurrent.MVar (tryTakeMVar)
import Control.Exception
(catches, throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
-import Data.IORef (writeIORef)
import System.IO (hPutStrLn, stderr)
+import Text.Printf (printf)
#endif
+-- Always use signed arithmetic to avoid inadvertent overflows of signed values
+-- on conversion when comparing unsigned quantities with signed.
+newtype NanoSecs = NanoSecs Int64
+ deriving ( Eq
+ , Read
+ , Show
+ , Enum
+ , Bounded
+ , Num
+ , Real
+ , Integral
+ , Ord
+ )
+
+newtype Count = Count Int64
+ deriving ( Eq
+ , Read
+ , Show
+ , Enum
+ , Bounded
+ , Num
+ , Real
+ , Integral
+ , Ord
+ )
+
------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------
+data ThreadAbort = ThreadAbort deriving Show
+
+instance Exception ThreadAbort
+
-- | Events that a child thread may send to a parent thread.
data ChildEvent a =
ChildYield a
@@ -148,61 +222,198 @@ data SVarStyle =
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.
+
+-- We measure the individual worker latencies to estimate the number of workers
+-- needed or the amount of time we have to sleep between dispatches to achieve
+-- a particular rate when controlled pace mode is used.
+data WorkerInfo = WorkerInfo
+ { workerYieldMax :: Count -- 0 means unlimited
+ -- total number of yields by the worker till now
+ , workerYieldCount :: IORef Count
+ -- yieldCount at start, timestamp
+ , workerLatencyStart :: IORef (Count, TimeSpec)
+ }
+
+
+-- | Specifies the stream yield rate in yields per second (@Hertz@).
+-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
+-- allow only as many yields as we have accumulated as per 'rateGoal' since
+-- the start of the stream. If the consumer or the producer is slower or
+-- faster, the actual rate may fall behind or exceed 'rateGoal'. We try to
+-- recover the gap
+-- between the two by increasing or decreasing the pull rate from the producer.
+-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
+-- much as 'rateBuffer'.
+--
+-- 'rateLow' puts a bound on how low the instantaneous rate can go when
+-- recovering the rate gap. In other words, it determines the maximum yield
+-- latency. Similarly, 'rateHigh' puts a bound on how high the instantaneous
+-- rate can go when recovering the rate gap. In other words, it determines the
+-- minimum yield latency. We reduce the latency by increasing concurrency,
+-- therefore we can say that it puts an upper bound on concurrency.
--
--- XXX can we use forall t m.
-data SVar t m a =
- SVar {
- -- Read only state
- svarStyle :: SVarStyle
-
- -- Shared output queue (events, length)
- , outputQueue :: IORef ([ChildEvent a], Int)
- , maxYieldLimit :: Maybe (IORef Int)
- , outputDoorBell :: MVar () -- signal the consumer about output
- , readOutputQ :: m [ChildEvent a]
- , postProcess :: m Bool
-
- -- Used only by bounded SVar types
- , enqueue :: t m a -> IO ()
- , isWorkDone :: IO Bool
- , needDoorBell :: IORef Bool
- , workLoop :: m ()
-
- -- Shared, thread tracking
- , workerThreads :: IORef (Set ThreadId)
- , workerCount :: IORef Int
- , accountThread :: ThreadId -> m ()
+-- If the 'rateGoal' is 0 or negative the stream never yields a value.
+-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
+--
+-- @since 0.5.0
+data Rate = Rate
+ { rateLow :: Double -- ^ The lower rate limit
+ , rateGoal :: Double -- ^ The target rate we want to achieve
+ , rateHigh :: Double -- ^ The upper rate limit
+ , rateBuffer :: Int -- ^ Maximum slack from the goal
+ }
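The `Rate` record above underlies the combinators this patch exports from `Streamly` (`rate`, `avgRate`, `minRate`, `maxRate`, `constRate`); the convenience variants are assumed to build a `Rate` with some slack around the goal. A sketch of requesting roughly 10 yields per second, with a hypothetical action:

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- A sketch, assuming the streamly 0.5.0 API. Rate control only applies to
-- concurrent stream types, hence the asyncly.
main :: IO ()
main = runStream
     $ asyncly
     $ avgRate 10                 -- target: ~10 yields per second
     $ S.take 30
     $ S.repeatM (putStrLn "tick")
```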
+
+data LatencyRange = LatencyRange
+ { minLatency :: NanoSecs
+ , maxLatency :: NanoSecs
+ } deriving Show
+
+-- Rate control.
+data YieldRateInfo = YieldRateInfo
+ { svarLatencyTarget :: NanoSecs
+ , svarLatencyRange :: LatencyRange
+ , svarRateBuffer :: Int
+ , svarGainedLostYields :: IORef Count
+
+    -- Actual latency/throughput as seen from the consumer side; we count the
+    -- yields and the time it took to generate those yields. This is used to
+ -- increase or decrease the number of workers needed to achieve the desired
+ -- rate. The idle time of workers is adjusted in this, so that we only
+ -- account for the rate when the consumer actually demands data.
+ -- XXX interval latency is enough, we can move this under diagnostics build
+ , svarAllTimeLatency :: IORef (Count, TimeSpec)
+
+ -- XXX Worker latency specified by the user to be used before the first
+ -- actual measurement arrives. Not yet implemented
+ , workerBootstrapLatency :: Maybe NanoSecs
+
+ -- After how many yields the worker should update the latency information.
+ -- If the latency is high, this count is kept lower and vice-versa. XXX If
+    -- the latency suddenly becomes too high, this count may remain too high
+    -- for a long time; in such cases the consumer can change it.
+ -- 0 means no latency computation
+ -- XXX this is derivable from workerMeasuredLatency, can be removed.
+ , workerPollingInterval :: IORef Count
+
+ -- This is in progress latency stats maintained by the workers which we
+ -- empty into workerCollectedLatency stats at certain intervals - whenever
+ -- we process the stream elements yielded in this period.
+ -- (yieldCount, timeTaken)
+ , workerPendingLatency :: IORef (Count, NanoSecs)
+
+    -- This is the second level stat which is an accumulation from
+ -- workerPendingLatency stats. We keep accumulating latencies in this
+ -- bucket until we have stats for a sufficient period and then we reset it
+ -- to start collecting for the next period and retain the computed average
+ -- latency for the last period in workerMeasuredLatency.
+ -- (yieldCount, timeTaken)
+ , workerCollectedLatency :: IORef (Count, NanoSecs)
+
+ -- Latency as measured by workers, aggregated for the last period.
+ , workerMeasuredLatency :: IORef NanoSecs
+ }
+
+data SVarStats = SVarStats {
+ totalDispatches :: IORef Int
+ , maxWorkers :: IORef Int
+ , maxOutQSize :: IORef Int
+ , maxHeapSize :: IORef Int
+ , maxWorkQSize :: IORef Int
+ , avgWorkerLatency :: IORef (Count, NanoSecs)
+ , minWorkerLatency :: IORef NanoSecs
+ , maxWorkerLatency :: IORef NanoSecs
+ , svarStopTime :: IORef (Maybe TimeSpec)
+}
+
+data Limit = Unlimited | Limited Word deriving Show
+
+data SVar t m a = SVar
+ {
+ -- Read only state
+ svarStyle :: SVarStyle
+
+ -- Shared output queue (events, length)
+ , outputQueue :: IORef ([ChildEvent a], Int)
+ , outputDoorBell :: MVar () -- signal the consumer about output
+ , readOutputQ :: m [ChildEvent a]
+ , postProcess :: m Bool
+
+ -- Combined/aggregate parameters
+ , maxWorkerLimit :: Limit
+ , maxBufferLimit :: Limit
+ , remainingYields :: Maybe (IORef Count)
+ , yieldRateInfo :: Maybe YieldRateInfo
+
+ -- Used only by bounded SVar types
+ , enqueue :: t m a -> IO ()
+ , isWorkDone :: IO Bool
+ , needDoorBell :: IORef Bool
+ , workLoop :: WorkerInfo -> m ()
+
+ -- Shared, thread tracking
+ , workerThreads :: IORef (Set ThreadId)
+ , workerCount :: IORef Int
+ , accountThread :: ThreadId -> m ()
+ , workerStopMVar :: MVar ()
+
+ , svarStats :: SVarStats
+ -- to track garbage collection of SVar
+ , svarRef :: Maybe (IORef ())
#ifdef DIAGNOSTICS
- , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a))
- , Int
- )
- -- Shared work queue (stream, seqNo)
- , aheadWorkQueue :: IORef ([t m a], Int)
- , totalDispatches :: IORef Int
- , maxWorkers :: IORef Int
- , maxOutQSize :: IORef Int
- , maxHeapSize :: IORef Int
- , maxWorkQSize :: IORef Int
+ , svarCreator :: ThreadId
+ , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)) , Int)
+ -- Shared work queue (stream, seqNo)
+ , aheadWorkQueue :: IORef ([t m a], Int)
#endif
- }
+ }
+-------------------------------------------------------------------------------
+-- State for concurrency control
+-------------------------------------------------------------------------------
+
+-- XXX we can put the resettable fields in a oneShotConfig field and others in
+-- a persistentConfig field. That way reset would be fast and scalable
+-- irrespective of the number of fields.
+--
+-- XXX make all these Limited types and use phantom types to distinguish them
data State t m a = State
- { streamVar :: Maybe (SVar t m a)
- , yieldLimit :: Maybe Int
- , threadsHigh :: Int
- , bufferHigh :: Int
+ { -- one shot configuration, automatically reset for each API call
+ streamVar :: Maybe (SVar t m a)
+ , _yieldLimit :: Maybe Count
+
+ -- persistent configuration, state that remains valid until changed by
+ -- an explicit setting via a combinator.
+ , _threadsHigh :: Limit
+ , _bufferHigh :: Limit
+ -- XXX these two can be collapsed into a single type
+ , _streamLatency :: Maybe NanoSecs -- bootstrap latency
+ , _maxStreamRate :: Maybe Rate
}
-defaultMaxThreads, defaultMaxBuffer :: Int
-defaultMaxThreads = 1500
-defaultMaxBuffer = 1500
+-------------------------------------------------------------------------------
+-- State defaults and reset
+-------------------------------------------------------------------------------
+
+-- A magical value for the buffer size arrived at by running the smallest
+-- possible task and measuring the optimal value of the buffer for that. This
+-- is obviously hardware dependent; this figure is based on a 2.2 GHz Intel
+-- Core i7 processor.
+magicMaxBuffer :: Word
+magicMaxBuffer = 1500
+
+defaultMaxThreads, defaultMaxBuffer :: Limit
+defaultMaxThreads = Limited magicMaxBuffer
+defaultMaxBuffer = Limited magicMaxBuffer
+-- The fields prefixed by an _ are not to be accessed or updated directly but
+-- via smart accessor APIs.
defState :: State t m a
defState = State
{ streamVar = Nothing
- , yieldLimit = Nothing
- , threadsHigh = defaultMaxThreads
- , bufferHigh = defaultMaxBuffer
+ , _yieldLimit = Nothing
+ , _threadsHigh = defaultMaxThreads
+ , _bufferHigh = defaultMaxBuffer
+ , _maxStreamRate = Nothing
+ , _streamLatency = Nothing
}
-- XXX if perf gets affected we can have all the Nothing params in a single
@@ -215,14 +426,229 @@ defState = State
rstState :: State t m a -> State t m b
rstState st = st
{ streamVar = Nothing
- , yieldLimit = Nothing
+ , _yieldLimit = Nothing
}
+-------------------------------------------------------------------------------
+-- Smart get/set routines for State
+-------------------------------------------------------------------------------
+
+-- Use get/set routines instead of directly accessing the State fields
+setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
+setYieldLimit lim st =
+ st { _yieldLimit =
+ case lim of
+ Nothing -> Nothing
+ Just n ->
+ if n <= 0
+ then Just 0
+ else Just (fromIntegral n)
+ }
+
+getYieldLimit :: State t m a -> Maybe Count
+getYieldLimit = _yieldLimit
+
+setMaxThreads :: Int -> State t m a -> State t m a
+setMaxThreads n st =
+ st { _threadsHigh =
+ if n < 0
+ then Unlimited
+ else if n == 0
+ then defaultMaxThreads
+ else Limited (fromIntegral n)
+ }
+
+getMaxThreads :: State t m a -> Limit
+getMaxThreads = _threadsHigh
+
+setMaxBuffer :: Int -> State t m a -> State t m a
+setMaxBuffer n st =
+ st { _bufferHigh =
+ if n < 0
+ then Unlimited
+ else if n == 0
+ then defaultMaxBuffer
+ else Limited (fromIntegral n)
+ }
+
+getMaxBuffer :: State t m a -> Limit
+getMaxBuffer = _bufferHigh
+
+setStreamRate :: Maybe Rate -> State t m a -> State t m a
+setStreamRate r st = st { _maxStreamRate = r }
+
+getStreamRate :: State t m a -> Maybe Rate
+getStreamRate = _maxStreamRate
+
+setStreamLatency :: Int -> State t m a -> State t m a
+setStreamLatency n st =
+ st { _streamLatency =
+ if n < 0
+ then Nothing
+ else if n == 0
+ then Nothing
+ else Just (fromIntegral n)
+ }
+
+getStreamLatency :: State t m a -> Maybe NanoSecs
+getStreamLatency = _streamLatency
+
+-------------------------------------------------------------------------------
+-- Cleanup
+-------------------------------------------------------------------------------
+
+cleanupSVar :: SVar t m a -> IO ()
+cleanupSVar sv = do
+ workers <- readIORef (workerThreads sv)
+ Prelude.mapM_ (\tid -> throwTo tid ThreadAbort)
+ (S.toList workers)
+
+cleanupSVarFromWorker :: SVar t m a -> IO ()
+cleanupSVarFromWorker sv = do
+ workers <- readIORef (workerThreads sv)
+ self <- myThreadId
+ mapM_ (\tid -> throwTo tid ThreadAbort)
+ (S.toList workers \\ [self])
+
+-------------------------------------------------------------------------------
+-- Dumping the SVar for debug/diag
+-------------------------------------------------------------------------------
+
#ifdef DIAGNOSTICS
+-- | Convert a number of seconds to a string. The string will consist
+-- of four decimal places, followed by a short description of the time
+-- units.
+secs :: Double -> String
+secs k
+ | k < 0 = '-' : secs (-k)
+ | k >= 1 = k `with` "s"
+ | k >= 1e-3 = (k*1e3) `with` "ms"
+#ifdef mingw32_HOST_OS
+ | k >= 1e-6 = (k*1e6) `with` "us"
+#else
+ | k >= 1e-6 = (k*1e6) `with` "μs"
+#endif
+ | k >= 1e-9 = (k*1e9) `with` "ns"
+ | k >= 1e-12 = (k*1e12) `with` "ps"
+ | k >= 1e-15 = (k*1e15) `with` "fs"
+ | k >= 1e-18 = (k*1e18) `with` "as"
+ | otherwise = printf "%g s" k
+ where with (t :: Double) (u :: String)
+ | t >= 1e9 = printf "%.4g %s" t u
+ | t >= 1e3 = printf "%.0f %s" t u
+ | t >= 1e2 = printf "%.1f %s" t u
+ | t >= 1e1 = printf "%.2f %s" t u
+ | otherwise = printf "%.3f %s" t u
+
+-- XXX Code duplicated from collectLatency
+drainLatency :: SVarStats -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
+drainLatency _ss yinfo = do
+ let cur = workerPendingLatency yinfo
+ col = workerCollectedLatency yinfo
+ longTerm = svarAllTimeLatency yinfo
+ measured = workerMeasuredLatency yinfo
+
+ (count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0), v)
+ (colCount, colTime) <- readIORef col
+ (lcount, ltime) <- readIORef longTerm
+ prev <- readIORef measured
+
+ let pendingCount = colCount + count
+ pendingTime = colTime + time
+
+ lcount' = lcount + pendingCount
+ notUpdated = (lcount', ltime, prev)
+
+ if (pendingCount > 0)
+ then do
+ let new = pendingTime `div` (fromIntegral pendingCount)
+#ifdef DIAGNOSTICS
+ minLat <- readIORef (minWorkerLatency _ss)
+ when (new < minLat || minLat == 0) $
+ writeIORef (minWorkerLatency _ss) new
+
+ maxLat <- readIORef (maxWorkerLatency _ss)
+ when (new > maxLat) $ writeIORef (maxWorkerLatency _ss) new
+#endif
+ -- To avoid minor fluctuations update in batches
+ writeIORef col (0, 0)
+ writeIORef measured new
+#ifdef DIAGNOSTICS
+ modifyIORef (avgWorkerLatency _ss) $
+ \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
+#endif
+ modifyIORef longTerm $ \(_, t) -> (lcount', t)
+ return (lcount', ltime, new)
+ else return notUpdated
+
+dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
+dumpSVarStats sv ss style = do
+ case yieldRateInfo sv of
+ Nothing -> return ()
+ Just yinfo -> do
+ _ <- liftIO $ drainLatency (svarStats sv) yinfo
+ return ()
+
+ dispatches <- readIORef $ totalDispatches ss
+ maxWrk <- readIORef $ maxWorkers ss
+ maxOq <- readIORef $ maxOutQSize ss
+ maxHp <- readIORef $ maxHeapSize ss
+ minLat <- readIORef $ minWorkerLatency ss
+ maxLat <- readIORef $ maxWorkerLatency ss
+ (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
+ (svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of
+ Nothing -> return (0, 0, 0)
+ Just yinfo -> do
+ (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
+ if cnt > 0
+ then do
+ t <- readIORef (svarStopTime ss)
+ gl <- readIORef (svarGainedLostYields yinfo)
+ case t of
+ Nothing -> do
+ now <- getTime Monotonic
+ let interval = toNanoSecs (now - startTime)
+ return $ (cnt, gl, interval `div` fromIntegral cnt)
+ Just stopTime -> do
+ let interval = toNanoSecs (stopTime - startTime)
+ return $ (cnt, gl, interval `div` fromIntegral cnt)
+ else return (0, 0, 0)
+
+ return $ unlines
+ [ "total dispatches = " ++ show dispatches
+ , "max workers = " ++ show maxWrk
+ , "max outQSize = " ++ show maxOq
+ ++ (if style == AheadVar
+ then "\nheap max size = " ++ show maxHp
+ else "")
+ ++ (if minLat > 0
+ then "\nmin worker latency = "
+ ++ secs (fromIntegral minLat * 1e-9)
+ else "")
+ ++ (if maxLat > 0
+ then "\nmax worker latency = "
+ ++ secs (fromIntegral maxLat * 1e-9)
+ else "")
+ ++ (if avgCnt > 0
+ then let lat = avgTime `div` fromIntegral avgCnt
+ in "\navg worker latency = "
+ ++ secs (fromIntegral lat * 1e-9)
+ else "")
+ ++ (if svarLat > 0
+ then "\nSVar latency = "
+ ++ secs (fromIntegral svarLat * 1e-9)
+ else "")
+ ++ (if svarCnt > 0
+ then "\nSVar yield count = " ++ show svarCnt
+ else "")
+ ++ (if svarGainLossCnt > 0
+ then "\nSVar gain/loss yield count = " ++ show svarGainLossCnt
+ else "")
+ ]
+
{-# NOINLINE dumpSVar #-}
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
- tid <- myThreadId
(oqList, oqLen) <- readIORef $ outputQueue sv
db <- tryTakeMVar $ outputDoorBell sv
aheadDump <-
@@ -230,38 +656,40 @@ dumpSVar sv = do
then do
(oheap, oheapSeq) <- readIORef $ outputHeap sv
(wq, wqSeq) <- readIORef $ aheadWorkQueue sv
- maxHp <- readIORef $ maxHeapSize sv
return $ unlines
[ "heap length = " ++ show (H.size oheap)
, "heap seqeunce = " ++ show oheapSeq
, "work queue length = " ++ show (length wq)
, "work queue sequence = " ++ show wqSeq
- , "heap max size = " ++ show maxHp
]
else return []
- waiting <- readIORef $ needDoorBell sv
+ let style = svarStyle sv
+ waiting <-
+ if style /= ParallelVar
+ then readIORef $ needDoorBell sv
+ else return False
rthread <- readIORef $ workerThreads sv
workers <- readIORef $ workerCount sv
- maxWrk <- readIORef $ maxWorkers sv
- dispatches <- readIORef $ totalDispatches sv
- maxOq <- readIORef $ maxOutQSize sv
+ stats <- dumpSVarStats sv (svarStats sv) (svarStyle sv)
return $ unlines
- [ "tid = " ++ show tid
+ [ "Creator tid = " ++ show (svarCreator sv)
, "style = " ++ show (svarStyle sv)
+ , "---------CURRENT STATE-----------"
, "outputQueue length computed = " ++ show (length oqList)
, "outputQueue length maintained = " ++ show oqLen
- , "output outputDoorBell = " ++ show db
- , "total dispatches = " ++ show dispatches
- , "max workers = " ++ show maxWrk
- , "max outQSize = " ++ show maxOq
+ -- XXX print the types of events in the outputQueue, first 5
+ , "outputDoorBell = " ++ show db
]
++ aheadDump ++ unlines
[ "needDoorBell = " ++ show waiting
, "running threads = " ++ show rthread
+ -- XXX print the status of first 5 threads
, "running thread count = " ++ show workers
]
+ ++ "---------STATS-----------\n"
+ ++ stats
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
@@ -287,6 +715,10 @@ withDBGMVar :: SVar t m a -> String -> IO () -> IO ()
withDBGMVar _ _ action = action
#endif
+-------------------------------------------------------------------------------
+-- CAS
+-------------------------------------------------------------------------------
+
-- Slightly faster version of CAS. Gained some improvement by avoiding the use
-- of "evaluate" because we know we do not have exceptions in fn.
{-# INLINE atomicModifyIORefCAS #-}
@@ -339,15 +771,58 @@ doFork action exHandler =
exHandler
runInIO (return tid)
+-- XXX Can we make access to remainingYields and yieldRateInfo fields in sv
+-- faster, along with the fields in sv required by send?
+-- XXX make it noinline
+--
+-- XXX we may want to employ an increment and decrement in batches when the
+-- throughput is high or when the cost of synchronization is high. For example
+-- if the application is distributed then inc/dec of a shared variable may be
+-- very costly.
+--
+-- Note that we need it to be an Int type so that we have the ability to undo
+-- a decrement that takes it below zero.
+{-# INLINE decrementYieldLimit #-}
+decrementYieldLimit :: SVar t m a -> IO Bool
+decrementYieldLimit sv =
+ case remainingYields sv of
+ Nothing -> return True
+ Just ref -> do
+ r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
+ return $ r >= 1
+
+-- decrementYieldLimit returns False when the old limit is 0. This one returns
+-- False when the old limit is 1.
+{-# INLINE decrementYieldLimitPost #-}
+decrementYieldLimitPost :: SVar t m a -> IO Bool
+decrementYieldLimitPost sv =
+ case remainingYields sv of
+ Nothing -> return True
+ Just ref -> do
+ r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
+ return $ r > 1
+
+{-# INLINE incrementYieldLimit #-}
+incrementYieldLimit :: SVar t m a -> IO ()
+incrementYieldLimit sv =
+ case remainingYields sv of
+ Nothing -> return ()
+ Just ref -> atomicModifyIORefCAS_ ref (+ 1)
+
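The increment/decrement pair above relies on the counter being a signed `Int`: a decrement that overshoots below zero can simply be undone by a later increment. A minimal standalone sketch of that claim-and-undo pattern, using a plain `IORef` and hypothetical names rather than the SVar fields:

```haskell
import Data.IORef

-- Try to claim one yield from a limited budget. Returns True if the claim
-- succeeded. On failure the overshooting decrement is undone, mirroring the
-- decrementYieldLimit/incrementYieldLimit pair above.
tryClaimYield :: IORef Int -> IO Bool
tryClaimYield ref = do
    old <- atomicModifyIORef' ref $ \x -> (x - 1, x)
    if old >= 1
        then return True
        else do
            -- undo the decrement that took the counter below zero
            atomicModifyIORef' ref $ \x -> (x + 1, ())
            return False
```

A budget of 2 allows exactly two successful claims; the third fails and leaves the counter at 0.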
-- XXX exception safety of all atomic/MVar operations
-- TBD Each worker can have their own queue and the consumer can empty one
-- queue at a time, that way contention can be reduced.
+-- XXX Only yields should be counted in the buffer limit and not the Stop
+-- events.
+
-- | This function is used by the producer threads to queue output for the
-- consumer thread to consume. Returns whether the queue has more space.
-send :: Int -> SVar t m a -> ChildEvent a -> IO Bool
-send maxOutputQLen sv msg = do
+send :: SVar t m a -> ChildEvent a -> IO Bool
+send sv msg = do
+ -- XXX can the access to outputQueue and maxBufferLimit be made faster
+ -- somehow?
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
((msg : es, n + 1), n)
when (len <= 0) $ do
@@ -361,22 +836,98 @@ send maxOutputQLen sv msg = do
-- The important point is that the consumer is guaranteed to receive a
-- doorbell if something was added to the queue after it empties it.
void $ tryPutMVar (outputDoorBell sv) ()
- return (len < maxOutputQLen || maxOutputQLen < 0)
-{-# NOINLINE sendYield #-}
-sendYield :: Int -> SVar t m a -> ChildEvent a -> IO Bool
-sendYield maxOutputQLen sv msg = do
- ylimit <- case maxYieldLimit sv of
- Nothing -> return True
- Just ref -> atomicModifyIORefCAS ref $ \x -> (x - 1, x > 1)
- r <- send maxOutputQLen sv msg
- return $ r && ylimit
+ -- XXX we should reserve the buffer when we pick up the work from the
+ -- queue, instead of checking it here when it is too late.
+ let limit = maxBufferLimit sv
+ case limit of
+ Unlimited -> return True
+ Limited lim -> do
+ active <- readIORef (workerCount sv)
+ return $ len < ((fromIntegral lim) - active)
+
+-- XXX We assume that a worker always yields a value. If we can have
+-- workers that return without yielding anything, our computations to
+-- determine the number of workers may be off.
+workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
+workerUpdateLatency yinfo winfo = do
+ cnt1 <- readIORef (workerYieldCount winfo)
+ (cnt0, t0) <- readIORef (workerLatencyStart winfo)
+ t1 <- getTime Monotonic
+ writeIORef (workerLatencyStart winfo) (cnt1, t1)
+ let period = fromInteger $ toNanoSecs (t1 - t0)
+ let ref = workerPendingLatency yinfo
+ atomicModifyIORefCAS ref $ \(ycnt, ytime) ->
+ ((ycnt + cnt1 - cnt0, ytime + period), ())
+
+updateYieldCount :: WorkerInfo -> IO Count
+updateYieldCount winfo = do
+ cnt <- readIORef (workerYieldCount winfo)
+ let cnt1 = cnt + 1
+ writeIORef (workerYieldCount winfo) cnt1
+ return cnt1
+
+isBeyondMaxYield :: Count -> WorkerInfo -> Bool
+isBeyondMaxYield cnt winfo =
+ let ymax = workerYieldMax winfo
+ in ymax /= 0 && cnt >= ymax
+
+-- XXX we should do rate control periodically based on the total yields rather
+-- than based on the worker local yields as other workers may have yielded more
+-- and we should stop based on the aggregate yields. However, latency update
+-- period can be based on individual worker yields.
+{-# NOINLINE checkRatePeriodic #-}
+checkRatePeriodic :: SVar t m a
+ -> YieldRateInfo
+ -> WorkerInfo
+ -> Count
+ -> IO Bool
+checkRatePeriodic sv yinfo winfo ycnt = do
+ i <- readIORef (workerPollingInterval yinfo)
+ -- XXX use generation count to check if the interval has been updated
+ if (i /= 0 && (ycnt `mod` i) == 0)
+ then do
+ workerUpdateLatency yinfo winfo
+ -- XXX not required for parallel streams
+ isBeyondMaxRate sv yinfo
+ else return False
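The modulo test above implements a cheap periodic gate: the expensive latency update and rate check run only on every interval-th yield. A sketch of just that gating predicate (illustrative name, not part of the library API):

```haskell
-- Run the expensive check only on every 'interval'-th yield; an interval of
-- 0 disables polling entirely, as in checkRatePeriodic above.
shouldPoll :: Int -> Int -> Bool
shouldPoll interval ycnt = interval /= 0 && ycnt `mod` interval == 0
```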
-{-# NOINLINE sendStop #-}
-sendStop :: SVar t m a -> IO ()
-sendStop sv = do
- liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
- myThreadId >>= \tid -> void $ send (-1) sv (ChildStop tid Nothing)
+-- CAUTION! this also updates the yield count and therefore should be called
+-- only when we are actually yielding an element.
+{-# NOINLINE workerRateControl #-}
+workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
+workerRateControl sv yinfo winfo = do
+ cnt <- updateYieldCount winfo
+ beyondMaxRate <- checkRatePeriodic sv yinfo winfo cnt
+ return $ not (isBeyondMaxYield cnt winfo || beyondMaxRate)
+
+-- XXX we should do rate control here but not latency update in case of ahead
+-- streams. latency update must be done when we yield directly to outputQueue
+-- or when we yield to heap.
+{-# INLINE sendYield #-}
+sendYield :: SVar t m a -> WorkerInfo -> ChildEvent a -> IO Bool
+sendYield sv winfo msg = do
+ r <- send sv msg
+ rateLimitOk <-
+ case yieldRateInfo sv of
+ Nothing -> return True
+ Just yinfo -> workerRateControl sv yinfo winfo
+ return $ r && rateLimitOk
+
+{-# INLINE workerStopUpdate #-}
+workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
+workerStopUpdate winfo info = do
+ i <- readIORef (workerPollingInterval info)
+ when (i /= 0) $ workerUpdateLatency info winfo
+
+{-# INLINABLE sendStop #-}
+sendStop :: SVar t m a -> WorkerInfo -> IO ()
+sendStop sv winfo = do
+ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
+ case yieldRateInfo sv of
+ Nothing -> return ()
+ Just info -> workerStopUpdate winfo info
+ myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
-------------------------------------------------------------------------------
-- Async
@@ -401,25 +952,6 @@ enqueueLIFO sv q m = do
atomicModifyIORefCAS_ (needDoorBell sv) (const False)
void $ tryPutMVar (outputDoorBell sv) ()
-{-# INLINE workLoopLIFO #-}
-workLoopLIFO :: MonadIO m
- => (State t m a -> IORef [t m a] -> t m a -> m () -> m ())
- -> State t m a -> IORef [t m a] -> m ()
-workLoopLIFO f st q = run
-
- where
-
- sv = fromJust $ streamVar st
- run = do
- work <- dequeue
- case work of
- Nothing -> liftIO $ sendStop sv
- Just m -> f st q m run
-
- dequeue = liftIO $ atomicModifyIORefCAS q $ \case
- [] -> ([], Nothing)
- x : xs -> (xs, Just x)
-
-------------------------------------------------------------------------------
-- WAsync
-------------------------------------------------------------------------------
@@ -443,21 +975,6 @@ enqueueFIFO sv q m = do
atomicModifyIORefCAS_ (needDoorBell sv) (const False)
void $ tryPutMVar (outputDoorBell sv) ()
-{-# INLINE workLoopFIFO #-}
-workLoopFIFO :: MonadIO m
- => (State t m a -> LinkedQueue (t m a) -> t m a -> m () -> m ())
- -> State t m a -> LinkedQueue (t m a) -> m ()
-workLoopFIFO f st q = run
-
- where
-
- sv = fromJust $ streamVar st
- run = do
- work <- liftIO $ tryPopR q
- case work of
- Nothing -> liftIO $ sendStop sv
- Just m -> f st q m run
-
-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------
@@ -510,10 +1027,18 @@ workLoopFIFO f st q = run
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.
--- Left associated ahead expressions are expensive. We start a new SVar for
+-- XXX Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
--- Thererefore the queue never has more than on item in it.
+-- Therefore the queue never has more than one item in it.
+--
+-- XXX we can fix this. When we queue more than one item on the queue we can
+-- mark the previously queued item as not-runnable. The not-runnable item is
+-- not dequeued until the already running one has finished and at that time we
+-- would also know the exact sequence number of the already queued item.
+--
+-- we can even run the already queued items but they will have to be sorted in
+-- layers in the heap. We can use a list of heaps for that.
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
enqueueAhead sv q m = do
@@ -531,6 +1056,19 @@ enqueueAhead sv q m = do
atomicModifyIORefCAS_ (needDoorBell sv) (const False)
void $ tryPutMVar (outputDoorBell sv) ()
+-- enqueue without incrementing the sequence number
+{-# INLINE reEnqueueAhead #-}
+reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
+reEnqueueAhead sv q m = do
+ atomicModifyIORefCAS_ q $ \ case
+ ([], n) -> ([m], n) -- DO NOT increment sequence
+ _ -> error "not empty"
+ storeLoadBarrier
+ w <- readIORef $ needDoorBell sv
+ when w $ do
+ atomicModifyIORefCAS_ (needDoorBell sv) (const False)
+ void $ tryPutMVar (outputDoorBell sv) ()
+
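The enqueue functions above all end with the same doorbell idiom: check the `needDoorBell` flag and ring a one-slot `MVar` with the non-blocking `tryPutMVar`. A standalone sketch of that idiom with hypothetical names, omitting the `storeLoadBarrier` (which comes from the atomic-primops package):

```haskell
import Control.Concurrent.MVar
import Control.Monad (void, when)
import Data.IORef

-- Ring the consumer's doorbell only if it has registered interest. tryPutMVar
-- never blocks and leaves an already-full bell as-is, so wakeups are cheap
-- and a pending wakeup is never lost.
ringDoorBell :: IORef Bool -> MVar () -> IO ()
ringDoorBell needBell bell = do
    w <- readIORef needBell
    when w $ do
        writeIORef needBell False
        void $ tryPutMVar bell ()
```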
-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
-- point of time. But if the task that has the token finds that the outputQueue
@@ -571,7 +1109,7 @@ dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Int)
-> IO (Maybe (Entry Int (AheadHeapEntry t m a)))
dequeueFromHeap hpRef = do
- atomicModifyIORefCAS hpRef $ \hp@(h, snum) -> do
+ atomicModifyIORef hpRef $ \hp@(h, snum) -> do
let r = H.uncons h
case r of
Nothing -> (hp, Nothing)
@@ -637,25 +1175,36 @@ allThreadsDone sv = liftIO $ S.null <$> readIORef (workerThreads sv)
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e = do
tid <- myThreadId
- void $ send (-1) sv (ChildStop tid (Just e))
+ void $ send sv (ChildStop tid (Just e))
#ifdef DIAGNOSTICS
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
active <- readIORef (workerCount sv)
- maxWrk <- readIORef (maxWorkers sv)
- when (active > maxWrk) $ writeIORef (maxWorkers sv) active
- modifyIORef (totalDispatches sv) (+1)
+ maxWrk <- readIORef (maxWorkers $ svarStats sv)
+ when (active > maxWrk) $ writeIORef (maxWorkers $ svarStats sv) active
+ modifyIORef (totalDispatches $ svarStats sv) (+1)
#endif
{-# NOINLINE pushWorker #-}
-pushWorker :: MonadAsync m => SVar t m a -> m ()
-pushWorker sv = do
+pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
+pushWorker yieldMax sv = do
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
#ifdef DIAGNOSTICS
recordMaxWorkers sv
#endif
- doFork (workLoop sv) (handleChildException sv) >>= addThread sv
+ -- XXX we can make this allocation conditional, it might matter when
+ -- significant number of workers are being sent.
+ winfo <- do
+ cntRef <- liftIO $ newIORef 0
+ t <- liftIO $ getTime Monotonic
+ lat <- liftIO $ newIORef (0, t)
+ return $ WorkerInfo
+ { workerYieldMax = yieldMax
+ , workerYieldCount = cntRef
+ , workerLatencyStart = lat
+ }
+ doFork (workLoop sv winfo) (handleChildException sv) >>= addThread sv
-- XXX we can push the workerCount modification in accountThread and use the
-- same pushWorker for Parallel case as well.
@@ -666,7 +1215,7 @@ pushWorker sv = do
-- workerThreads. Alternatively, we can use a CreateThread event to avoid
-- using a CAS based modification.
{-# NOINLINE pushWorkerPar #-}
-pushWorkerPar :: MonadAsync m => SVar t m a -> m () -> m ()
+pushWorkerPar :: MonadAsync m => SVar t m a -> (WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop = do
-- We do not use workerCount in case of ParallelVar but still there is no
-- harm in maintaining it correctly.
@@ -674,51 +1223,437 @@ pushWorkerPar sv wloop = do
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
recordMaxWorkers sv
#endif
- doFork wloop (handleChildException sv) >>= modifyThread sv
-
-dispatchWorker :: MonadAsync m => Int -> SVar t m a -> m ()
-dispatchWorker maxWorkerLimit sv = do
+ winfo <- do
+ cntRef <- liftIO $ newIORef 0
+ t <- liftIO $ getTime Monotonic
+ lat <- liftIO $ newIORef (0, t)
+ return $ WorkerInfo
+ { workerYieldMax = 0
+ , workerYieldCount = cntRef
+ , workerLatencyStart = lat
+ }
+
+ doFork (wloop winfo) (handleChildException sv) >>= modifyThread sv
+
+-- Returns:
+-- True: can dispatch more
+-- False: cannot dispatch any more
+dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
+dispatchWorker yieldCount sv = do
+ let workerLimit = maxWorkerLimit sv
+ -- XXX in case of Ahead streams we should not send more than one worker
+ -- when the work queue is done but heap is not done.
done <- liftIO $ isWorkDone sv
- when (not done) $ do
+ if (not done)
+ then do
-- Note that the worker count is only decremented during event
-- processing in fromStreamVar and therefore it is safe to read and
-- use it without a lock.
- cnt <- liftIO $ readIORef $ workerCount sv
+ active <- liftIO $ readIORef $ workerCount sv
-- Note that we may deadlock if the previous workers (tasks in the
-- stream) wait/depend on the future workers (tasks in the stream)
-- executing. In that case we should either configure the maxWorker
-- count to higher or use parallel style instead of ahead or async
-- style.
- limit <- case maxYieldLimit sv of
- Nothing -> return maxWorkerLimit
- Just x -> do
- lim <- liftIO $ readIORef x
+ limit <- case remainingYields sv of
+ Nothing -> return workerLimit
+ Just ref -> do
+ n <- liftIO $ readIORef ref
return $
- if maxWorkerLimit > 0
- then min maxWorkerLimit lim
- else lim
- when (cnt < limit || limit < 0) $ pushWorker sv
+ case workerLimit of
+ Unlimited -> Limited (fromIntegral n)
+ Limited lim -> Limited $ min lim (fromIntegral n)
+
+ -- XXX for ahead streams shall we take the heap yields into account for
+ -- controlling the dispatch? We should not dispatch if the heap has
+ -- already got the limit covered.
+ let dispatch = pushWorker yieldCount sv >> return True
+ in case limit of
+ Unlimited -> dispatch
+ -- Note that the use of remainingYields and workerCount is not
+ -- atomic and the counts may even have changed between reading and
+ -- using them here, so this is just approximate logic and we cannot
+ -- rely on it for correctness. We may actually dispatch more
+ -- workers than required.
+ Limited lim | active < (fromIntegral lim) -> dispatch
+ _ -> return False
+ else return False
-{-# NOINLINE sendWorkerWait #-}
-sendWorkerWait :: MonadAsync m => Int -> SVar t m a -> m ()
-sendWorkerWait maxWorkerLimit sv = do
- -- Note that we are guaranteed to have at least one outstanding worker when
- -- we enter this function. So if we sleep we are guaranteed to be woken up
- -- by a outputDoorBell, when the worker exits.
+-- | This is a magic number, overloaded and used at several places to achieve
+-- batching:
+--
+-- 1. If we have to sleep to slow down, this is the minimum period that we
+-- accumulate before we sleep. Also, workers do not stop until this much
+-- sleep time is accumulated.
+-- 2. Collected latencies are computed and transferred to measured latency
+-- after a minimum of this period.
+minThreadDelay :: NanoSecs
+minThreadDelay = 10^(6 :: Int)
+
+-- | Another magic number! When we have to start more workers to cover up a
+-- number of yields that we are lagging by, we cannot start one worker per
+-- yield: that may be a very big number, and if the latency of the workers is
+-- low the number of pending yields could be very high. Instead, we assume
+-- that each extra worker runs for at least this much time.
+rateRecoveryTime :: NanoSecs
+rateRecoveryTime = 1000000
+
+nanoToMicroSecs :: NanoSecs -> Int
+nanoToMicroSecs s = (fromIntegral s) `div` 1000
+
+-- We either block, or send one worker with limited yield count or one or more
+-- workers with unlimited yield count.
+data Work
+ = BlockWait NanoSecs
+ | PartialWorker Count
+ | ManyWorkers Int Count
+ deriving Show
+
+-- XXX we can use phantom types to distinguish the duration/latency/expectedLat
+estimateWorkers
+ :: Limit
+ -> Count
+ -> Count
+ -> NanoSecs
+ -> NanoSecs
+ -> NanoSecs
+ -> LatencyRange
+ -> Work
+estimateWorkers workerLimit svarYields gainLossYields
+ svarElapsed wLatency targetLat range =
+ -- XXX we can have a maxEfficiency combinator as well which runs the
+ -- producer at the maximal efficiency i.e. the number of workers are chosen
+ -- such that the latency is minimum or within a range. Or we can call it
+ -- maxWorkerLatency.
+ --
+ let
+ -- How many workers do we need to achieve the required rate?
+ --
+ -- When the workers are IO bound we can increase the throughput by
+ -- increasing the number of workers as long as the IO device has enough
+ -- capacity to process all the requests concurrently. If the IO
+ -- bandwidth is saturated increasing the workers won't help. Also, if
+ -- the CPU utilization in processing all these requests exceeds the CPU
+ -- bandwidth, then increasing the number of workers won't help.
+ --
+ -- When the workers are purely CPU bound, increasing the workers beyond
+ -- the number of CPUs won't help.
+ --
+ -- TODO - measure the CPU and IO requirements of the workers. Have a
+ -- way to specify the max bandwidth of the underlying IO mechanism and
+ -- use that to determine the max rate of workers, and also take the CPU
+ -- bandwidth into account. We can also discover the IO bandwidth if we
+ -- know that we are not CPU bound: measure how much steady state rate we
+ -- are able to achieve. Design tests for CPU bound and IO bound cases.
+
+ -- Calculate how many yields are we ahead or behind to match the exact
+ -- required rate. Based on that we increase or decrease the effective
+ -- workers.
+ --
+ -- When the worker latency is lower than required latency we begin with
+ -- a yield and then wait rather than first waiting and then yielding.
+ targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
+ effectiveYields = svarYields + gainLossYields
+ deltaYields = fromIntegral targetYields - effectiveYields
+
+ -- We recover the deficit by running at a higher/lower rate for a
+ -- certain amount of time. To keep the effective rate in reasonable
+ -- limits we use rateRecoveryTime, minLatency and maxLatency.
+ in if deltaYields > 0
+ then
+ let deltaYieldsFreq :: Double
+ deltaYieldsFreq =
+ fromIntegral deltaYields /
+ fromIntegral rateRecoveryTime
+ yieldsFreq = 1.0 / fromIntegral targetLat
+ totalYieldsFreq = yieldsFreq + deltaYieldsFreq
+ requiredLat = NanoSecs $ round $ 1.0 / totalYieldsFreq
+ adjustedLat = min (max requiredLat (minLatency range))
+ (maxLatency range)
+ in assert (adjustedLat > 0) $
+ if wLatency <= adjustedLat
+ then PartialWorker deltaYields
+ else ManyWorkers ( fromIntegral
+ $ withLimit
+ $ wLatency `div` adjustedLat) deltaYields
+ else
+ let expectedDuration = fromIntegral effectiveYields * targetLat
+ sleepTime = expectedDuration - svarElapsed
+ maxSleepTime = maxLatency range - wLatency
+ s = min sleepTime maxSleepTime
+ in assert (sleepTime >= 0) $
+ -- if s is less than 0 it means our maxSleepTime is less
+ -- than the worker latency.
+ if (s > 0) then BlockWait s else ManyWorkers 1 (Count 0)
+ where
+ withLimit n =
+ case workerLimit of
+ Unlimited -> n
+ Limited x -> min n (fromIntegral x)
+
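The core of `estimateWorkers` is the yield-deficit computation: from the elapsed time and target latency it derives how many yields should have happened by now and compares that with how many actually did. A simplified, hypothetical version of just that arithmetic, with all times in nanoseconds and plain `Integer`s instead of the `Count`/`NanoSecs` newtypes:

```haskell
-- Positive result: we are behind the target rate and need more workers;
-- negative: we are ahead and can sleep. The rounding-up division matches the
-- targetYields computation in estimateWorkers above.
yieldDeficit :: Integer -> Integer -> Integer -> Integer -> Integer
yieldDeficit elapsed wLatency targetLat actualYields =
    let targetYields = (elapsed + wLatency + targetLat - 1) `div` targetLat
    in targetYields - actualYields
```

For example, with 900ns elapsed, 100ns worker latency and a 100ns target latency, 10 yields were due; if only 5 happened the deficit is 5.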
+-- | Get the worker latency without resetting workerPendingLatency
+-- Returns (total yield count, base time, measured latency)
+-- CAUTION! keep it in sync with collectLatency
+getWorkerLatency :: YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
+getWorkerLatency yinfo = do
+ let cur = workerPendingLatency yinfo
+ col = workerCollectedLatency yinfo
+ longTerm = svarAllTimeLatency yinfo
+ measured = workerMeasuredLatency yinfo
+
+ (count, time) <- readIORef cur
+ (colCount, colTime) <- readIORef col
+ (lcount, ltime) <- readIORef longTerm
+ prev <- readIORef measured
+
+ let pendingCount = colCount + count
+ pendingTime = colTime + time
+ new =
+ if pendingCount > 0
+ then let lat = pendingTime `div` (fromIntegral pendingCount)
+ -- XXX Give more weight to new?
+ in (lat + prev) `div` 2
+ else prev
+ return (lcount + pendingCount, ltime, new)
+
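The latency estimate above folds the pending batch into the previous measurement with equal weight. A sketch of that blending in isolation (illustrative name; counts and times are plain `Integer`s here):

```haskell
-- Average latency of the pending batch, blended 50/50 with the previous
-- estimate; with no pending yields the previous estimate is kept, as in
-- getWorkerLatency above.
blendLatency :: Integer -> Integer -> Integer -> Integer
blendLatency prev pendingTime pendingCount
    | pendingCount > 0 = (prev + pendingTime `div` pendingCount) `div` 2
    | otherwise        = prev
```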
+isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
+isBeyondMaxRate sv yinfo = do
+ (count, tstamp, wLatency) <- getWorkerLatency yinfo
+ now <- getTime Monotonic
+ let duration = fromInteger $ toNanoSecs $ now - tstamp
+ let targetLat = svarLatencyTarget yinfo
+ gainLoss <- readIORef (svarGainedLostYields yinfo)
+ let work = estimateWorkers (maxWorkerLimit sv) count gainLoss duration
+ wLatency targetLat (svarLatencyRange yinfo)
+ cnt <- readIORef $ workerCount sv
+ return $ case work of
+ -- XXX set the worker's maxYields or polling interval based on yields
+ PartialWorker _yields -> cnt > 1
+ ManyWorkers n _ -> cnt > n
+ BlockWait _ -> True
+
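The mapping from an estimated `Work` value to the "should this worker back off" decision is a small pure function; a sketch with a local copy of the `Work` type, mirroring the case analysis above (hypothetical names):

```haskell
-- Local stand-in for the Work type defined earlier in this module.
data Work = BlockWait Integer | PartialWorker Integer | ManyWorkers Int Integer

-- A worker is beyond the max rate if the estimate says we must block, or if
-- more workers are currently active than the estimate calls for.
beyondMaxRate :: Int -> Work -> Bool
beyondMaxRate active work = case work of
    PartialWorker _ -> active > 1
    ManyWorkers n _ -> active > n
    BlockWait _     -> True
```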
+-- Every once in a while workers update the latencies and check the yield
+-- rate. They return if we are above the expected yield rate. Checking too
+-- often may impact performance; checking too rarely leaves us with a stale
+-- picture. We update every minThreadDelay, but translate that into a yield
+-- count based on latency so that the checking overhead stays low.
+--
+-- XXX use a generation count to indicate that the value is updated. If the
+-- value is updated an existing worker must check it again on the next yield.
+-- Otherwise it is possible that we may keep updating it and because of the mod
+-- worker keeps skipping it.
+updateWorkerPollingInterval :: YieldRateInfo -> NanoSecs -> IO ()
+updateWorkerPollingInterval yinfo latency = do
+ let periodRef = workerPollingInterval yinfo
+ cnt = max 1 $ minThreadDelay `div` latency
+ period = min cnt (fromIntegral magicMaxBuffer)
+
+ writeIORef periodRef (fromIntegral period)
+
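The interval computation above translates a time budget (`minThreadDelay`) into a yield count and clamps it, so a worker polls roughly once per `minThreadDelay` regardless of its latency. A sketch of that clamping arithmetic with the constants passed in explicitly (hypothetical name):

```haskell
-- How many yields between rate checks: at least 1 (for very slow workers),
-- at most maxBuf (for very fast ones), otherwise minDelay / latency.
pollingInterval :: Integer -> Integer -> Integer -> Integer
pollingInterval minDelay maxBuf latency =
    min (max 1 (minDelay `div` latency)) maxBuf
```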
+-- Returns a triple, (1) yield count since last collection, (2) the base time
+-- when we started counting, (3) average latency in the last measurement
+-- period. The former two are used for accurate measurement of the going rate
+-- whereas the average is used for future estimates e.g. how many workers
+-- should be maintained to maintain the rate.
+-- CAUTION! keep it in sync with getWorkerLatency
+collectLatency :: SVarStats -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
+collectLatency _ss yinfo = do
+ let cur = workerPendingLatency yinfo
+ col = workerCollectedLatency yinfo
+ longTerm = svarAllTimeLatency yinfo
+ measured = workerMeasuredLatency yinfo
+
+ (count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0), v)
+ (colCount, colTime) <- readIORef col
+ (lcount, ltime) <- readIORef longTerm
+ prev <- readIORef measured
+
+ let pendingCount = colCount + count
+ pendingTime = colTime + time
+
+ lcount' = lcount + pendingCount
+ tripleWith lat = (lcount', ltime, lat)
+
+ if (pendingCount > 0)
+ then do
+ let new = pendingTime `div` (fromIntegral pendingCount)
+#ifdef DIAGNOSTICS
+ minLat <- readIORef (minWorkerLatency _ss)
+ when (new < minLat || minLat == 0) $
+ writeIORef (minWorkerLatency _ss) new
+
+ maxLat <- readIORef (maxWorkerLatency _ss)
+ when (new > maxLat) $ writeIORef (maxWorkerLatency _ss) new
+#endif
+ -- When we have collected a significant sized batch we compute the new
+ -- latency using that batch and return the new latency, otherwise we
+ -- return the previous latency derived from the previous batch.
+ if (pendingCount > fromIntegral magicMaxBuffer)
+ || (pendingTime > minThreadDelay)
+ || (let r = (fromIntegral new) / (fromIntegral prev) :: Double
+ in prev > 0 && (r > 2 || r < 0.5))
+ || (prev == 0)
+ then do
+ updateWorkerPollingInterval yinfo (max new prev)
+ writeIORef col (0, 0)
+ writeIORef measured ((prev + new) `div` 2)
+#ifdef DIAGNOSTICS
+ modifyIORef (avgWorkerLatency _ss) $
+ \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
+#endif
+ modifyIORef longTerm $ \(_, t) -> (lcount', t)
+ return $ tripleWith new
+ else do
+ writeIORef col (pendingCount, pendingTime)
+ return $ tripleWith prev
+ else return $ tripleWith prev
+
+-- XXX in case of ahead style streams we need to take the heap size into
+-- account, because workers return based on it, which can cause a situation
+-- where we keep dispatching workers and they keep returning. So we must use
+-- exactly the same logic for not dispatching as for returning.
+--
+-- Returns:
+-- True: can dispatch more
+-- False: full, no more dispatches
+dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
+dispatchWorkerPaced sv = do
+ let yinfo = fromJust $ yieldRateInfo sv
+ (svarYields, svarElapsed, wLatency) <- do
+ now <- liftIO $ getTime Monotonic
+ (yieldCount, baseTime, lat) <-
+ liftIO $ collectLatency (svarStats sv) yinfo
+ let elapsed = fromInteger $ toNanoSecs $ now - baseTime
+ let latency =
+ if lat == 0
+ then
+ case workerBootstrapLatency yinfo of
+ Nothing -> lat
+ Just t -> t
+ else lat
+
+ return (yieldCount, elapsed, latency)
+
+ if wLatency == 0
+ -- Need to measure the latency with a single worker before we can perform
+ -- any computation.
+ then return False
+ else do
+ let workerLimit = maxWorkerLimit sv
+ let targetLat = svarLatencyTarget yinfo
+ let range = svarLatencyRange yinfo
+ gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
+ let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
+ wLatency targetLat range
+
+ -- XXX we need to take yieldLimit into account here. If we are at the
+ -- end of the limit as well as the time, we should not be sleeping.
+ -- If we are not actually planning to dispatch any more workers we need
+ -- to take that in account.
+ case work of
+ BlockWait s -> do
+ assert (s >= 0) (return ())
+ -- XXX note that when we return from here we will block waiting
+ -- for the result from the existing worker. If that takes too
+ -- long we won't be able to send another worker until the
+ -- result arrives.
+ --
+ -- Sleep only if there are no active workers, otherwise we will
+ -- defer the output of those. Note we cannot use workerCount
+ -- here as it is not a reliable way to ensure there are
+ -- definitely no active workers. When workerCount is 0 we may
+ -- still have a Stop event waiting in the outputQueue.
+ done <- allThreadsDone sv
+ when done $ void $ do
+ liftIO $ threadDelay $ nanoToMicroSecs s
+ dispatchWorker 1 sv
+ return False
+ PartialWorker yields -> do
+ assert (yields > 0) (return ())
+ updateGainedLostYields yinfo yields
+
+ done <- allThreadsDone sv
+ when done $ void $ dispatchWorker yields sv
+ return False
+ ManyWorkers netWorkers yields -> do
+ assert (netWorkers >= 1) (return ())
+ assert (yields >= 0) (return ())
+ updateGainedLostYields yinfo yields
+
+ let periodRef = workerPollingInterval yinfo
+ ycnt = max 1 $ yields `div` fromIntegral netWorkers
+ period = min ycnt (fromIntegral magicMaxBuffer)
+
+ old <- liftIO $ readIORef periodRef
+ when (period < old) $
+ liftIO $ writeIORef periodRef period
+
+ cnt <- liftIO $ readIORef $ workerCount sv
+ if (cnt < netWorkers)
+ then do
+ let total = netWorkers - cnt
+ batch = max 1 $ fromIntegral $
+ minThreadDelay `div` targetLat
+ r <- dispatchN (min total batch)
+ -- XXX stagger the workers over a period?
+ -- XXX cannot sleep, as that would mean we cannot process the
+ -- outputs. need to try a different mechanism to stagger.
+ -- when (total > batch) $
+ -- liftIO $ threadDelay $ nanoToMicroSecs minThreadDelay
+ return r
+ else return False
+
+ where
+ updateGainedLostYields yinfo yields = do
+ let buf = fromIntegral $ svarRateBuffer yinfo
+ when (yields /= 0 && abs yields > buf) $ do
+ let delta =
+ if yields > 0
+ then yields - buf
+ else yields + buf
+ liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)
+
+ dispatchN n = do
+ if n == 0
+ then return True
+ else do
+ r <- dispatchWorker 0 sv
+ if r
+ then dispatchN (n - 1)
+ else return False
+
+sendWorkerDelayPaced :: SVar t m a -> IO ()
+sendWorkerDelayPaced _ = return ()
+
+sendWorkerDelay :: SVar t m a -> IO ()
+sendWorkerDelay sv = do
-- XXX we need a better way to handle this than hardcoded delays. The
-- delays may be different for different systems.
- ncpu <- liftIO $ getNumCapabilities
+ ncpu <- getNumCapabilities
if ncpu <= 1
then
if (svarStyle sv == AheadVar)
- then liftIO $ threadDelay 100
- else liftIO $ threadDelay 25
+ then threadDelay 100
+ else threadDelay 25
else
if (svarStyle sv == AheadVar)
- then liftIO $ threadDelay 100
- else liftIO $ threadDelay 10
+ then threadDelay 100
+ else threadDelay 10
+{-# NOINLINE sendWorkerWait #-}
+sendWorkerWait
+ :: MonadAsync m
+ => (SVar t m a -> IO ())
+ -> (SVar t m a -> m Bool)
+ -> SVar t m a
+ -> m ()
+sendWorkerWait delay dispatch sv = do
+ -- Note that we are guaranteed to have at least one outstanding worker when
+ -- we enter this function. So if we sleep we are guaranteed to be woken up
+ -- by an outputDoorBell, when the worker exits.
+
+ liftIO $ delay sv
(_, n) <- liftIO $ readIORef (outputQueue sv)
when (n <= 0) $ do
-- The queue may be empty temporarily if the worker has dequeued the
@@ -746,6 +1681,21 @@ sendWorkerWait maxWorkerLimit sv = do
-- we are no longer using pull based concurrency rate adaptation.
--
-- XXX update this in the tutorial.
+ --
+ -- Having pending active workers does not mean that we are guaranteed
+ -- to be woken up if we sleep. In case of Ahead streams, there may be
+ -- queued items in the heap even though the outputQueue is empty, and
+ -- we may have active workers which are deadlocked on those items to be
+ -- processed by the consumer. We should either guarantee that any
+ -- worker, before returning, clears the heap or we send a worker to clear
+ -- it. Normally we always send a worker if no output is seen, but if
+ -- the thread limit is reached or we are using pacing then we may not
+ -- send a worker. See the concurrentApplication test in the tests, that
+ -- test case requires at least one yield from the producer to avoid
+ -- deadlock; if the last worker's output is stuck in the heap then this
+ -- test fails. This problem can be extended to n threads when the
+ -- consumer may depend on the evaluation of next n items in the
+ -- producer stream.
-- register for the outputDoorBell before we check the queue so that if we
-- sleep because the queue was empty we are guaranteed to get a
@@ -753,7 +1703,7 @@ sendWorkerWait maxWorkerLimit sv = do
liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
liftIO $ storeLoadBarrier
- dispatchWorker maxWorkerLimit sv
+ canDoMore <- dispatch sv
-- XXX test for the case when we miss sending a worker when the worker
-- count is more than 1500.
@@ -762,27 +1712,26 @@ sendWorkerWait maxWorkerLimit sv = do
-- least one outstanding worker. Otherwise we could be sleeping
-- forever.
- done <- liftIO $ isWorkDone sv
- if done
- then do
+ if canDoMore
+ then sendWorkerWait delay dispatch sv
+ else do
liftIO $ withDBGMVar sv "sendWorkerWait: nothing to do"
$ takeMVar (outputDoorBell sv)
(_, len) <- liftIO $ readIORef (outputQueue sv)
- when (len <= 0) $ sendWorkerWait maxWorkerLimit sv
- else sendWorkerWait maxWorkerLimit sv
+ when (len <= 0) $ sendWorkerWait delay dispatch sv
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
(list, len) <- atomicModifyIORefCAS (outputQueue sv) $ \x -> (([],0), x)
#ifdef DIAGNOSTICS
- oqLen <- readIORef (maxOutQSize sv)
- when (len > oqLen) $ writeIORef (maxOutQSize sv) len
+ oqLen <- readIORef (maxOutQSize $ svarStats sv)
+ when (len > oqLen) $ writeIORef (maxOutQSize $ svarStats sv) len
#endif
return (list, len)
-readOutputQBounded :: MonadAsync m => Int -> SVar t m a -> m [ChildEvent a]
-readOutputQBounded n sv = do
+readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
+readOutputQBounded sv = do
(list, len) <- liftIO $ readOutputQRaw sv
-- When there is no output seen we dispatch more workers to help
-- out if there is work pending in the work queue.
@@ -802,32 +1751,116 @@ readOutputQBounded n sv = do
cnt <- liftIO $ readIORef $ workerCount sv
when (cnt <= 0) $ do
done <- liftIO $ isWorkDone sv
- when (not done) $ pushWorker sv
+ when (not done) $ pushWorker 0 sv
{-# INLINE blockingRead #-}
blockingRead = do
- sendWorkerWait n sv
+ sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
+ liftIO $ (readOutputQRaw sv >>= return . fst)
+
+readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
+readOutputQPaced sv = do
+ (list, len) <- liftIO $ readOutputQRaw sv
+ if len <= 0
+ then blockingRead
+ else do
+ -- XXX send a worker proactively, if needed, even before we start
+ -- processing the output.
+ void $ dispatchWorkerPaced sv
+ return list
+
+ where
+
+ {-# INLINE blockingRead #-}
+ blockingRead = do
+ sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
liftIO $ (readOutputQRaw sv >>= return . fst)
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
workersDone <- allThreadsDone sv
- -- There may still be work pending even if there are no workers
- -- pending because all the workers may return if the
- -- outputQueue becomes full. In that case send off a worker to
- -- kickstart the work again.
+ -- There may still be work pending even if there are no workers pending
+ -- because all the workers may return if the outputQueue becomes full. In
+ -- that case send off a worker to kickstart the work again.
+ --
+ -- Note that isWorkDone can only be safely checked if all workers are done.
+ -- When some workers are in progress they may have decremented the yield
+ -- limit and may later end up incrementing it again. If we look at the yield
+ -- limit in that window we may falsely conclude that it is 0 and therefore
+ -- that we are done.
+ if workersDone
+ then do
+ r <- liftIO $ isWorkDone sv
+ -- Note that we need to guarantee a worker, therefore we cannot just
+ -- use dispatchWorker which may or may not send a worker.
+ when (not r) $ pushWorker 0 sv
+ -- XXX do we need to dispatch many here?
+ -- void $ dispatchWorker sv
+ return r
+ else return False
+
+postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
+postProcessPaced sv = do
+ workersDone <- allThreadsDone sv
+ -- XXX If during consumption we figure out we are getting delayed then we
+ -- should trigger dispatch there as well. We should try to check on the
+ -- workers after consuming every n items from the buffer?
if workersDone
then do
r <- liftIO $ isWorkDone sv
- when (not r) $ pushWorker sv
+ when (not r) $ do
+ void $ dispatchWorkerPaced sv
+ -- Note that we need to guarantee a worker since the work is not
+ -- finished, therefore we cannot just rely on dispatchWorkerPaced
+ -- which may or may not send a worker.
+ noWorker <- allThreadsDone sv
+ when noWorker $ pushWorker 0 sv
return r
else return False
+getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
+getYieldRateInfo st = do
+ -- convert rate in Hertz to latency in Nanoseconds
+ let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r
+ case getStreamRate st of
+ Just (Rate low goal high buf) ->
+ let l = rateToLatency goal
+ minl = rateToLatency high
+ maxl = rateToLatency low
+ in mkYieldRateInfo l (LatencyRange minl maxl) buf
+ Nothing -> return Nothing
+
+ where
+
+ mkYieldRateInfo latency latRange buf = do
+ measured <- newIORef 0
+ wcur <- newIORef (0,0)
+ wcol <- newIORef (0,0)
+ now <- getTime Monotonic
+ wlong <- newIORef (0,now)
+ period <- newIORef 1
+ gainLoss <- newIORef (Count 0)
+
+ return $ Just YieldRateInfo
+ { svarLatencyTarget = latency
+ , svarLatencyRange = latRange
+ , svarRateBuffer = buf
+ , svarGainedLostYields = gainLoss
+ , workerBootstrapLatency = getStreamLatency st
+ , workerPollingInterval = period
+ , workerMeasuredLatency = measured
+ , workerPendingLatency = wcur
+ , workerCollectedLatency = wcol
+ , svarAllTimeLatency = wlong
+ }
+
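The Hz-to-nanoseconds conversion in `getYieldRateInfo` above can be sketched standalone. This is a hedged illustration (a hypothetical standalone function, not the library's exported API): a rate of `r` yields/sec corresponds to a target latency of `1e9/r` ns per yield, and a non-positive rate is mapped to `maxBound` (effectively "never yield").

```haskell
-- Standalone sketch of the rate-to-latency conversion: a rate of r
-- yields/sec means a target latency of 1e9/r nanoseconds per yield;
-- a non-positive rate is treated as "no rate" by mapping to maxBound.
rateToLatency :: Double -> Int
rateToLatency r = if r <= 0 then maxBound else round (1.0e9 / r)

main :: IO ()
main = do
    print (rateToLatency 1000)   -- 1000 yields/sec => 1000000 ns per yield
    print (rateToLatency 0)      -- no rate => maxBound
```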
getAheadSVar :: MonadAsync m
=> State t m a
- -> ( State t m a
- -> IORef ([t m a], Int)
+ -> ( IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Int)
+ -> State t m a
+ -> SVar t m a
+ -> WorkerInfo
-> m ())
-> IO (SVar t m a)
getAheadSVar st f = do
@@ -838,74 +1871,122 @@ getAheadSVar st f = do
wfw <- newIORef False
running <- newIORef S.empty
q <- newIORef ([], -1)
- yl <- case yieldLimit st of
+ stopMVar <- newMVar ()
+ yl <- case getYieldLimit st of
Nothing -> return Nothing
Just x -> Just <$> newIORef x
+ rateInfo <- getYieldRateInfo st
-#ifdef DIAGNOSTICS
- disp <- newIORef 0
+ disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
+ avgLat <- newIORef (0, NanoSecs 0)
+ maxLat <- newIORef (NanoSecs 0)
+ minLat <- newIORef (NanoSecs 0)
+ stpTime <- newIORef Nothing
+#ifdef DIAGNOSTICS
+ tid <- myThreadId
#endif
- let sv =
- SVar { outputQueue = outQ
- , maxYieldLimit = yl
- , outputDoorBell = outQMv
- , readOutputQ = readOutputQBounded (threadsHigh st) sv
- , postProcess = postProcessBounded sv
- , workerThreads = running
- -- , workLoop = workLoopAhead sv q outH
- , workLoop = f st{streamVar = Just sv} q outH
- , enqueue = enqueueAhead sv q
- , isWorkDone = isWorkDoneAhead q outH
- , needDoorBell = wfw
- , svarStyle = AheadVar
- , workerCount = active
- , accountThread = delThread sv
+
+ let getSVar sv readOutput postProc = SVar
+ { outputQueue = outQ
+ , remainingYields = yl
+ , maxBufferLimit = getMaxBuffer st
+ , maxWorkerLimit = getMaxThreads st
+ , yieldRateInfo = rateInfo
+ , outputDoorBell = outQMv
+ , readOutputQ = readOutput sv
+ , postProcess = postProc sv
+ , workerThreads = running
+ , workLoop = f q outH st{streamVar = Just sv} sv
+ , enqueue = enqueueAhead sv q
+ , isWorkDone = isWorkDoneAhead sv q outH
+ , needDoorBell = wfw
+ , svarStyle = AheadVar
+ , workerCount = active
+ , accountThread = delThread sv
+ , workerStopMVar = stopMVar
+ , svarRef = Nothing
#ifdef DIAGNOSTICS
- , aheadWorkQueue = q
- , outputHeap = outH
- , totalDispatches = disp
- , maxWorkers = maxWrk
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
+ , svarCreator = tid
+ , aheadWorkQueue = q
+ , outputHeap = outH
#endif
- }
+ , svarStats = SVarStats
+ { totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+ , avgWorkerLatency = avgLat
+ , minWorkerLatency = minLat
+ , maxWorkerLatency = maxLat
+ , svarStopTime = stpTime
+ }
+ }
+
+ let sv =
+ case getStreamRate st of
+ Nothing -> getSVar sv readOutputQBounded postProcessBounded
+ Just _ -> getSVar sv readOutputQPaced postProcessPaced
in return sv
where
{-# INLINE isWorkDoneAhead #-}
- isWorkDoneAhead q ref = do
+ isWorkDoneAhead sv q ref = do
heapDone <- do
(hp, _) <- readIORef ref
return (H.size hp <= 0)
queueDone <- checkEmpty q
- return $ queueDone && heapDone
+ yieldsDone <-
+ case remainingYields sv of
+ Just yref -> do
+ n <- readIORef yref
+ return (n <= 0)
+ Nothing -> return False
+ -- XXX note that yieldsDone can be authoritative only when there
+ -- are no workers running. If there are active workers they can
+ -- later increment the yield count and therefore change the result.
+ return $ (yieldsDone && heapDone) || (queueDone && heapDone)
checkEmpty q = do
(xs, _) <- readIORef q
return $ null xs
-getParallelSVar :: MonadIO m => IO (SVar t m a)
-getParallelSVar = do
+getParallelSVar :: MonadIO m => State t m a -> IO (SVar t m a)
+getParallelSVar st = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
running <- newIORef S.empty
-#ifdef DIAGNOSTICS
+ yl <- case getYieldLimit st of
+ Nothing -> return Nothing
+ Just x -> Just <$> newIORef x
+ rateInfo <- getYieldRateInfo st
+
disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
+ avgLat <- newIORef (0, NanoSecs 0)
+ maxLat <- newIORef (NanoSecs 0)
+ minLat <- newIORef (NanoSecs 0)
+ stpTime <- newIORef Nothing
+#ifdef DIAGNOSTICS
+ tid <- myThreadId
#endif
+
let sv =
SVar { outputQueue = outQ
- , maxYieldLimit = Nothing
+ , remainingYields = yl
+ , maxBufferLimit = Unlimited
+ , maxWorkerLimit = Unlimited
+ -- Used only for diagnostics
+ , yieldRateInfo = rateInfo
, outputDoorBell = outQMv
, readOutputQ = readOutputQPar sv
, postProcess = allThreadsDone sv
@@ -917,15 +1998,24 @@ getParallelSVar = do
, svarStyle = ParallelVar
, workerCount = active
, accountThread = modifyThread sv
+ , workerStopMVar = undefined
+ , svarRef = Nothing
#ifdef DIAGNOSTICS
+ , svarCreator = tid
, aheadWorkQueue = undefined
, outputHeap = undefined
- , totalDispatches = disp
- , maxWorkers = maxWrk
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
#endif
+ , svarStats = SVarStats
+ { totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+ , avgWorkerLatency = avgLat
+ , minWorkerLatency = minLat
+ , maxWorkerLatency = maxLat
+ , svarStopTime = stpTime
+ }
}
in return sv
@@ -933,33 +2023,43 @@ getParallelSVar = do
readOutputQPar sv = liftIO $ do
withDBGMVar sv "readOutputQPar: doorbell" $ takeMVar (outputDoorBell sv)
+ case yieldRateInfo sv of
+ Nothing -> return ()
+ Just yinfo -> void $ collectLatency (svarStats sv) yinfo
readOutputQRaw sv >>= return . fst
-sendWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
-sendWorker sv m = do
+sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
+sendFirstWorker sv m = do
-- Note: We must have all the work on the queue before sending the
-- pushworker, otherwise the pushworker may exit before we even get a
-- chance to push.
liftIO $ enqueue sv m
- pushWorker sv
+ case yieldRateInfo sv of
+ Nothing -> pushWorker 0 sv
+ Just yinfo -> do
+ if svarLatencyTarget yinfo == maxBound
+ then liftIO $ threadDelay maxBound
+ else pushWorker 1 sv
return sv
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
=> State t m a
-> t m a
- -> ( State t m a
- -> IORef ([t m a], Int)
+ -> ( IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Int)
+ -> State t m a
+ -> SVar t m a
+ -> WorkerInfo
-> m ())
-> m (SVar t m a)
newAheadVar st m wloop = do
sv <- liftIO $ getAheadSVar st wloop
- sendWorker sv m
+ sendFirstWorker sv m
{-# INLINABLE newParallelVar #-}
-newParallelVar :: MonadAsync m => m (SVar t m a)
-newParallelVar = liftIO $ getParallelSVar
+newParallelVar :: MonadAsync m => State t m a -> m (SVar t m a)
+newParallelVar st = liftIO $ getParallelSVar st
-- XXX this errors out for Parallel/Ahead SVars
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
@@ -971,4 +2071,9 @@ toStreamVar sv m = do
-- XXX This is safe only when called from the consumer thread or when no
-- consumer is present. There may be a race if we are not running in the
-- consumer thread.
- when done $ pushWorker sv
+ -- XXX do this only if the work queue is not empty. The work may have been
+ -- carried out by existing workers.
+ when done $
+ case yieldRateInfo sv of
+ Nothing -> pushWorker 0 sv
+ Just _ -> pushWorker 1 sv
diff --git a/src/Streamly/Streams/Ahead.hs b/src/Streamly/Streams/Ahead.hs
index db8ce50..df624e4 100644
--- a/src/Streamly/Streams/Ahead.hs
+++ b/src/Streamly/Streams/Ahead.hs
@@ -8,6 +8,10 @@
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
+#ifdef DIAGNOSTICS_VERBOSE
+#define DIAGNOSTICS
+#endif
+
-- |
-- Module : Streamly.Streams.Ahead
-- Copyright : (c) 2017 Harendra Kumar
@@ -27,7 +31,8 @@ module Streamly.Streams.Ahead
)
where
-import Control.Monad (ap)
+import Control.Concurrent.MVar (putMVar, takeMVar)
+import Control.Monad (ap, void)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class (MonadError(..))
@@ -35,11 +40,11 @@ import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
-import Data.Atomics (atomicModifyIORefCAS_)
import Data.Heap (Heap, Entry(..))
-import Data.IORef (IORef, readIORef)
+import Data.IORef (IORef, readIORef, atomicModifyIORef)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
+import GHC.Exts (inline)
import qualified Data.Heap as H
@@ -113,111 +118,399 @@ import Prelude hiding (map)
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it.
+--
+-- XXX Also note that limiting concurrency for cases like "take 10" would not
+-- work well with left associative expressions, because we have no visibility
+-- about how much the left side of the expression would yield.
+--
+-- XXX It may be a good idea to increment sequence numbers for each yield;
+-- currently a stream on the left side of the expression may yield many
+-- elements with the same sequence number. We can then use the seq number to
+-- enforce yieldMax and yieldLimit as well.
-workLoopAhead :: MonadIO m
- => State Stream m a
- -> IORef ([Stream m a], Int)
+-- Invariants:
+--
+-- * A worker should always ensure that it pushes all the consecutive items in
+-- the heap to the outputQueue especially the items on behalf of the workers
+-- that have already left when we were holding the token. This avoids deadlock
+-- conditions when the later workers completion depends on the consumption of
+-- earlier results. For more details see comments in the consumer pull side
+-- code.
+
+{-# INLINE underMaxHeap #-}
+underMaxHeap ::
+ SVar Stream m a
+ -> Heap (Entry Int (AheadHeapEntry Stream m a))
+ -> IO Bool
+underMaxHeap sv hp = do
+ (_, len) <- readIORef (outputQueue sv)
+
+ -- XXX simplify this
+ let maxHeap = case maxBufferLimit sv of
+ Limited lim -> Limited $
+ if (fromIntegral lim) >= len
+ then lim - (fromIntegral len)
+ else 0
+ Unlimited -> Unlimited
+
+ case maxHeap of
+ Limited lim -> do
+ active <- readIORef (workerCount sv)
+ return $ H.size hp + active <= (fromIntegral lim)
+ Unlimited -> return True
+
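The bound computed by `underMaxHeap` above can be illustrated with a simplified standalone sketch (hedged; `Limit` here is a hypothetical stand-in for the library's `Limited`/`Unlimited` type): results already in the output queue consume buffer space, so the heap, plus the results expected from active workers, may only grow into what remains.

```haskell
-- Hypothetical stand-in for the library's buffer limit type.
data Limit = Unlimited | Limited Word

-- Simplified version of the underMaxHeap check: the heap plus active
-- workers must fit into the buffer space left after the output queue.
heapHasRoom :: Limit -> Int -> Int -> Int -> Bool
heapHasRoom limit queueLen heapSize activeWorkers =
    case limit of
        Unlimited -> True
        Limited lim ->
            let remaining = if fromIntegral lim >= queueLen
                            then fromIntegral lim - queueLen
                            else 0
            in heapSize + activeWorkers <= remaining

main :: IO ()
main = do
    print (heapHasRoom (Limited 10) 4 3 2)  -- 3 + 2 fits in 10 - 4
    print (heapHasRoom (Limited 10) 9 1 1)  -- 1 + 1 exceeds 10 - 9
```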
+-- Return value:
+-- True => stop
+-- False => continue
+preStopCheck ::
+ SVar Stream m a
+ -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
+ -> IO Bool
+preStopCheck sv heap = do
+ -- check the stop condition under a lock before actually
+ -- stopping so that the whole herd does not stop at once.
+ takeMVar (workerStopMVar sv)
+ let stop = do
+ putMVar (workerStopMVar sv) ()
+ return True
+ continue = do
+ putMVar (workerStopMVar sv) ()
+ return False
+ (hp, _) <- readIORef heap
+ heapOk <- underMaxHeap sv hp
+ if heapOk
+ then
+ case yieldRateInfo sv of
+ Nothing -> continue
+ Just yinfo -> do
+ rateOk <- isBeyondMaxRate sv yinfo
+ if rateOk then continue else stop
+ else stop
+
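The MVar discipline in `preStopCheck` can be shown in isolation. A hedged sketch with hypothetical names (not the library's API): the stop decision is serialized through an MVar so that workers evaluate the transient condition one at a time, instead of the whole herd stopping on the same momentary observation.

```haskell
import Control.Concurrent.MVar

-- Serialize a stop decision through an MVar lock: take the lock,
-- evaluate the (possibly transient) condition, release, report verdict.
decideUnderLock :: MVar () -> IO Bool -> IO Bool
decideUnderLock lock shouldStop = do
    takeMVar lock            -- only one worker decides at a time
    r <- shouldStop
    putMVar lock ()          -- release before acting on the verdict
    return r

main :: IO ()
main = do
    lock <- newMVar ()
    r <- decideUnderLock lock (return False)
    print r                  -- this worker continues
```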
+processHeap :: MonadIO m
+ => IORef ([Stream m a], Int)
+ -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> AheadHeapEntry Stream m a
+ -> Int
+ -> Bool -- we are draining the heap before we stop
+ -> m ()
+processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
+
+ where
+
+ stopIfNeeded ent seqNo r = do
+ stopIt <- liftIO $ preStopCheck sv heap
+ if stopIt
+ then liftIO $ do
+ -- put the entry back in the heap and stop
+ atomicModifyIORef heap $ \(h, _) ->
+ ((H.insert (Entry seqNo ent) h, seqNo), ())
+ sendStop sv winfo
+ else runStreamWithYieldLimit True seqNo r
+
+ loopHeap seqNo ent = do
+#ifdef DIAGNOSTICS
+ liftIO $ do
+ maxHp <- readIORef (maxHeapSize $ svarStats sv)
+ (hp, _) <- readIORef heap
+ when (H.size hp > maxHp) $ writeIORef (maxHeapSize $ svarStats sv)
+ (H.size hp)
+#endif
+ case ent of
+ AheadEntryPure a -> do
+ -- Use 'send' directly so that we do not account this in worker
+ -- latency as this will not be the real latency.
+ -- Don't stop the worker in this case as we are just
+ -- transferring available results from heap to outputQueue.
+ void $ liftIO $ send sv (ChildYield a)
+ nextHeap seqNo
+ AheadEntryStream r -> do
+ if stopping
+ then stopIfNeeded ent seqNo r
+ else runStreamWithYieldLimit True seqNo r
+
+ nextHeap prevSeqNo = do
+ -- XXX use "dequeueIfSequential prevSeqNo" instead of always
+ -- updating the sequence number in heap.
+ liftIO $ atomicModifyIORef heap $ \(h, _) -> ((h, prevSeqNo + 1), ())
+ ent <- liftIO $ dequeueFromHeap heap
+ case ent of
+ Just (Entry seqNo hent) -> loopHeap seqNo hent
+ Nothing -> do
+ if stopping
+ then do
+ r <- liftIO $ preStopCheck sv heap
+ if r
+ then liftIO $ sendStop sv winfo
+ else processWorkQueue prevSeqNo
+ else (inline processWorkQueue) prevSeqNo
+
+ processWorkQueue prevSeqNo = do
+ work <- dequeueAhead q
+ case work of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just (m, seqNo) -> do
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ if yieldLimitOk
+ then do
+ if seqNo == prevSeqNo + 1
+ then processWithToken q heap st sv winfo m seqNo
+ else processWithoutToken q heap st sv winfo m seqNo
+ else liftIO $ do
+ liftIO $ reEnqueueAhead sv q m
+ incrementYieldLimit sv
+ sendStop sv winfo
+
+ -- We do not stop the worker on buffer full here as we want to proceed to
+ -- nextHeap anyway so that we can clear any subsequent entries. We stop
+ -- only in the yield continuation where we may have a remaining stream to be
+ -- pushed on the heap.
+ singleStreamFromHeap seqNo a = do
+ void $ liftIO $ sendYield sv winfo (ChildYield a)
+ nextHeap seqNo
+
+ -- XXX when we have an unfinished stream on the heap we cannot account all
+ -- the yields of that stream until it finishes, so if we have picked up
+ -- and executed more actions beyond that in the parent stream and put them
+ -- on the heap then they would eat up some yield limit which is not
+ -- correct; we will think that our yield limit is over even though we have
+ -- to yield items from the unfinished stream before them. For this reason, if
+ -- there are pending items in the heap we drain them unconditionally
+ -- without considering the yield limit.
+ runStreamWithYieldLimit continue seqNo r = do
+ _ <- liftIO $ decrementYieldLimit sv
+ if continue -- see comment above -- && yieldLimitOk
+ then do
+ let stop = do
+ liftIO (incrementYieldLimit sv)
+ nextHeap seqNo
+ unStream r st stop
+ (singleStreamFromHeap seqNo)
+ (yieldStreamFromHeap seqNo)
+ else liftIO $ do
+ atomicModifyIORef heap $ \(h, _) ->
+ ((H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo), ())
+ incrementYieldLimit sv
+ sendStop sv winfo
+
+ yieldStreamFromHeap seqNo a r = do
+ continue <- liftIO $ sendYield sv winfo (ChildYield a)
+ runStreamWithYieldLimit continue seqNo r
+
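The decrement/undo protocol that `decrementYieldLimit`/`incrementYieldLimit` implement around every `unStream` call above can be sketched standalone. This is a hedged simplification with hypothetical names (the library uses CAS-based refs and an optional limit): a worker reserves a yield before running an action and returns the reservation if the action stops without yielding, so the remaining count stays accurate.

```haskell
import Data.IORef

-- Reserve one yield before running an action; report False when the
-- budget is exhausted so the caller can requeue the work and stop.
tryReserveYield :: IORef Int -> IO Bool
tryReserveYield ref = atomicModifyIORef ref $ \n ->
    if n > 0 then (n - 1, True) else (n, False)

-- Undo a reservation when the action stopped without yielding.
undoReserveYield :: IORef Int -> IO ()
undoReserveYield ref = atomicModifyIORef ref $ \n -> (n + 1, ())

main :: IO ()
main = do
    limit <- newIORef 1
    ok1 <- tryReserveYield limit   -- succeeds, budget now 0
    ok2 <- tryReserveYield limit   -- fails, budget exhausted
    undoReserveYield limit         -- first action stopped without a yield
    ok3 <- tryReserveYield limit   -- succeeds again
    print (ok1, ok2, ok3)
```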
+{-# NOINLINE drainHeap #-}
+drainHeap :: MonadIO m
+ => IORef ([Stream m a], Int)
+ -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> m ()
+drainHeap q heap st sv winfo = do
+ ent <- liftIO $ dequeueFromHeap heap
+ case ent of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just (Entry seqNo hent) ->
+ processHeap q heap st sv winfo hent seqNo True
+
+processWithoutToken :: MonadIO m
+ => IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> Stream m a
+ -> Int
-> m ()
-workLoopAhead st q heap = runHeap
+processWithoutToken q heap st sv winfo m sno = do
+ -- we have already decremented the yield limit for m
+ let stop = do
+ liftIO (incrementYieldLimit sv)
+ workLoopAhead q heap st sv winfo
+
+ unStream m st stop (singleToHeap sno) (yieldToHeap sno)
where
- sv = fromJust $ streamVar st
- maxBuf = bufferHigh st
+ -- XXX to reduce contention each CPU can have its own heap
toHeap seqNo ent = do
- hp <- liftIO $ atomicModifyIORefCAS heap $ \(h, snum) ->
+ -- Heap insertion is an expensive affair so we use a non CAS based
+ -- modification, otherwise contention and retries can make a thread
+ -- context switch and throw it behind other threads which come later in
+ -- sequence.
+ hp <- liftIO $ atomicModifyIORef heap $ \(h, snum) ->
((H.insert (Entry seqNo ent) h, snum), h)
- (_, len) <- liftIO $ readIORef (outputQueue sv)
- let maxHeap = maxBuf - len
- limit <- case maxYieldLimit sv of
- Nothing -> return maxHeap
- Just ref -> do
- r <- liftIO $ readIORef ref
- return $ if r >= 0 then r else maxHeap
- if H.size hp <= limit
- then runHeap
- else liftIO $ sendStop sv
+
+ heapOk <- liftIO $ underMaxHeap sv hp
+ if heapOk
+ then
+ case yieldRateInfo sv of
+ Nothing -> workLoopAhead q heap st sv winfo
+ Just yinfo -> do
+ rateOk <- liftIO $ workerRateControl sv yinfo winfo
+ if rateOk
+ then workLoopAhead q heap st sv winfo
+ else drainHeap q heap st sv winfo
+ else drainHeap q heap st sv winfo
singleToHeap seqNo a = toHeap seqNo (AheadEntryPure a)
yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `K.cons` r))
+processWithToken :: MonadIO m
+ => IORef ([Stream m a], Int)
+ -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> Stream m a
+ -> Int
+ -> m ()
+processWithToken q heap st sv winfo action sno = do
+ -- Note, we enter this function with yield limit already decremented
+ -- XXX deduplicate stop in all invocations
+ let stop = do
+ liftIO (incrementYieldLimit sv)
+ loopWithToken sno
+
+ unStream action st stop (singleOutput sno) (yieldOutput sno)
+
+ where
+
singleOutput seqNo a = do
- continue <- liftIO $ sendYield maxBuf sv (ChildYield a)
+ continue <- liftIO $ sendYield sv winfo (ChildYield a)
if continue
- then runQueueToken seqNo
- else liftIO $ do
- atomicModifyIORefCAS_ heap $ \(h, _) -> (h, seqNo + 1)
- sendStop sv
-
+ then loopWithToken seqNo
+ else do
+ liftIO $ atomicModifyIORef heap $ \(h, _) -> ((h, seqNo + 1), ())
+ drainHeap q heap st sv winfo
+
+ -- XXX use a wrapper function around stop so that we never miss
+ -- incrementing the yield limit in a stop continuation. Essentially all
+ -- "unStream" calls in this function must increment the yield limit on stop.
yieldOutput seqNo a r = do
- continue <- liftIO $ sendYield maxBuf sv (ChildYield a)
- if continue
- then unStream r st (runQueueToken seqNo)
- (singleOutput seqNo)
- (yieldOutput seqNo)
- else liftIO $ do
- atomicModifyIORefCAS_ heap $ \(h, _) ->
- (H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo)
- sendStop sv
-
- {-# INLINE runQueueToken #-}
- runQueueToken prevSeqNo = do
+ continue <- liftIO $ sendYield sv winfo (ChildYield a)
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ if continue && yieldLimitOk
+ then do
+ let stop = do
+ liftIO (incrementYieldLimit sv)
+ loopWithToken seqNo
+ unStream r st stop
+ (singleOutput seqNo)
+ (yieldOutput seqNo)
+ else do
+ liftIO $ atomicModifyIORef heap $ \(h, _) ->
+ ((H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo), ())
+ liftIO $ incrementYieldLimit sv
+ drainHeap q heap st sv winfo
+
+ loopWithToken prevSeqNo = do
work <- dequeueAhead q
case work of
Nothing -> do
- liftIO $ atomicModifyIORefCAS_ heap $ \(h, _) ->
- (h, prevSeqNo + 1)
- runHeap
+ liftIO $ atomicModifyIORef heap $ \(h, _) ->
+ ((h, prevSeqNo + 1), ())
+ workLoopAhead q heap st sv winfo
+
Just (m, seqNo) -> do
- if seqNo == prevSeqNo + 1
- then
- unStream m st (runQueueToken seqNo)
- (singleOutput seqNo)
- (yieldOutput seqNo)
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ if yieldLimitOk
+ then do
+ if seqNo == prevSeqNo + 1
+ then do
+ let stop = do
+ liftIO (incrementYieldLimit sv)
+ loopWithToken seqNo
+ unStream m st stop
+ (singleOutput seqNo)
+ (yieldOutput seqNo)
+ else do
+ liftIO $ atomicModifyIORef heap $ \(h, _) ->
+ ((h, prevSeqNo + 1), ())
+ liftIO (incrementYieldLimit sv)
+ -- There is a race here: another thread may put something
+ -- on the heap and go away; then the consumer will not get
+ -- a doorBell and we will not clear the heap before
+ -- executing the next action. If the consumer depends
+ -- on the output that is stuck in the heap then this
+ -- will result in a deadlock. So we always clear the
+ -- heap before executing the next action.
+ liftIO $ reEnqueueAhead sv q m
+ workLoopAhead q heap st sv winfo
else do
- liftIO $ atomicModifyIORefCAS_ heap $ \(h, _) ->
- (h, prevSeqNo + 1)
- unStream m st runHeap
- (singleToHeap seqNo)
- (yieldToHeap seqNo)
- runQueueNoToken = do
- work <- dequeueAhead q
- case work of
- Nothing -> runHeap
- Just (m, seqNo) -> do
- if seqNo == 0
- then
- unStream m st (runQueueToken seqNo)
- (singleOutput seqNo)
- (yieldOutput seqNo)
- else
- unStream m st runHeap
- (singleToHeap seqNo)
- (yieldToHeap seqNo)
-
- {-# NOINLINE runHeap #-}
- runHeap = do
+ liftIO $ atomicModifyIORef heap $ \(h, _) ->
+ ((h, prevSeqNo + 1), ())
+ liftIO $ reEnqueueAhead sv q m
+ liftIO $ incrementYieldLimit sv
+ drainHeap q heap st sv winfo
+
+-- XXX the yield limit changes increased the performance overhead by 30-40%.
+-- Just like AsyncT we can use an implementation without yield limit and even
+-- without pacing code to keep the performance higher in the unlimited and
+-- unpaced case.
+--
+-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
+-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
+-- hooks can be used for a more general implementation to even check predicates
+-- and not just yield limit.
+
+workLoopAhead :: MonadIO m
+ => IORef ([Stream m a], Int)
+ -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> m ()
+workLoopAhead q heap st sv winfo = do
#ifdef DIAGNOSTICS
liftIO $ do
- maxHp <- readIORef (maxHeapSize sv)
+ maxHp <- readIORef (maxHeapSize $ svarStats sv)
(hp, _) <- readIORef heap
- when (H.size hp > maxHp) $ writeIORef (maxHeapSize sv) (H.size hp)
+ when (H.size hp > maxHp) $ writeIORef (maxHeapSize $ svarStats sv)
+ (H.size hp)
#endif
ent <- liftIO $ dequeueFromHeap heap
case ent of
Nothing -> do
- done <- queueEmptyAhead q
- if done
- then liftIO $ sendStop sv
- else runQueueNoToken
- Just (Entry seqNo hent) -> do
- case hent of
- AheadEntryPure a -> singleOutput seqNo a
- AheadEntryStream r ->
- unStream r st (runQueueToken seqNo)
- (singleOutput seqNo)
- (yieldOutput seqNo)
+ -- Before we execute the next item from the work queue we check
+ -- if we are beyond the yield limit. It is better to check the
+ -- yield limit before we pick up the next item. Otherwise we
+ -- may have already started more tasks even though we may have
+ -- reached the yield limit. We can avoid this by taking active
+ -- workers into account, but that is not as reliable, because
+ -- workers may go away without picking up work and yielding a
+ -- value.
+ --
+ -- Rate control can be done either based on actual yields in
+ -- the output queue or based on any yield either to the heap or
+ -- to the output queue. In both cases we may have one issue or
+ -- the other. We chose to do this based on actual yields to the
+ -- output queue because it makes the code common to both async
+ -- and ahead streams.
+ --
+ work <- dequeueAhead q
+ case work of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just (m, seqNo) -> do
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ if yieldLimitOk
+ then do
+ if seqNo == 0
+ then processWithToken q heap st sv winfo m seqNo
+ else processWithoutToken q heap st sv winfo m seqNo
+ else liftIO $ do
+ -- If some worker decremented the yield limit but
+ -- then did not yield anything and therefore
+ -- incremented it later, then if we did not requeue
+ -- m here we may find the work queue empty and
+ -- therefore miss executing the remaining action.
+ liftIO $ reEnqueueAhead sv q m
+ incrementYieldLimit sv
+ sendStop sv winfo
+ Just (Entry seqNo hent) ->
+ processHeap q heap st sv winfo hent seqNo False
-------------------------------------------------------------------------------
-- WAhead
@@ -341,7 +634,8 @@ instance IsStream AheadT where
-- @since 0.3.0
{-# INLINE ahead #-}
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
-ahead m1 m2 = fromStream $ aheadS (toStream m1) (toStream m2)
+ahead m1 m2 = fromStream $ Stream $ \st stp sng yld ->
+ unStream (aheadS (toStream m1) (toStream m2)) st stp sng yld
instance MonadAsync m => Semigroup (AheadT m a) where
(<>) = ahead
diff --git a/src/Streamly/Streams/Async.hs b/src/Streamly/Streams/Async.hs
index af1be32..9fbd779 100644
--- a/src/Streamly/Streams/Async.hs
+++ b/src/Streamly/Streams/Async.hs
@@ -4,10 +4,15 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
+#ifdef DIAGNOSTICS_VERBOSE
+#define DIAGNOSTICS
+#endif
+
-- |
-- Module : Streamly.Streams.Async
-- Copyright : (c) 2017 Harendra Kumar
@@ -44,7 +49,7 @@ import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
-import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ)
+import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
@@ -58,51 +63,186 @@ import Streamly.SVar
import Streamly.Streams.StreamK (IsStream(..), Stream(..), adapt)
import qualified Streamly.Streams.StreamK as K
+#ifdef DIAGNOSTICS
+import Control.Concurrent (myThreadId)
+#endif
+
#include "Instances.hs"
-------------------------------------------------------------------------------
-- Async
-------------------------------------------------------------------------------
-{-# INLINE runStreamLIFO #-}
-runStreamLIFO :: MonadIO m
- => State Stream m a -> IORef [Stream m a] -> Stream m a -> m () -> m ()
-runStreamLIFO st q m stop = unStream m st stop single yieldk
+{-# INLINE workLoopLIFO #-}
+workLoopLIFO
+ :: MonadIO m
+ => IORef [Stream m a]
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> m ()
+workLoopLIFO q st sv winfo = run
+
where
- sv = fromJust $ streamVar st
- maxBuf = bufferHigh st
+
+ run = do
+ work <- dequeue
+ case work of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just m -> unStream m st run single yieldk
+
single a = do
- res <- liftIO $ sendYield maxBuf sv (ChildYield a)
- if res then stop else liftIO $ sendStop sv
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ if res then run else liftIO $ sendStop sv winfo
+
yieldk a r = do
- res <- liftIO $ sendYield maxBuf sv (ChildYield a)
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
- then (unStream r) st stop single yieldk
- else liftIO $ enqueueLIFO sv q r >> sendStop sv
+ then unStream r st run single yieldk
+ else liftIO $ do
+ enqueueLIFO sv q r
+ sendStop sv winfo
+
+ dequeue = liftIO $ atomicModifyIORefCAS q $ \case
+ [] -> ([], Nothing)
+ x : xs -> (xs, Just x)
+
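The `dequeue` above is an atomic pop from a plain list used as a LIFO work stack. A hedged standalone sketch using plain `atomicModifyIORef` (the diff uses the CAS-based variant from the atomic-primops package with the same shape):

```haskell
import Data.IORef

-- Atomically pop the most recently pushed item; Nothing when empty.
popLIFO :: IORef [a] -> IO (Maybe a)
popLIFO q = atomicModifyIORef q $ \xs -> case xs of
    []     -> ([], Nothing)
    y : ys -> (ys, Just y)

main :: IO ()
main = do
    q <- newIORef [3, 2, 1 :: Int]  -- 3 was pushed last
    x <- popLIFO q
    print x                         -- most recent push comes out first
```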
+-- We duplicate workLoop for yield limit and no limit cases because it has
+-- around 40% performance overhead in the worst case.
+--
+-- XXX we can pass yinfo directly as an argument here so that we do not have to
+-- make a check every time.
+{-# INLINE workLoopLIFOLimited #-}
+workLoopLIFOLimited
+ :: MonadIO m
+ => IORef [Stream m a]
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
+ -> m ()
+workLoopLIFOLimited q st sv winfo = run
+
+ where
+
+ run = do
+ work <- dequeue
+ case work of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just m -> do
+ -- XXX This is just a best effort minimization of concurrency
+ -- to the yield limit. If the stream is made of concurrent
+ -- streams we do not reserve the yield limit in the constituent
+ -- streams before executing the action. This can be done
+ -- though, by sharing the yield limit ref with downstream
+ -- actions via state passing. Just a todo.
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ if yieldLimitOk
+ then do
+ let stop = liftIO (incrementYieldLimit sv) >> run
+ unStream m st stop single yieldk
+ -- Avoid any side effects, undo the yield limit decrement if we
+ -- never yielded anything.
+ else liftIO $ do
+ enqueueLIFO sv q m
+ incrementYieldLimit sv
+ sendStop sv winfo
+
+ single a = do
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ if res then run else liftIO $ sendStop sv winfo
+
+ -- XXX can we pass on the yield limit downstream to limit the concurrency
+    -- of constituent streams?
+ yieldk a r = do
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ let stop = liftIO (incrementYieldLimit sv) >> run
+ if res && yieldLimitOk
+ then unStream r st stop single yieldk
+ else liftIO $ do
+ incrementYieldLimit sv
+ enqueueLIFO sv q r
+ sendStop sv winfo
+
+ dequeue = liftIO $ atomicModifyIORefCAS q $ \case
+ [] -> ([], Nothing)
+ x : xs -> (xs, Just x)
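The limited loop above follows a reserve/undo protocol: decrement the yield limit before running an action, and increment it back if the worker stops or requeues without having yielded. A minimal, library-free sketch of that protocol with a plain atomic counter (`tryReserve` and `release` are illustrative names, not streamly functions):

```haskell
import Data.IORef (IORef, newIORef, atomicModifyIORef')

-- Atomically take one yield credit; succeeds only while credits remain.
tryReserve :: IORef Int -> IO Bool
tryReserve ref = atomicModifyIORef' ref $ \n ->
    if n > 0 then (n - 1, True) else (n, False)

-- Return an unused credit, e.g. when the worker stops without yielding.
release :: IORef Int -> IO ()
release ref = atomicModifyIORef' ref $ \n -> (n + 1, ())

main :: IO ()
main = do
    limit <- newIORef 1
    ok1 <- tryReserve limit   -- credit taken
    ok2 <- tryReserve limit   -- no credits left
    release limit             -- undo: the action never yielded
    ok3 <- tryReserve limit   -- available again
    print (ok1, ok2, ok3)     -- (True,False,True)
```

The undo step is what keeps `maxYields` accurate when a dispatched worker finds no work to do.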
-------------------------------------------------------------------------------
-- WAsync
-------------------------------------------------------------------------------
-{-# INLINE runStreamFIFO #-}
-runStreamFIFO
+{-# INLINE workLoopFIFO #-}
+workLoopFIFO
:: MonadIO m
- => State Stream m a
- -> LinkedQueue (Stream m a)
- -> Stream m a
+ => LinkedQueue (Stream m a)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
-> m ()
+workLoopFIFO q st sv winfo = run
+
+ where
+
+ run = do
+ work <- liftIO $ tryPopR q
+ case work of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just m -> unStream m st run single yieldk
+
+ single a = do
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ if res then run else liftIO $ sendStop sv winfo
+
+ yieldk a r = do
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ if res
+ then unStream r st run single yieldk
+ else liftIO $ do
+ enqueueFIFO sv q r
+ sendStop sv winfo
+
+{-# INLINE workLoopFIFOLimited #-}
+workLoopFIFOLimited
+ :: MonadIO m
+ => LinkedQueue (Stream m a)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> WorkerInfo
-> m ()
-runStreamFIFO st q m stop = unStream m st stop single yieldk
+workLoopFIFOLimited q st sv winfo = run
+
where
- sv = fromJust $ streamVar st
- maxBuf = bufferHigh st
+
+ run = do
+ work <- liftIO $ tryPopR q
+ case work of
+ Nothing -> liftIO $ sendStop sv winfo
+ Just m -> do
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ if yieldLimitOk
+ then do
+ let stop = liftIO (incrementYieldLimit sv) >> run
+ unStream m st stop single yieldk
+ else liftIO $ do
+ enqueueFIFO sv q m
+ incrementYieldLimit sv
+ sendStop sv winfo
+
single a = do
- res <- liftIO $ sendYield maxBuf sv (ChildYield a)
- if res then stop else liftIO $ sendStop sv
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ if res then run else liftIO $ sendStop sv winfo
+
yieldk a r = do
- res <- liftIO $ sendYield maxBuf sv (ChildYield a)
- liftIO (enqueueFIFO sv q r)
- if res then stop else liftIO $ sendStop sv
+ res <- liftIO $ sendYield sv winfo (ChildYield a)
+ yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ let stop = liftIO (incrementYieldLimit sv) >> run
+ if res && yieldLimitOk
+ then unStream r st stop single yieldk
+ else liftIO $ do
+ incrementYieldLimit sv
+ enqueueFIFO sv q r
+ sendStop sv winfo
-------------------------------------------------------------------------------
-- SVar creation
@@ -120,43 +260,96 @@ getLifoSVar st = do
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
- q <- newIORef []
- yl <- case yieldLimit st of
- Nothing -> return Nothing
- Just x -> Just <$> newIORef x
-#ifdef DIAGNOSTICS
- disp <- newIORef 0
+ q <- newIORef []
+ yl <- case getYieldLimit st of
+ Nothing -> return Nothing
+ Just x -> Just <$> newIORef x
+ rateInfo <- getYieldRateInfo st
+
+ disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
+ avgLat <- newIORef (0, NanoSecs 0)
+ maxLat <- newIORef (NanoSecs 0)
+ minLat <- newIORef (NanoSecs 0)
+ stpTime <- newIORef Nothing
+#ifdef DIAGNOSTICS
+ tid <- myThreadId
#endif
- let checkEmpty = null <$> readIORef q
- let sv =
- SVar { outputQueue = outQ
- , maxYieldLimit = yl
- , outputDoorBell = outQMv
- , readOutputQ = readOutputQBounded (threadsHigh st) sv
- , postProcess = postProcessBounded sv
- , workerThreads = running
- , workLoop = workLoopLIFO runStreamLIFO
- st{streamVar = Just sv} q
- , enqueue = enqueueLIFO sv q
- , isWorkDone = checkEmpty
- , needDoorBell = wfw
- , svarStyle = AsyncVar
- , workerCount = active
- , accountThread = delThread sv
+
+ let isWorkFinished _ = null <$> readIORef q
+
+ let isWorkFinishedLimited sv = do
+ yieldsDone <-
+ case remainingYields sv of
+ Just ref -> do
+ n <- readIORef ref
+ return (n <= 0)
+ Nothing -> return False
+ qEmpty <- null <$> readIORef q
+ return $ qEmpty || yieldsDone
+
+ let getSVar sv readOutput postProc workDone wloop = SVar
+ { outputQueue = outQ
+ , remainingYields = yl
+ , maxBufferLimit = getMaxBuffer st
+ , maxWorkerLimit = getMaxThreads st
+ , yieldRateInfo = rateInfo
+ , outputDoorBell = outQMv
+ , readOutputQ = readOutput sv
+ , postProcess = postProc sv
+ , workerThreads = running
+ , workLoop = wloop q st{streamVar = Just sv} sv
+ , enqueue = enqueueLIFO sv q
+ , isWorkDone = workDone sv
+ , needDoorBell = wfw
+ , svarStyle = AsyncVar
+ , workerCount = active
+ , accountThread = delThread sv
+ , workerStopMVar = undefined
+ , svarRef = Nothing
#ifdef DIAGNOSTICS
- , aheadWorkQueue = undefined
- , outputHeap = undefined
- , maxWorkers = maxWrk
- , totalDispatches = disp
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
+ , svarCreator = tid
+ , aheadWorkQueue = undefined
+ , outputHeap = undefined
#endif
- }
+ , svarStats = SVarStats
+ { totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+ , avgWorkerLatency = avgLat
+ , minWorkerLatency = minLat
+ , maxWorkerLatency = maxLat
+ , svarStopTime = stpTime
+ }
+ }
+
+ let sv =
+ case getStreamRate st of
+ Nothing ->
+ case getYieldLimit st of
+ Nothing -> getSVar sv readOutputQBounded
+ postProcessBounded
+ isWorkFinished
+ workLoopLIFO
+ Just _ -> getSVar sv readOutputQBounded
+ postProcessBounded
+ isWorkFinishedLimited
+ workLoopLIFOLimited
+ Just _ ->
+ case getYieldLimit st of
+ Nothing -> getSVar sv readOutputQPaced
+ postProcessPaced
+ isWorkFinished
+ workLoopLIFO
+ Just _ -> getSVar sv readOutputQPaced
+ postProcessPaced
+ isWorkFinishedLimited
+ workLoopLIFOLimited
in return sv
getFifoSVar :: MonadAsync m => State Stream m a -> IO (SVar Stream m a)
@@ -167,41 +360,94 @@ getFifoSVar st = do
wfw <- newIORef False
running <- newIORef S.empty
q <- newQ
- yl <- case yieldLimit st of
- Nothing -> return Nothing
- Just x -> Just <$> newIORef x
-#ifdef DIAGNOSTICS
+ yl <- case getYieldLimit st of
+ Nothing -> return Nothing
+ Just x -> Just <$> newIORef x
+ rateInfo <- getYieldRateInfo st
+
disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
+ avgLat <- newIORef (0, NanoSecs 0)
+ maxLat <- newIORef (NanoSecs 0)
+ minLat <- newIORef (NanoSecs 0)
+ stpTime <- newIORef Nothing
+#ifdef DIAGNOSTICS
+ tid <- myThreadId
#endif
- let sv =
- SVar { outputQueue = outQ
- , maxYieldLimit = yl
- , outputDoorBell = outQMv
- , readOutputQ = readOutputQBounded (threadsHigh st) sv
- , postProcess = postProcessBounded sv
- , workerThreads = running
- , workLoop = workLoopFIFO runStreamFIFO
- st{streamVar = Just sv} q
- , enqueue = enqueueFIFO sv q
- , isWorkDone = nullQ q
- , needDoorBell = wfw
- , svarStyle = WAsyncVar
- , workerCount = active
- , accountThread = delThread sv
+
+ let isWorkFinished _ = nullQ q
+ let isWorkFinishedLimited sv = do
+ yieldsDone <-
+ case remainingYields sv of
+ Just ref -> do
+ n <- readIORef ref
+ return (n <= 0)
+ Nothing -> return False
+ qEmpty <- nullQ q
+ return $ qEmpty || yieldsDone
+
+ let getSVar sv readOutput postProc workDone wloop = SVar
+ { outputQueue = outQ
+ , remainingYields = yl
+ , maxBufferLimit = getMaxBuffer st
+ , maxWorkerLimit = getMaxThreads st
+ , yieldRateInfo = rateInfo
+ , outputDoorBell = outQMv
+ , readOutputQ = readOutput sv
+ , postProcess = postProc sv
+ , workerThreads = running
+ , workLoop = wloop q st{streamVar = Just sv} sv
+ , enqueue = enqueueFIFO sv q
+ , isWorkDone = workDone sv
+ , needDoorBell = wfw
+ , svarStyle = WAsyncVar
+ , workerCount = active
+ , accountThread = delThread sv
+ , workerStopMVar = undefined
+ , svarRef = Nothing
#ifdef DIAGNOSTICS
- , aheadWorkQueue = undefined
- , outputHeap = undefined
- , totalDispatches = disp
+ , svarCreator = tid
+ , aheadWorkQueue = undefined
+ , outputHeap = undefined
+#endif
+ , svarStats = SVarStats
+ { totalDispatches = disp
, maxWorkers = maxWrk
, maxOutQSize = maxOq
, maxHeapSize = maxHs
, maxWorkQSize = maxWq
-#endif
- }
+ , avgWorkerLatency = avgLat
+ , minWorkerLatency = minLat
+ , maxWorkerLatency = maxLat
+ , svarStopTime = stpTime
+ }
+ }
+
+ let sv =
+ case getStreamRate st of
+ Nothing ->
+ case getYieldLimit st of
+ Nothing -> getSVar sv readOutputQBounded
+ postProcessBounded
+ isWorkFinished
+ workLoopFIFO
+ Just _ -> getSVar sv readOutputQBounded
+ postProcessBounded
+ isWorkFinishedLimited
+ workLoopFIFOLimited
+ Just _ ->
+ case getYieldLimit st of
+ Nothing -> getSVar sv readOutputQPaced
+ postProcessPaced
+ isWorkFinished
+ workLoopFIFO
+ Just _ -> getSVar sv readOutputQPaced
+ postProcessPaced
+ isWorkFinishedLimited
+ workLoopFIFOLimited
in return sv
{-# INLINABLE newAsyncVar #-}
@@ -209,7 +455,7 @@ newAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m = do
sv <- liftIO $ getLifoSVar st
- sendWorker sv m
+ sendFirstWorker sv m
-- XXX Get rid of this?
-- | Make a stream asynchronous, triggers the computation and returns a stream
@@ -233,7 +479,7 @@ newWAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m = do
sv <- liftIO $ getFifoSVar st
- sendWorker sv m
+ sendFirstWorker sv m
------------------------------------------------------------------------------
-- Running streams concurrently
@@ -336,8 +582,9 @@ asyncS = joinStreamVarAsync AsyncVar
-- @since 0.2.0
{-# INLINE async #-}
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
-async m1 m2 = fromStream $
- joinStreamVarAsync AsyncVar (toStream m1) (toStream m2)
+async m1 m2 = fromStream $ Stream $ \st stp sng yld ->
+ unStream (joinStreamVarAsync AsyncVar (toStream m1) (toStream m2))
+ st stp sng yld
-- | Same as 'async'.
--
@@ -483,7 +730,8 @@ consMWAsync m r = K.yieldM m `wAsyncS` r
-- @since 0.2.0
{-# INLINE wAsync #-}
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
-wAsync m1 m2 = fromStream $ wAsyncS (toStream m1) (toStream m2)
+wAsync m1 m2 = fromStream $ Stream $ \st stp sng yld ->
+ unStream (wAsyncS (toStream m1) (toStream m2)) st stp sng yld
-- | Wide async composition or async composition with breadth first traversal.
-- The Semigroup instance of 'WAsyncT' concurrently /traverses/ the composed
diff --git a/src/Streamly/Streams/Parallel.hs b/src/Streamly/Streams/Parallel.hs
index 0518e53..6e62164 100644
--- a/src/Streamly/Streams/Parallel.hs
+++ b/src/Streamly/Streams/Parallel.hs
@@ -60,25 +60,33 @@ import qualified Streamly.Streams.StreamK as K
-------------------------------------------------------------------------------
{-# NOINLINE runOne #-}
-runOne :: MonadIO m => State Stream m a -> Stream m a -> m ()
-runOne st m = unStream m st stop single yieldk
+runOne :: MonadIO m => State Stream m a -> Stream m a -> WorkerInfo -> m ()
+runOne st m winfo = unStream m st stop single yieldk
where
sv = fromJust $ streamVar st
- stop = liftIO $ sendStop sv
- sendit a = liftIO $ sendYield (-1) sv (ChildYield a)
- single a = sendit a >> stop
+
+ withLimitCheck action = do
+ yieldLimitOk <- liftIO $ decrementYieldLimitPost sv
+ if yieldLimitOk
+ then action
+ else liftIO $ cleanupSVarFromWorker sv
+
+ stop = liftIO $ sendStop sv winfo
+ sendit a = liftIO $ sendYield sv winfo (ChildYield a)
+ single a = sendit a >> withLimitCheck stop
+
    -- XXX there is no flow control in the parallel case. We should perhaps
    -- queue the stream back on a work queue and exit the thread when the
    -- outputQueue overflows. Parallel is dangerous because it can accumulate
    -- unbounded output in the buffer.
- yieldk a r = void (sendit a) >> runOne st r
+ yieldk a r = void (sendit a) >> withLimitCheck (runOne st r winfo)
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarPar m r = Stream $ \st stp sng yld -> do
- sv <- newParallelVar
+ sv <- newParallelVar st
pushWorkerPar sv (runOne st{streamVar = Just sv} m)
pushWorkerPar sv (runOne st{streamVar = Just sv} r)
(unStream (fromSVar sv)) (rstState st) stp sng yld
@@ -91,7 +99,7 @@ joinStreamVarPar style m1 m2 = Stream $ \st stp sng yld ->
Just sv | svarStyle sv == style -> do
pushWorkerPar sv (runOne st m1)
unStream m2 st stp sng yld
- _ -> unStream (forkSVarPar m1 m2) (rstState st) stp sng yld
+ _ -> unStream (forkSVarPar m1 m2) st stp sng yld
{-# INLINE parallelStream #-}
parallelStream :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
@@ -109,7 +117,9 @@ consMParallel m r = K.yieldM m `parallelStream` r
-- @since 0.2.0
{-# INLINE parallel #-}
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
-parallel m1 m2 = fromStream $ parallelStream (toStream m1) (toStream m2)
+parallel m1 m2 = fromStream $ Stream $ \st stp sng yld -> do
+ unStream (parallelStream (toStream m1) (toStream m2))
+ st stp sng yld
------------------------------------------------------------------------------
-- Convert a stream to parallel
@@ -117,7 +127,7 @@ parallel m1 m2 = fromStream $ parallelStream (toStream m1) (toStream m2)
mkParallel :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkParallel m = do
- sv <- newParallelVar
+ sv <- newParallelVar defState
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
return $ fromSVar sv
@@ -128,9 +138,9 @@ mkParallel m = do
{-# INLINE applyWith #-}
applyWith :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
applyWith f m = fromStream $ Stream $ \st stp sng yld -> do
- sv <- newParallelVar
+ sv <- newParallelVar (rstState st)
pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m))
- unStream (toStream $ f $ fromSVar sv) st stp sng yld
+ unStream (toStream $ f $ fromSVar sv) (rstState st) stp sng yld
------------------------------------------------------------------------------
-- Stream runner concurrent function application
@@ -139,7 +149,7 @@ applyWith f m = fromStream $ Stream $ \st stp sng yld -> do
{-# INLINE runWith #-}
runWith :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
runWith f m = do
- sv <- newParallelVar
+ sv <- newParallelVar defState
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
f $ fromSVar sv
diff --git a/src/Streamly/Streams/SVar.hs b/src/Streamly/Streams/SVar.hs
index 971fdb7..ceb9fe2 100644
--- a/src/Streamly/Streams/SVar.hs
+++ b/src/Streamly/Streams/SVar.hs
@@ -11,6 +11,10 @@
#include "inline.h"
+#ifdef DIAGNOSTICS_VERBOSE
+#define DIAGNOSTICS
+#endif
+
-- |
-- Module : Streamly.Streams.SVar
-- Copyright : (c) 2017 Harendra Kumar
@@ -28,10 +32,24 @@ module Streamly.Streams.SVar
, maxThreads
, maxBuffer
, maxYields
+ , rate
+ , avgRate
+ , minRate
+ , maxRate
+ , constRate
)
where
+import Control.Exception (fromException)
import Control.Monad.Catch (throwM)
+import Data.Int (Int64)
+import Control.Monad.IO.Class (liftIO)
+import Data.IORef (newIORef, mkWeakIORef)
+#ifdef DIAGNOSTICS
+import Data.IORef (writeIORef)
+import System.IO (hPutStrLn, stderr)
+import System.Clock (Clock(Monotonic), getTime)
+#endif
import Streamly.SVar
import Streamly.Streams.StreamK
@@ -42,6 +60,15 @@ import Streamly.Streams.Serial (SerialT)
-- happen, but it may result in unexpected output when threads are left hanging
-- until they are GCed because the consumer went away.
+#ifdef DIAGNOSTICS
+#ifdef DIAGNOSTICS_VERBOSE
+printSVar :: SVar t m a -> String -> IO ()
+printSVar sv how = do
+ svInfo <- dumpSVar sv
+ hPutStrLn stderr $ "\n" ++ how ++ "\n" ++ svInfo
+#endif
+#endif
+
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
@@ -56,9 +83,10 @@ fromStreamVar sv = Stream $ \st stp sng yld -> do
allDone stp = do
#ifdef DIAGNOSTICS
+ t <- liftIO $ getTime Monotonic
+ liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
#ifdef DIAGNOSTICS_VERBOSE
- svInfo <- liftIO $ dumpSVar sv
- liftIO $ hPutStrLn stderr $ "fromStreamVar done\n" ++ svInfo
+ liftIO $ printSVar sv "SVar Done"
#endif
#endif
stp
@@ -78,11 +106,29 @@ fromStreamVar sv = Stream $ \st stp sng yld -> do
accountThread sv tid
case e of
Nothing -> unStream rest (rstState st) stp sng yld
- Just ex -> throwM ex
+ Just ex ->
+ case fromException ex of
+ Just ThreadAbort ->
+ unStream rest (rstState st) stp sng yld
+ Nothing -> throwM ex
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
-fromSVar sv = fromStream $ fromStreamVar sv
+fromSVar sv = do
+ fromStream $ Stream $ \st stp sng yld -> do
+ ref <- liftIO $ newIORef ()
+ _ <- liftIO $ mkWeakIORef ref hook
+        -- We pass a copy of sv to fromStreamVar, so that we know it has no
+        -- other references; when that copy gets garbage collected, "ref" will
+        -- be garbage collected as well and our hook will be called.
+ unStream (fromStreamVar sv{svarRef = Just ref}) st stp sng yld
+ where
+
+ hook = do
+#ifdef DIAGNOSTICS_VERBOSE
+ printSVar sv "SVar Garbage Collected"
+#endif
+ cleanupSVar sv
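The cleanup hook above relies on GHC's weak-reference machinery: attach a finalizer to an `IORef` that only the consumer holds, and the finalizer fires once the consumer drops it. A self-contained sketch of the idiom using only `base` (finalizer timing is GC-dependent, hence the explicit `performGC`):

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef (newIORef, readIORef, writeIORef, mkWeakIORef)
import System.Mem (performGC)

main :: IO ()
main = do
    done <- newIORef False                          -- records that cleanup ran
    ref  <- newIORef ()                             -- stands in for "ref" above
    _    <- mkWeakIORef ref (writeIORef done True)  -- the cleanup hook
    readIORef ref                                   -- last use of ref
    performGC                                       -- ref is now unreachable
    threadDelay 100000                              -- finalizers run asynchronously
    readIORef done >>= print                        -- True once the hook has run
```

In the SVar case the finalizer is `cleanupSVar`, which kills the leftover worker threads instead of writing a flag.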
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
@@ -95,22 +141,27 @@ toSVar sv m = toStreamVar sv (toStream m)
--
-- XXX need to write these in direct style otherwise they will break fusion.
--
--- | Specify the maximum number of threads that can be spawned concurrently
--- when using concurrent streams. This is not the grand total number of threads
--- but the maximum number of threads at each point of concurrency.
+-- | Specify the maximum number of threads that can be spawned concurrently for
+-- any concurrent combinator in a stream.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500.
--
+-- When the actions in a stream are IO bound, i.e. they make blocking IO
+-- calls, this option can be used to control the maximum number of in-flight
+-- IO requests. When the actions are CPU bound it can be used to control the
+-- amount of CPU used by the stream.
+--
-- @since 0.4.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m = fromStream $ Stream $ \st stp sng yld -> do
- let n' = if n == 0 then defaultMaxThreads else n
- unStream (toStream m) (st {threadsHigh = n'}) stp sng yld
+ unStream (toStream m) (setMaxThreads n st) stp sng yld
+{-
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
+-}
-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
@@ -118,26 +169,136 @@ maxThreadsSerial _ = id
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
+-- CAUTION! Using an unbounded 'maxBuffer' value (i.e. a negative value)
+-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
+-- the presence of infinite or very large streams. In particular, it must
+-- not be used when 'pure' is used in 'ZipAsyncM' streams, as 'pure' in
+-- applicative zip streams generates an infinite stream, causing unbounded
+-- concurrent generation with no limit on the buffer or threads.
+--
-- @since 0.4.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m = fromStream $ Stream $ \st stp sng yld -> do
- let n' = if n == 0 then defaultMaxBuffer else n
- unStream (toStream m) (st {bufferHigh = n'}) stp sng yld
+ unStream (toStream m) (setMaxBuffer n st) stp sng yld
+{-
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
+-}
+
+-- | Specify the pull rate of a stream.
+-- A 'Nothing' value resets the rate to the default, which is unlimited. When
+-- the
+-- rate is specified, concurrent production may be ramped up or down
+-- automatically to achieve the specified yield rate. The specific behavior for
+-- different styles of 'Rate' specifications is documented under 'Rate'. The
+-- effective maximum production rate achieved by a stream is governed by:
+--
+-- * The 'maxThreads' limit
+-- * The 'maxBuffer' limit
+-- * The maximum rate that the stream producer can achieve
+-- * The maximum rate that the stream consumer can achieve
+--
+-- @since 0.5.0
+{-# INLINE_NORMAL rate #-}
+rate :: IsStream t => Maybe Rate -> t m a -> t m a
+rate r m = fromStream $ Stream $ \st stp sng yld -> do
+ case r of
+ Just (Rate low goal _ _) | goal < low ->
+ error "rate: Target rate cannot be lower than minimum rate."
+ Just (Rate _ goal high _) | goal > high ->
+ error "rate: Target rate cannot be greater than maximum rate."
+ Just (Rate low _ high _) | low > high ->
+ error "rate: Minimum rate cannot be greater than maximum rate."
+ _ -> unStream (toStream m) (setStreamRate r st) stp sng yld
+
+{-
+{-# RULES "rate serial" rate = yieldRateSerial #-}
+yieldRateSerial :: Double -> SerialT m a -> SerialT m a
+yieldRateSerial _ = id
+-}
+
+-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
+--
+-- Specifies the average production rate of a stream in number of yields
+-- per second (i.e. @Hertz@). Concurrent production is ramped up or down
+-- automatically to achieve the specified average yield rate. The rate may
+-- go down to half of the specified rate on the lower side and up to double
+-- of it on the higher side.
+--
+-- @since 0.5.0
+avgRate :: IsStream t => Double -> t m a -> t m a
+avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound)
+
+-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
+--
+-- Specifies the minimum rate at which the stream should yield values. As
+-- far as possible the yield rate is never allowed to go below the specified
+-- rate; it may go above it at times, up to double the specified rate.
+--
+-- @since 0.5.0
+minRate :: IsStream t => Double -> t m a -> t m a
+minRate r = rate (Just $ Rate r r (2*r) maxBound)
+
+-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
+--
+-- Specifies the maximum rate at which the stream should yield values. As
+-- far as possible the yield rate is never allowed to go above the specified
+-- rate; it may go below it at times, down to half of the specified rate.
+-- This can be useful in applications where resource usage must not be
+-- allowed to exceed certain limits.
+--
+-- @since 0.5.0
+maxRate :: IsStream t => Double -> t m a -> t m a
+maxRate r = rate (Just $ Rate (r/2) r r maxBound)
+
+-- | Same as @rate (Just $ Rate r r r 0)@
+--
+-- Specifies a constant yield rate. If for some reason the actual rate
+-- goes above or below the specified rate, we do not try to compensate by
+-- increasing or decreasing the rate in the future. This can be useful in
+-- applications like graphics frame refresh where we need to maintain a
+-- constant refresh rate.
+--
+-- @since 0.5.0
+constRate :: IsStream t => Double -> t m a -> t m a
+constRate r = rate (Just $ Rate r r r 0)
+
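For reference, the rate combinators defined in this patch compose like any other stream transformation. A usage sketch against the 0.5.0 API (not compiled here; `runStream`, `asyncly`, `S.take` and `S.repeatM` are assumed to be in scope from this release):

```haskell
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- About 10 yields/sec on average;
    -- avgRate 10 == rate (Just (Rate 5 10 20 maxBound)).
    runStream $ asyncly $ avgRate 10 $ S.take 30 $ S.repeatM (return ())
    -- A ceiling instead: never exceed 100 yields/sec.
    runStream $ asyncly $ maxRate 100 $ S.take 30 $ S.repeatM (return ())
```

The combinator applies to the concurrent evaluation of the section of the stream it wraps, so placement relative to other combinators matters.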
+-- | Specify the average latency, in nanoseconds, of a single-threaded action
+-- in a concurrent composition. Streamly can measure the latencies, but that is
+-- possible only after at least one task has completed. This combinator can be
+-- used to provide a latency hint so that rate control using 'rate' can take
+-- that into account right from the beginning. When not specified, a default
+-- behavior is chosen which could be too slow or too fast, and would be
+-- restricted by any other control parameters configured.
+-- A value of 0 indicates the default behavior, a negative value means there
+-- is no limit, i.e. zero latency.
+-- This would normally be useful only in high-latency, high-throughput cases.
+--
+{-# INLINE_NORMAL _serialLatency #-}
+_serialLatency :: IsStream t => Int -> t m a -> t m a
+_serialLatency n m = fromStream $ Stream $ \st stp sng yld -> do
+ unStream (toStream m) (setStreamLatency n st) stp sng yld
+
+{-
+{-# RULES "serialLatency serial" _serialLatency = serialLatencySerial #-}
+serialLatencySerial :: Int -> SerialT m a -> SerialT m a
+serialLatencySerial _ = id
+-}
-- Stop concurrent dispatches after this limit. This is useful in APIs like
-- "take" where we want to dispatch only up to the number of elements "take"
-- needs. This value applies only to the immediate next level and is not
-- inherited by everything in enclosed scope.
{-# INLINE_NORMAL maxYields #-}
-maxYields :: IsStream t => Maybe Int -> t m a -> t m a
+maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m = fromStream $ Stream $ \st stp sng yld -> do
- unStream (toStream m) (st {yieldLimit = n}) stp sng yld
+ unStream (toStream m) (setYieldLimit n st) stp sng yld
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
-maxYieldsSerial :: Maybe Int -> SerialT m a -> SerialT m a
+maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ = id
diff --git a/src/Streamly/Streams/Serial.hs b/src/Streamly/Streams/Serial.hs
index 0e3d15c..70ebab3 100644
--- a/src/Streamly/Streams/Serial.hs
+++ b/src/Streamly/Streams/Serial.hs
@@ -166,7 +166,9 @@ instance IsStream SerialT where
-- @since 0.2.0
{-# INLINE serial #-}
serial :: IsStream t => t m a -> t m a -> t m a
-serial m1 m2 = fromStream $ K.serial (toStream m1) (toStream m2)
+serial m1 m2 = fromStream $ Stream $ \st stp sng yld ->
+ unStream (K.serial (toStream m1) (toStream m2))
+ (rstState st) stp sng yld
------------------------------------------------------------------------------
-- Monad
@@ -188,6 +190,9 @@ instance Monad m => Monad (SerialT m) where
mapM :: (IsStream t, Monad m) => (a -> m b) -> t m a -> t m b
mapM f m = fromStream $ D.toStreamK $ D.mapM f $ D.fromStreamK (toStream m)
+-- | Same as 'fmap'.
+--
+-- @since 0.4.0
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map f = mapM (return . f)
@@ -295,7 +300,9 @@ interleave m1 m2 = Stream $ \st stp sng yld -> do
-- @since 0.2.0
{-# INLINE wSerial #-}
wSerial :: IsStream t => t m a -> t m a -> t m a
-wSerial m1 m2 = fromStream $ interleave (toStream m1) (toStream m2)
+wSerial m1 m2 = fromStream $ Stream $ \st stp sng yld ->
+ unStream (interleave (toStream m1) (toStream m2))
+ (rstState st) stp sng yld
instance Semigroup (WSerialT m a) where
(<>) = wSerial
diff --git a/src/Streamly/Streams/StreamD.hs b/src/Streamly/Streams/StreamD.hs
index 6b6d66e..80ab83b 100644
--- a/src/Streamly/Streams/StreamD.hs
+++ b/src/Streamly/Streams/StreamD.hs
@@ -564,14 +564,14 @@ dropWhileM f (Stream step state) = Stream step' (DropWhileDrop state)
Yield x s -> do
b <- f x
if b
- then step' (rstState gst) (DropWhileDrop s)
- else step' (rstState gst) (DropWhileYield x s)
+ then step' gst (DropWhileDrop s)
+ else step' gst (DropWhileYield x s)
Stop -> return Stop
step' gst (DropWhileNext st) = do
r <- step (rstState gst) st
case r of
- Yield x s -> step' (rstState gst) (DropWhileYield x s)
+ Yield x s -> step' gst (DropWhileYield x s)
Stop -> return Stop
step' _ (DropWhileYield x st) = return $ Yield x (DropWhileNext st)
@@ -592,7 +592,7 @@ filterM f (Stream step state) = Stream step' state
b <- f x
if b
then return $ Yield x s
- else step' (rstState gst) s
+ else step' gst s
Stop -> return $ Stop
{-# INLINE filter #-}
diff --git a/src/Streamly/Streams/StreamK.hs b/src/Streamly/Streams/StreamK.hs
index 749c562..af07cc7 100644
--- a/src/Streamly/Streams/StreamK.hs
+++ b/src/Streamly/Streams/StreamK.hs
@@ -71,6 +71,7 @@ module Streamly.Streams.StreamK
, foldStream
, foldr
, foldrM
+ , foldr1
, foldl'
, foldlM'
, foldx
@@ -81,6 +82,7 @@ module Streamly.Streams.StreamK
, null
, head
, tail
+ , init
, elem
, notElem
, all
@@ -88,6 +90,9 @@ module Streamly.Streams.StreamK
, last
, minimum
, maximum
+ , findIndices
+ , lookup
+ , find
-- ** Map and Fold
, mapM_
@@ -113,6 +118,9 @@ module Streamly.Streams.StreamK
, mapM
, sequence
+ -- ** Inserting
+ , intersperseM
+
-- ** Map and Filter
, mapMaybe
@@ -141,7 +149,8 @@ import Data.Semigroup (Semigroup(..))
import Prelude
hiding (foldl, foldr, last, map, mapM, mapM_, repeat, sequence,
take, filter, all, any, takeWhile, drop, dropWhile, minimum,
- maximum, elem, notElem, null, head, tail, zipWith)
+ maximum, elem, notElem, null, head, tail, init, zipWith, lookup,
+ foldr1)
import qualified Prelude
import Streamly.SVar
@@ -387,28 +396,9 @@ unfoldrM step = go
-- Special generation
-------------------------------------------------------------------------------
--- Faster than yieldM because there is no bind. Usually we can construct a
--- stream from a pure value using "pure" in an applicative, however in case of
--- Zip streams pure creates an infinite stream.
---
--- | Create a singleton stream from a pure value. In monadic streams, 'pure' or
--- 'return' can be used in place of 'yield', however, in Zip applicative
--- streams 'pure' is equivalent to 'repeat'.
---
--- @since 0.4.0
yield :: IsStream t => a -> t m a
yield a = fromStream $ Stream $ \_ _ single _ -> single a
--- | Create a singleton stream from a monadic action. Same as @m \`consM` nil@
--- but more efficient.
---
--- @
--- > toList $ yieldM getLine
--- hello
--- ["hello"]
--- @
---
--- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM m = fromStream $ Stream $ \_ _ single _ -> m >>= single
@@ -485,6 +475,20 @@ foldrM step acc m = go (toStream m)
yieldk a r = go r >>= step a
in (unStream m1) defState stop single yieldk
+{-# INLINE foldr1 #-}
+foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a)
+foldr1 step m = do
+ r <- uncons m
+ case r of
+ Nothing -> return Nothing
+ Just (h, t) -> go h (toStream t) >>= return . Just
+ where
+ go p m1 =
+ let stp = return p
+ single a = return $ step a p
+ yieldk a r = go a r >>= return . (step p)
+ in unStream m1 defState stp single yieldk
+
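`foldr1` above mirrors `Prelude.foldr1` but is total, returning `Nothing` on an empty stream instead of erroring. The pure reference behavior it is expected to match on finite streams:

```haskell
-- Total counterpart of Prelude.foldr1: Nothing on empty input.
foldr1Maybe :: (a -> a -> a) -> [a] -> Maybe a
foldr1Maybe _ [] = Nothing
foldr1Maybe f xs = Just (foldr1 f xs)

main :: IO ()
main = do
    print (foldr1Maybe (+) [1, 2, 3 :: Int])  -- Just 6
    print (foldr1Maybe (+) ([] :: [Int]))     -- Nothing
```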
-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
@@ -570,6 +574,20 @@ tail m =
yieldk _ r = return $ Just $ fromStream r
in unStream (toStream m) defState stop single yieldk
+{-# INLINE init #-}
+init :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
+init m = go1 (toStream m)
+ where
+ go1 m1 = do
+ r <- uncons m1
+ case r of
+ Nothing -> return Nothing
+ Just (h, t) -> return . Just . fromStream $ go h t
+ go p m1 = Stream $ \_ stp sng yld ->
+ let single _ = sng p
+ yieldk a x = yld p $ go a x
+ in unStream m1 defState stp single yieldk
+
{-# INLINE elem #-}
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
elem e m = go (toStream m)
@@ -659,6 +677,39 @@ maximum m = go Nothing (toStream m)
else go (Just res) r
in unStream m1 defState stop single yieldk
+{-# INLINE lookup #-}
+lookup :: (IsStream t, Monad m, Eq a) => a -> t m (a, b) -> m (Maybe b)
+lookup e m = go (toStream m)
+ where
+ go m1 =
+ let single (a, b) | a == e = return $ Just b
+ | otherwise = return Nothing
+ yieldk (a, b) x | a == e = return $ Just b
+ | otherwise = go x
+ in unStream m1 defState (return Nothing) single yieldk
+
+{-# INLINE find #-}
+find :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m (Maybe a)
+find p m = go (toStream m)
+ where
+ go m1 =
+ let single a | p a = return $ Just a
+ | otherwise = return Nothing
+ yieldk a x | p a = return $ Just a
+ | otherwise = go x
+ in unStream m1 defState (return Nothing) single yieldk
+
+{-# INLINE findIndices #-}
+findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int
+findIndices p = fromStream . go 0 . toStream
+ where
+ go offset m1 = Stream $ \st stp sng yld ->
+ let single a | p a = sng offset
+ | otherwise = stp
+ yieldk a x | p a = yld offset $ go (offset + 1) x
+ | otherwise = unStream (go (offset + 1) x) st stp sng yld
+ in unStream m1 (rstState st) stp single yieldk
+
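The three combinators above mirror their `Prelude`/`Data.List` counterparts; a quick check of the list analogues (a sketch for intuition, not library code):

```haskell
import Data.List (find, findIndices)

-- The stream versions above behave like the list functions: lookup scans
-- key/value pairs for a key, find returns the first matching element, and
-- findIndices threads a running offset to report positions of matches.
main :: IO ()
main = do
    print (lookup 2 [(1, 'a'), (2, 'b')])        -- Just 'b'
    print (find even [1, 3, 4, 5 :: Int])        -- Just 4
    print (findIndices odd [2, 3, 5, 8 :: Int])  -- [1,2]
```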
------------------------------------------------------------------------------
-- Map and Fold
------------------------------------------------------------------------------
@@ -799,6 +850,22 @@ sequence m = go (toStream m)
in (unStream m1) (rstState st) stp single yieldk
-------------------------------------------------------------------------------
+-- Inserting
+-------------------------------------------------------------------------------
+
+{-# INLINE intersperseM #-}
+intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
+intersperseM a m = fromStream $ prependingStart (toStream m)
+ where
+ prependingStart m1 = Stream $ \st stp sng yld ->
+ let yieldk i x = unStream (return i |: go x) st stp sng yld
+ in unStream m1 (rstState st) stp sng yieldk
+ go m2 = fromStream $ Stream $ \st stp sng yld ->
+ let single i = unStream (a |: yield i) st stp sng yld
+ yieldk i x = unStream (a |: return i |: go x) st stp sng yld
+ in unStream m2 (rstState st) stp single yieldk
+
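The streaming `intersperseM` above runs the given action between consecutive elements; when the action is pure it degenerates to `Data.List.intersperse`, which the test suite also uses as the reference:

```haskell
import Data.List (intersperse)

-- Pure analogue of intersperseM: place the separator between every pair
-- of adjacent elements, but not before the first or after the last.
main :: IO ()
main = print (intersperse 0 [1, 2, 3 :: Int])  -- [1,0,2,0,3]
```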
+-------------------------------------------------------------------------------
-- Map and Filter
-------------------------------------------------------------------------------
diff --git a/src/Streamly/Time.hs b/src/Streamly/Time.hs
index e286272..da45a9f 100644
--- a/src/Streamly/Time.hs
+++ b/src/Streamly/Time.hs
@@ -10,6 +10,9 @@
-- Time utilities for reactive programming.
module Streamly.Time
+{-# DEPRECATED
+ "Please use the \"rate\" combinator instead of the functions in this module"
+ #-}
( periodic
, withClock
)
@@ -22,6 +25,7 @@ import Control.Concurrent (threadDelay)
-- second (Hz).
--
-- @since 0.1.0
+{-# DEPRECATED periodic "Please use the \"rate\" combinator instead" #-}
periodic :: Int -> IO () -> IO ()
periodic freq action = do
action
@@ -37,6 +41,7 @@ periodic freq action = do
-- local time as an argument.
--
-- @since 0.1.0
+{-# DEPRECATED withClock "Please use the \"rate\" combinator instead" #-}
withClock :: IO Int -> Int -> (Int -> IO ()) -> IO ()
withClock clock freq action = do
t <- clock
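The deprecated `periodic` above runs the action and then sleeps for `1/freq` seconds. A bounded sketch of that loop (`periodicN` and the counter are illustrative only, assuming `freq` is in Hz as the haddock says):

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (replicateM_)
import Data.IORef

-- Bounded sketch of periodic: run the action, then sleep 1/freq seconds
-- (freq in Hz), n times instead of forever.
periodicN :: Int -> Int -> IO () -> IO ()
periodicN n freq action =
    replicateM_ n (action >> threadDelay (1000000 `div` freq))

main :: IO ()
main = do
    r <- newIORef (0 :: Int)
    periodicN 3 1000 (modifyIORef r (+1))
    readIORef r >>= print  -- 3
```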
diff --git a/src/Streamly/Tutorial.hs b/src/Streamly/Tutorial.hs
index 0888da4..500226c 100644
--- a/src/Streamly/Tutorial.hs
+++ b/src/Streamly/Tutorial.hs
@@ -1454,16 +1454,15 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- {-\# LANGUAGE FlexibleContexts #-}
--
-- import "Streamly"
--- import Control.Concurrent (threadDelay)
+-- import Streamly.Prelude as S
-- import Control.Monad (when)
-- import Control.Monad.IO.Class (MonadIO(..))
-- import Control.Monad.State (MonadState, get, modify, runStateT)
--- import Data.Semigroup (cycle1)
--
-- data Event = Harm Int | Heal Int | Quit deriving (Show)
--
--- userAction :: MonadIO m => 'SerialT' m Event
--- userAction = cycle1 $ liftIO askUser
+-- userAction :: MonadAsync m => 'SerialT' m Event
+-- userAction = S.repeatM $ liftIO askUser
-- where
-- askUser = do
-- command <- getLine
@@ -1472,8 +1471,8 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- "quit" -> return Quit
-- _ -> putStrLn "What?" >> askUser
--
--- acidRain :: MonadIO m => 'SerialT' m Event
--- acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1)
+-- acidRain :: MonadAsync m => SerialT m Event
+-- acidRain = asyncly $ constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1
--
-- game :: ('MonadAsync' m, MonadState Int m) => 'SerialT' m ()
-- game = do
diff --git a/stack.yaml b/stack.yaml
index 6ad3f4a..5af0dc3 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -5,7 +5,7 @@ allow-newer: true
extra-deps:
- SDL-0.6.6.0
- gauge-0.2.3
- - bench-graph-0.1.1
+ - bench-graph-0.1.3
- Chart-1.9
- Chart-diagrams-1.9
- SVGFonts-1.6.0.3
diff --git a/streamly.cabal b/streamly.cabal
index 7dd3bba..8239695 100644
--- a/streamly.cabal
+++ b/streamly.cabal
@@ -1,5 +1,5 @@
name: streamly
-version: 0.4.1
+version: 0.5.0
synopsis: Beautiful Streaming, Concurrent and Reactive Composition
description:
Streamly, short for streaming concurrently, provides monadic streams, with a
@@ -173,6 +173,7 @@ library
-- concurrency
, atomic-primops >= 0.8 && < 0.9
, lockfree-queue >= 0.2.3 && < 0.3
+ , clock >= 0.7.1 && < 0.8
-- transfomers
, exceptions >= 0.8 && < 0.11
@@ -189,12 +190,15 @@ library
-- Test suites
-------------------------------------------------------------------------------
+-- Compilation for coverage builds on CI machines takes too long without -O0
+
test-suite test
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
ghc-options: -O0 -Wall -threaded -with-rtsopts=-N
if flag(dev)
+ cpp-options: -DDEVBUILD
ghc-options: -Wmissed-specialisations
-Wall-missed-specialisations
if impl(ghc >= 8.0)
@@ -220,7 +224,7 @@ test-suite properties
type: exitcode-stdio-1.0
main-is: Prop.hs
hs-source-dirs: test
- ghc-options: -O0 -Wall -threaded -with-rtsopts=-N4
+ ghc-options: -Wall -O0 -threaded -with-rtsopts=-N
if flag(dev)
cpp-options: -DDEVBUILD
ghc-options: -Wmissed-specialisations
@@ -237,10 +241,27 @@ test-suite properties
build-depends:
streamly
, base >= 4.8 && < 5
- , QuickCheck >= 2.10 && < 2.12
+ , QuickCheck >= 2.10 && < 2.13
, hspec >= 2.0 && < 3
default-language: Haskell2010
+test-suite maxrate
+ type: exitcode-stdio-1.0
+ default-language: Haskell2010
+ main-is: MaxRate.hs
+ hs-source-dirs: test
+ ghc-options: -O2 -Wall -threaded -with-rtsopts=-N
+ if flag(dev)
+ buildable: True
+ build-Depends:
+ streamly
+ , base >= 4.8 && < 5
+ , clock >= 0.7.1 && < 0.8
+ , hspec >= 2.0 && < 3
+ , random >= 1.0.0 && < 1.2
+ else
+ buildable: False
+
test-suite loops
type: exitcode-stdio-1.0
default-language: Haskell2010
@@ -370,12 +391,13 @@ benchmark base
, gauge >= 0.2.3 && < 0.3
, ghc-prim >= 0.2 && < 0.6
- , containers >= 0.5 && < 0.6
+ , containers >= 0.5 && < 0.7
, heaps >= 0.3 && < 0.4
-- concurrency
, atomic-primops >= 0.8 && < 0.9
, lockfree-queue >= 0.2.3 && < 0.3
+ , clock >= 0.7.1 && < 0.8
, exceptions >= 0.8 && < 0.11
, monad-control >= 1.0 && < 2
@@ -395,7 +417,7 @@ executable chart-linear
buildable: True
build-Depends:
base >= 4.8 && < 5
- , bench-graph
+ , bench-graph >= 0.1 && < 0.2
, split
else
buildable: False
@@ -408,7 +430,7 @@ executable chart-nested
buildable: True
build-Depends:
base >= 4.8 && < 5
- , bench-graph
+ , bench-graph >= 0.1 && < 0.2
, split
else
buildable: False
diff --git a/test/Main.hs b/test/Main.hs
index 97410bf..8fb9c40 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -12,9 +12,10 @@ import Control.Monad.Error.Class (throwError, MonadError)
import Control.Monad.Trans.Except (runExceptT, ExceptT)
import Data.Foldable (forM_, fold)
import Data.List (sort)
+import System.Mem (performMajorGC)
import Data.IORef
-import Test.Hspec
+import Test.Hspec as H
import Streamly
import Streamly.Prelude ((.:), nil)
@@ -37,6 +38,147 @@ toListParallel = S.toList . wAsyncly
main :: IO ()
main = hspec $ do
+ parallelTests
+
+ -- These are not run in parallel because the timing gets affected
+ -- unpredictably when other tests are running on the same machine.
+ describe "Nested parallel and serial compositions" $ do
+ let t = timed
+ p = wAsyncly
+ s = serially
+ {-
+ -- This is not correct, the result can also be [4,4,8,0,8,0,2,2]
+ -- because of parallelism of [8,0] and [8,0].
+ it "Nest <|>, <>, <|> (1)" $
+ let t = timed
+ in toListSerial (
+ ((t 8 <|> t 4) <> (t 2 <|> t 0))
+ <|> ((t 8 <|> t 4) <> (t 2 <|> t 0)))
+ `shouldReturn` ([4,4,8,8,0,0,2,2])
+ -}
+ it "Nest <|>, <>, <|> (2)" $
+ (S.toList . wAsyncly) (
+ s (p (t 4 <> t 8) <> p (t 1 <> t 2))
+ <> s (p (t 4 <> t 8) <> p (t 1 <> t 2)))
+ `shouldReturn` ([4,4,8,8,1,1,2,2])
+ -- FIXME: These two keep failing intermittently on Mac OS X
+ -- Need to examine and fix the tests.
+ {-
+ it "Nest <|>, <=>, <|> (1)" $
+ let t = timed
+ in toListSerial (
+ ((t 8 <|> t 4) <=> (t 2 <|> t 0))
+ <|> ((t 9 <|> t 4) <=> (t 2 <|> t 0)))
+ `shouldReturn` ([4,4,0,0,8,2,9,2])
+ it "Nest <|>, <=>, <|> (2)" $
+ let t = timed
+ in toListSerial (
+ ((t 4 <|> t 8) <=> (t 1 <|> t 2))
+ <|> ((t 4 <|> t 9) <=> (t 1 <|> t 2)))
+ `shouldReturn` ([4,4,1,1,8,2,9,2])
+ -}
+ it "Nest <|>, <|>, <|>" $
+ (S.toList . wAsyncly) (
+ ((t 4 <> t 8) <> (t 0 <> t 2))
+ <> ((t 4 <> t 8) <> (t 0 <> t 2)))
+ `shouldReturn` ([0,0,2,2,4,4,8,8])
+
+ describe "restricts concurrency and cleans up extra tasks" $ do
+ it "take 1 asyncly" $ checkCleanup asyncly (S.take 1)
+ it "take 1 wAsyncly" $ checkCleanup wAsyncly (S.take 1)
+ it "take 1 aheadly" $ checkCleanup aheadly (S.take 1)
+
+ it "takeWhile (< 0) asyncly" $ checkCleanup asyncly (S.takeWhile (< 0))
+ it "takeWhile (< 0) wAsyncly" $ checkCleanup wAsyncly (S.takeWhile (< 0))
+ it "takeWhile (< 0) aheadly" $ checkCleanup aheadly (S.takeWhile (< 0))
+
+#ifdef DEVBUILD
+ -- parallely fails on CI machines; it may need a larger gap between the
+ -- event times, but that would make the tests even slower.
+ it "take 1 parallely" $ checkCleanup parallely (S.take 1)
+ it "takeWhile (< 0) parallely" $ checkCleanup parallely (S.takeWhile (< 0))
+
+ testFoldOpsCleanup "head" S.head
+ testFoldOpsCleanup "null" S.null
+ testFoldOpsCleanup "elem" (S.elem 0)
+ testFoldOpsCleanup "notElem" (S.notElem 0)
+ testFoldOpsCleanup "elemIndex" (S.elemIndex 0)
+ -- S.lookup
+ testFoldOpsCleanup "find" (S.find (==0))
+ testFoldOpsCleanup "findIndex" (S.findIndex (==0))
+ testFoldOpsCleanup "all" (S.all (==1))
+ testFoldOpsCleanup "any" (S.any (==0))
+ testFoldOpsCleanup "and" (S.and . S.map (==1))
+ testFoldOpsCleanup "or" (S.or . S.map (==0))
+#endif
+
+ ---------------------------------------------------------------------------
+ -- Semigroup/Monoidal Composition strict ordering checks
+ ---------------------------------------------------------------------------
+
+ -- test both (<>) and mappend to make sure we are using correct instance
+ -- for Monoid that is using the right version of semigroup. Instance
+ -- deriving can cause us to pick wrong instances sometimes.
+
+ describe "WSerial interleaved (<>) ordering check" $ interleaveCheck wSerially (<>)
+ describe "WSerial interleaved mappend ordering check" $ interleaveCheck wSerially mappend
+
+ -- describe "WAsync interleaved (<>) ordering check" $ interleaveCheck wAsyncly (<>)
+ -- describe "WAsync interleaved mappend ordering check" $ interleaveCheck wAsyncly mappend
+
+ describe "Async (<>) time order check" $ parallelCheck asyncly (<>)
+ describe "Async mappend time order check" $ parallelCheck asyncly mappend
+
+ -- XXX this keeps failing intermittently, need to investigate
+ -- describe "WAsync (<>) time order check" $ parallelCheck wAsyncly (<>)
+ -- describe "WAsync mappend time order check" $ parallelCheck wAsyncly mappend
+
+ describe "Parallel (<>) time order check" $ parallelCheck parallely (<>)
+ describe "Parallel mappend time order check" $ parallelCheck parallely mappend
+
+checkCleanup :: IsStream t
+ => (t IO Int -> SerialT IO Int)
+ -> (t IO Int -> t IO Int)
+ -> IO ()
+checkCleanup t op = do
+ r <- newIORef (-1 :: Int)
+ runStream . serially $ do
+ _ <- t $ op $ delay r 0 S.|: delay r 1 S.|: delay r 2 S.|: S.nil
+ return ()
+ performMajorGC
+ threadDelay 500000
+ res <- readIORef r
+ res `shouldBe` 0
+ where
+ delay ref i = threadDelay (i*200000) >> writeIORef ref i >> return i
+
+#ifdef DEVBUILD
+checkCleanupFold :: IsStream t
+ => (t IO Int -> SerialT IO Int)
+ -> (SerialT IO Int -> IO (Maybe Int))
+ -> IO ()
+checkCleanupFold t op = do
+ r <- newIORef (-1 :: Int)
+ _ <- op $ t $ delay r 0 S.|: delay r 1 S.|: delay r 2 S.|: S.nil
+ performMajorGC
+ threadDelay 500000
+ res <- readIORef r
+ res `shouldBe` 0
+ where
+ delay ref i = threadDelay (i*200000) >> writeIORef ref i >> return i
+
+testFoldOpsCleanup :: String -> (SerialT IO Int -> IO a) -> Spec
+testFoldOpsCleanup name f = do
+ let testOp op x = op x >> return Nothing
+ it (name ++ " asyncly") $ checkCleanupFold asyncly (testOp f)
+ it (name ++ " wAsyncly") $ checkCleanupFold wAsyncly (testOp f)
+ it (name ++ " aheadly") $ checkCleanupFold aheadly (testOp f)
+ it (name ++ " parallely") $ checkCleanupFold parallely (testOp f)
+#endif
+
+parallelTests :: SpecWith ()
+parallelTests = H.parallel $ do
describe "Runners" $ do
-- XXX move these to property tests
-- XXX use an IORef to store and check the side effects
@@ -142,47 +284,6 @@ main = hspec $ do
, [1, 7, 4, 8, 2, 9, 5, 3, 6]
])
- describe "Nested parallel and serial compositions" $ do
- let t = timed
- p = wAsyncly
- s = serially
- {-
- -- This is not correct, the result can also be [4,4,8,0,8,0,2,2]
- -- because of parallelism of [8,0] and [8,0].
- it "Nest <|>, <>, <|> (1)" $
- let t = timed
- in toListSerial (
- ((t 8 <|> t 4) <> (t 2 <|> t 0))
- <|> ((t 8 <|> t 4) <> (t 2 <|> t 0)))
- `shouldReturn` ([4,4,8,8,0,0,2,2])
- -}
- it "Nest <|>, <>, <|> (2)" $
- (S.toList . wAsyncly) (
- s (p (t 4 <> t 8) <> p (t 1 <> t 2))
- <> s (p (t 4 <> t 8) <> p (t 1 <> t 2)))
- `shouldReturn` ([4,4,8,8,1,1,2,2])
- -- FIXME: These two keep failing intermittently on Mac OS X
- -- Need to examine and fix the tests.
- {-
- it "Nest <|>, <=>, <|> (1)" $
- let t = timed
- in toListSerial (
- ((t 8 <|> t 4) <=> (t 2 <|> t 0))
- <|> ((t 9 <|> t 4) <=> (t 2 <|> t 0)))
- `shouldReturn` ([4,4,0,0,8,2,9,2])
- it "Nest <|>, <=>, <|> (2)" $
- let t = timed
- in toListSerial (
- ((t 4 <|> t 8) <=> (t 1 <|> t 2))
- <|> ((t 4 <|> t 9) <=> (t 1 <|> t 2)))
- `shouldReturn` ([4,4,1,1,8,2,9,2])
- -}
- it "Nest <|>, <|>, <|>" $
- (S.toList . wAsyncly) (
- ((t 4 <> t 8) <> (t 0 <> t 2))
- <> ((t 4 <> t 8) <> (t 0 <> t 2)))
- `shouldReturn` ([0,0,2,2,4,4,8,8])
-
---------------------------------------------------------------------------
-- Monoidal composition recursion loops
---------------------------------------------------------------------------
@@ -364,6 +465,14 @@ main = hspec $ do
describe "take on infinite concurrent stream" $ takeInfinite aheadly
---------------------------------------------------------------------------
+ -- Some ad-hoc tests that failed at times
+ ---------------------------------------------------------------------------
+
+ it "takes n from stream of streams" (takeCombined 1 aheadly)
+ it "takes n from stream of streams" (takeCombined 2 asyncly)
+ it "takes n from stream of streams" (takeCombined 3 wAsyncly)
+
+ ---------------------------------------------------------------------------
-- Folds are strict enough
---------------------------------------------------------------------------
@@ -380,30 +489,6 @@ main = hspec $ do
---------------------------------------------------------------------------
---------------------------------------------------------------------------
- -- Semigroup/Monoidal Composition strict ordering checks
- ---------------------------------------------------------------------------
-
- -- test both (<>) and mappend to make sure we are using correct instance
- -- for Monoid that is using the right version of semigroup. Instance
- -- deriving can cause us to pick wrong instances sometimes.
-
- describe "WSerial interleaved (<>) ordering check" $ interleaveCheck wSerially (<>)
- describe "WSerial interleaved mappend ordering check" $ interleaveCheck wSerially mappend
-
- -- describe "WAsync interleaved (<>) ordering check" $ interleaveCheck wAsyncly (<>)
- -- describe "WAsync interleaved mappend ordering check" $ interleaveCheck wAsyncly mappend
-
- describe "Async (<>) time order check" $ parallelCheck asyncly (<>)
- describe "Async mappend time order check" $ parallelCheck asyncly mappend
-
- -- XXX this keeps failing intermittently, need to investigate
- -- describe "WAsync (<>) time order check" $ parallelCheck wAsyncly (<>)
- -- describe "WAsync mappend time order check" $ parallelCheck wAsyncly mappend
-
- describe "Parallel (<>) time order check" $ parallelCheck parallely (<>)
- describe "Parallel mappend time order check" $ parallelCheck parallely mappend
-
- ---------------------------------------------------------------------------
-- Thread limits
---------------------------------------------------------------------------
@@ -417,6 +502,13 @@ main = hspec $ do
replicate 4000 $ S.yieldM $ threadDelay 1000000)
`shouldReturn` ()
+takeCombined :: (Monad m, Semigroup (t m Int), Show a, Eq a, IsStream t)
+ => Int -> (t m Int -> SerialT IO a) -> IO ()
+takeCombined n t = do
+ let constr = S.fromFoldable
+ r <- (S.toList . t) $
+ S.take n ((constr ([] :: [Int])) <> constr ([] :: [Int]))
+ r `shouldBe` []
checkFoldxStrictness :: IO ()
checkFoldxStrictness = do
diff --git a/test/MaxRate.hs b/test/MaxRate.hs
new file mode 100644
index 0000000..9b1fa81
--- /dev/null
+++ b/test/MaxRate.hs
@@ -0,0 +1,128 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+import Streamly
+import qualified Streamly.Prelude as S
+import Control.Concurrent
+import Control.Monad
+import System.Clock
+import Test.Hspec
+import System.Random
+
+durationShouldBe :: (Double, Double) -> IO () -> Expectation
+durationShouldBe d@(tMin, tMax) action = do
+ t0 <- getTime Monotonic
+ action
+ t1 <- getTime Monotonic
+ let t = (fromIntegral $ toNanoSecs (t1 - t0)) / 1e9
+ -- tMax = fromNanoSecs (round $ d*10^9*1.2)
+ -- tMin = fromNanoSecs (round $ d*10^9*0.8)
+ putStrLn $ "Expected: " ++ show d ++ " Took: " ++ show t
+ (t <= tMax && t >= tMin) `shouldBe` True
+
+toMicroSecs :: Num a => a -> a
+toMicroSecs x = x * 10^(6 :: Int)
+
+measureRate' :: IsStream t
+ => String
+ -> (t IO Int -> SerialT IO Int)
+ -> Double
+ -> Int
+ -> (Double, Double)
+ -> (Double, Double)
+ -> Spec
+measureRate' desc t rval consumerDelay producerDelay dur = do
+ it (desc ++ " rate: " ++ show rval
+ ++ ", consumer latency: " ++ show consumerDelay
+ ++ ", producer latency: " ++ show producerDelay)
+ $ durationShouldBe dur $ do
+ runStream
+ $ (if consumerDelay > 0
+ then S.mapM $ \x ->
+ threadDelay (toMicroSecs consumerDelay) >> return x
+ else id)
+ $ t
+ $ maxBuffer (-1)
+ $ maxThreads (-1)
+ $ avgRate rval
+ $ S.take (round $ rval * 10)
+ $ S.repeatM $ do
+ let (t1, t2) = producerDelay
+ r <- if t1 == t2
+ then return $ round $ toMicroSecs t1
+ else randomRIO ( round $ toMicroSecs t1
+ , round $ toMicroSecs t2)
+ when (r > 0) $ do
+ -- t1 <- getTime Monotonic
+ threadDelay r
+ -- t2 <- getTime Monotonic
+ -- let delta = fromIntegral (toNanoSecs (t2 - t1)) / 1000000000
+ -- putStrLn $ "delay took: " ++ show delta
+ -- when (delta > 2) $ do
+ -- putStrLn $ "delay took high: " ++ show delta
+ return 1
+
+measureRate :: IsStream t
+ => String
+ -> (t IO Int -> SerialT IO Int)
+ -> Double
+ -> Int
+ -> Int
+ -> (Double, Double)
+ -> Spec
+measureRate desc t rval consumerDelay producerDelay dur =
+ let d = fromIntegral producerDelay
+ in measureRate' desc t rval consumerDelay (d, d) dur
+
+main :: IO ()
+main = hspec $ do
+ let range = (8,12)
+
+ -- Note that because we do not wait after the last yield, the last period
+ -- is effectively shorter. This becomes significant at lower rates (1 or
+ -- less). For rate 1 we lose 1 second at the end, and for rate 10, 0.1
+ -- second.
+ let rates = [1, 10, 100, 1000, 10000, 100000, 1000000]
+ in describe "asyncly no consumer delay no producer delay" $ do
+ forM_ rates (\r -> measureRate "asyncly" asyncly r 0 0 range)
+
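The duration arithmetic behind the `(8,12)` second window can be made explicit (`expectedSecs` is an illustrative helper, not part of the test): taking `round (r * 10)` elements at rate `r` ideally takes `(n - 1) / r` seconds, since there is no wait after the last yield.

```haskell
-- Ideal duration for n = round (r * 10) elements at rate r: there are
-- only n - 1 inter-element periods, so the run is one period short of 10s.
expectedSecs :: Double -> Double
expectedSecs r = (n - 1) / r
  where n = fromIntegral (round (r * 10) :: Integer)

main :: IO ()
main = print (expectedSecs 1, expectedSecs 10)  -- (9.0,9.9)
```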
+ -- XXX try staggering the dispatches to achieve higher rates
+ let rates = [1, 10, 100, 1000, 10000, 25000]
+ in describe "asyncly no consumer delay and 1 sec producer delay" $ do
+ forM_ rates (\r -> measureRate "asyncly" asyncly r 0 1 range)
+
+ -- At lower rates (1/10) this is likely to vary quite a bit depending on
+ -- the spread of random producer latencies generated.
+ let rates = [1, 10, 100, 1000, 10000, 25000]
+ in describe "asyncly no consumer delay and variable producer delay" $ do
+ forM_ rates $ \r ->
+ measureRate' "asyncly" asyncly r 0 (0.1, 3) range
+
+ let rates = [1, 10, 100, 1000, 10000, 100000, 1000000]
+ in describe "wAsyncly no consumer delay no producer delay" $ do
+ forM_ rates (\r -> measureRate "wAsyncly" wAsyncly r 0 0 range)
+
+ let rates = [1, 10, 100, 1000, 10000, 25000]
+ in describe "wAsyncly no consumer delay and 1 sec producer delay" $ do
+ forM_ rates (\r -> measureRate "wAsyncly" wAsyncly r 0 1 range)
+
+ -- XXX does not work well at a million ops per second, need to fix.
+ let rates = [1, 10, 100, 1000, 10000, 100000]
+ in describe "aheadly no consumer delay no producer delay" $ do
+ forM_ rates (\r -> measureRate "aheadly" aheadly r 0 0 range)
+
+ let rates = [1, 10, 100, 1000, 10000, 25000]
+ in describe "aheadly no consumer delay and 1 sec producer delay" $ do
+ forM_ rates (\r -> measureRate "aheadly" aheadly r 0 1 range)
+
+ describe "asyncly with 1 sec producer delay and some consumer delay" $ do
+ -- ideally it should take 10 x 1 + 1 seconds
+ forM_ [1] (\r -> measureRate "asyncly" asyncly r 1 1 (11, 16))
+ -- ideally it should take 10 x 2 + 1 seconds
+ forM_ [1] (\r -> measureRate "asyncly" asyncly r 2 1 (21, 23))
+ -- ideally it should take 10 x 3 + 1 seconds
+ forM_ [1] (\r -> measureRate "asyncly" asyncly r 3 1 (31, 33))
+
+ describe "aheadly with 1 sec producer delay and some consumer delay" $ do
+ forM_ [1] (\r -> measureRate "aheadly" aheadly r 1 1 (11, 16))
+ forM_ [1] (\r -> measureRate "aheadly" aheadly r 2 1 (21, 23))
+ forM_ [1] (\r -> measureRate "aheadly" aheadly r 3 1 (31, 33))
diff --git a/test/Prop.hs b/test/Prop.hs
index 646680b..71d0241 100644
--- a/test/Prop.hs
+++ b/test/Prop.hs
@@ -4,20 +4,22 @@ module Main (main) where
import Control.Exception (BlockedIndefinitelyOnMVar(..), catches,
BlockedIndefinitelyOnSTM(..), Handler(..))
-import Control.Monad (when)
+import Control.Monad (when, forM_)
import Control.Applicative (ZipList(..))
import Control.Concurrent (MVar, takeMVar, putMVar, newEmptyMVar)
import Control.Monad (replicateM, replicateM_)
+import Data.Function ((&))
import Data.IORef (readIORef, modifyIORef, newIORef)
-import Data.List (sort, foldl', scanl')
+import Data.List (sort, foldl', scanl', findIndices, findIndex, elemIndices,
+ elemIndex, find, intersperse, foldl1')
import Data.Maybe (mapMaybe)
import GHC.Word (Word8)
-import Test.Hspec.QuickCheck (prop)
+import Test.Hspec.QuickCheck
import Test.QuickCheck (counterexample, Property, withMaxSuccess)
import Test.QuickCheck.Monadic (run, monadicIO, monitor, assert, PropertyM)
-import Test.Hspec
+import Test.Hspec as H
import Streamly
import Streamly.Prelude ((.:), nil)
@@ -50,24 +52,22 @@ equals eq stream list = do
constructWithReplicateM
:: IsStream t
=> (t IO Int -> SerialT IO Int)
- -> Int
- -> Int
-> Word8
-> Property
-constructWithReplicateM op thr buf len = withMaxSuccess maxTestCount $
+constructWithReplicateM op len = withMaxSuccess maxTestCount $
monadicIO $ do
let x = return (1 :: Int)
- stream <- run $ (S.toList . op) (maxThreads thr $ maxBuffer buf $
- S.replicateM (fromIntegral len) x)
+ stream <- run $ (S.toList . op) (S.replicateM (fromIntegral len) x)
list <- run $ replicateM (fromIntegral len) x
equals (==) stream list
transformFromList
- :: ([Int] -> t IO Int)
- -> ([Int] -> [Int] -> Bool)
- -> ([Int] -> [Int])
- -> (t IO Int -> SerialT IO Int)
- -> [Int]
+ :: Show b =>
+ ([a] -> t IO a)
+ -> ([b] -> [b] -> Bool)
+ -> ([a] -> [b])
+ -> (t IO a -> SerialT IO b)
+ -> [a]
-> Property
transformFromList constr eq listOp op a =
monadicIO $ do
@@ -180,8 +180,12 @@ concurrentUnfoldrM eq op n =
return x
equals eq stream list
-concurrentApplication :: Word8 -> Property
-concurrentApplication n =
+concurrentApplication :: IsStream t
+ => ([Word8] -> [Word8] -> Bool)
+ -> (t IO Word8 -> SerialT IO Word8)
+ -> Word8
+ -> Property
+concurrentApplication eq t n = withMaxSuccess maxTestCount $
monadicIO $ do
-- XXX we should test empty list case as well
let list = [0..n]
@@ -191,7 +195,7 @@ concurrentApplication n =
-- since unfoldr happens in parallel with the stream processing we
-- can do two takeMVar in one iteration. If it is not parallel then
-- this will not work and the test will fail.
- S.toList $ do
+ (S.toList . t) $ do
sourceUnfoldrM mv n |&
(S.mapM $ \x -> do
let msg = show x ++ "/" ++ show n
@@ -205,7 +209,7 @@ concurrentApplication n =
else return ()
else return ()
return x)
- equals (==) stream list
+ equals eq stream list
sourceUnfoldrM1 :: IsStream t => Word8 -> t IO Word8
sourceUnfoldrM1 n = S.unfoldrM step 0
@@ -264,10 +268,10 @@ foldFromList constr op eq a = transformFromList constr eq id op a
eliminateOp
:: (Show a, Eq a)
- => ([Int] -> t IO Int)
- -> ([Int] -> a)
- -> (t IO Int -> IO a)
- -> [Int]
+ => ([s] -> t IO s)
+ -> ([s] -> a)
+ -> (t IO s -> IO a)
+ -> [s]
-> Property
eliminateOp constr listOp op a =
monadicIO $ do
@@ -292,10 +296,10 @@ functorOps
:: Functor (t IO)
=> ([Int] -> t IO Int)
-> String
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> Spec
-functorOps constr desc t eq = do
+functorOps constr desc eq t = do
prop (desc ++ " id") $ transformFromList constr eq id $ t
prop (desc ++ " fmap (+1)") $ transformFromList constr eq (fmap (+1)) $ t . (fmap (+1))
@@ -303,10 +307,10 @@ transformOps
:: IsStream t
=> ([Int] -> t IO Int)
-> String
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> Spec
-transformOps constr desc t eq = do
+transformOps constr desc eq t = do
let transform = transformFromList constr eq
-- Filtering
prop (desc ++ " filter False") $
@@ -347,14 +351,20 @@ transformOps constr desc t eq = do
prop (desc ++ " scan") $ transform (scanl' (+) 0) $ t . (S.scanl' (+) 0)
prop (desc ++ " reverse") $ transform reverse $ t . S.reverse
+ prop (desc ++ " findIndices") $ transform (findIndices odd) $ t . (S.findIndices odd)
+ prop (desc ++ " elemIndices") $ transform (elemIndices 3) $ t . (S.elemIndices 3)
+
+ prop (desc ++ " intersperseM") $ transform (intersperse 3) $ t . (S.intersperseM (return 3))
+
concurrentOps
:: IsStream t
=> ([Word8] -> t IO Word8)
-> String
- -> (t IO Word8 -> SerialT IO Word8)
-> ([Word8] -> [Word8] -> Bool)
+ -> (t IO Word8 -> SerialT IO Word8)
-> Spec
-concurrentOps constr desc t eq = do
+concurrentOps constr desc eq t = do
let prop1 d p = prop d $ withMaxSuccess maxTestCount p
prop1 (desc ++ " fromFoldableM") $ concurrentFromFoldable eq t
@@ -375,10 +385,10 @@ transformCombineOpsCommon
:: (IsStream t, Semigroup (t IO Int))
=> ([Int] -> t IO Int)
-> String
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> Spec
-transformCombineOpsCommon constr desc t eq = do
+transformCombineOpsCommon constr desc eq t = do
let transform = transformCombineFromList constr eq
-- Filtering
prop (desc ++ " filter False") $
@@ -432,17 +442,26 @@ transformCombineOpsCommon constr desc t eq = do
(S.scanlM' (\_ a -> return a) 0)
prop (desc ++ " reverse") $ transform reverse t S.reverse
+ prop (desc ++ " intersperseM") $
+ transform (intersperse 3) t (S.intersperseM $ return 3)
+
transformCombineOpsOrdered
:: (IsStream t, Semigroup (t IO Int))
=> ([Int] -> t IO Int)
-> String
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> Spec
-transformCombineOpsOrdered constr desc t eq = do
+transformCombineOpsOrdered constr desc eq t = do
let transform = transformCombineFromList constr eq
-- Filtering
prop (desc ++ " take 1") $ transform (take 1) t (S.take 1)
+#ifdef DEVBUILD
+ prop (desc ++ " take 2") $ transform (take 2) t (S.take 2)
+ prop (desc ++ " take 3") $ transform (take 3) t (S.take 3)
+ prop (desc ++ " take 4") $ transform (take 4) t (S.take 4)
+ prop (desc ++ " take 5") $ transform (take 5) t (S.take 5)
+#endif
prop (desc ++ " take 10") $ transform (take 10) t (S.take 10)
prop (desc ++ " takeWhile > 0") $
@@ -455,6 +474,15 @@ transformCombineOpsOrdered constr desc t eq = do
transform (dropWhile (> 0)) t (S.dropWhile (> 0))
prop (desc ++ " scan") $ transform (scanl' (+) 0) t (S.scanl' (+) 0)
+ -- XXX this does not fail when the SVar is shared, need to fix.
+ prop (desc ++ " concurrent application") $
+ transform (& (map (+1))) t (|& (S.map (+1)))
+
+ prop (desc ++ " findIndices") $
+ transform (findIndices odd) t (S.findIndices odd)
+ prop (desc ++ " elemIndices") $
+ transform (elemIndices 0) t (S.elemIndices 0)
+
wrapMaybe :: Eq a1 => ([a1] -> a2) -> [a1] -> Maybe a2
wrapMaybe f =
\x ->
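`wrapMaybe` (sketched here from the truncated definition above; `wrapMaybe'` is an illustrative stand-in) totalizes a partial list function such as `maximum` or `foldr1` so it can be compared against the stream version, which returns `Maybe`:

```haskell
-- Lift a partial list function into a total one: Nothing on the empty
-- list, Just (f xs) otherwise.
wrapMaybe' :: ([a] -> b) -> [a] -> Maybe b
wrapMaybe' f xs = if null xs then Nothing else Just (f xs)

main :: IO ()
main = print (wrapMaybe' maximum ([] :: [Int]), wrapMaybe' maximum [3, 1, 2])
-- (Nothing,Just 3)
```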
@@ -470,10 +498,18 @@ eliminationOps
eliminationOps constr desc t = do
-- Elimination
prop (desc ++ " null") $ eliminateOp constr null $ S.null . t
- prop (desc ++ " foldl") $
+ prop (desc ++ " foldl'") $
eliminateOp constr (foldl' (+) 0) $ (S.foldl' (+) 0) . t
+ prop (desc ++ " foldl1'") $
+ eliminateOp constr (wrapMaybe $ foldl1' (+)) $ (S.foldl1' (+)) . t
+ prop (desc ++ " foldr1") $
+ eliminateOp constr (wrapMaybe $ foldr1 (+)) $ (S.foldr1 (+)) . t
prop (desc ++ " all") $ eliminateOp constr (all even) $ (S.all even) . t
prop (desc ++ " any") $ eliminateOp constr (any even) $ (S.any even) . t
+ prop (desc ++ " and") $ eliminateOp constr (and . map (> 0)) $
+ (S.and . S.map (> 0)) . t
+ prop (desc ++ " or") $ eliminateOp constr (or . map (> 0)) $
+ (S.or . S.map (> 0)) . t
prop (desc ++ " length") $ eliminateOp constr length $ S.length . t
prop (desc ++ " sum") $ eliminateOp constr sum $ S.sum . t
prop (desc ++ " product") $ eliminateOp constr product $ S.product . t
@@ -481,6 +517,14 @@ eliminationOps constr desc t = do
prop (desc ++ " maximum") $ eliminateOp constr (wrapMaybe maximum) $ S.maximum . t
prop (desc ++ " minimum") $ eliminateOp constr (wrapMaybe minimum) $ S.minimum . t
+ prop (desc ++ " findIndex") $ eliminateOp constr (findIndex odd) $ (S.findIndex odd) . t
+ prop (desc ++ " elemIndex") $ eliminateOp constr (elemIndex 3) $ (S.elemIndex 3) . t
+
+ prop (desc ++ " find") $ eliminateOp constr (find even) $ (S.find even) . t
+ prop (desc ++ " lookup") $
+ eliminateOp constr (lookup 3 . flip zip [1..]) $
+ S.lookup 3 . S.zipWith (\a b -> (b, a)) (S.fromList [(1::Int)..]) . t
+
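The list side of the lookup property above pairs each element with its 1-based position and looks up the position of the first occurrence of the key; a small worked instance:

```haskell
-- flip zip [1..] xs pairs each element with its 1-based index; lookup 3
-- then returns the index of the first 3.
main :: IO ()
main = print (lookup 3 (flip zip [1 ..] [5, 3, 3 :: Int]))  -- Just 2
```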
-- head/tail/last may depend on the order in case of parallel streams
-- so we test these only for serial streams.
serialEliminationOps
@@ -496,6 +540,11 @@ serialEliminationOps constr desc t = do
Nothing -> return Nothing
Just s -> S.toList s >>= return . Just
prop (desc ++ " last") $ eliminateOp constr (wrapMaybe last) $ S.last . t
+ prop (desc ++ " init") $ eliminateOp constr (wrapMaybe init) $ \x -> do
+ r <- S.init (t x)
+ case r of
+ Nothing -> return Nothing
+ Just s -> S.toList s >>= return . Just
transformOpsWord8
:: ([Word8] -> t IO Word8)
@@ -515,21 +564,21 @@ semigroupOps
#endif
, Monoid (t IO Int))
=> String
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> Spec
-semigroupOps desc t eq = do
+semigroupOps desc eq t = do
prop (desc ++ " <>") $ foldFromList (foldMapWith (<>) singleton) t eq
prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend singleton) t eq
applicativeOps
:: Applicative (t IO)
=> ([Int] -> t IO Int)
- -> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
+ -> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([Int], [Int])
-> Property
-applicativeOps constr t eq (a, b) = withMaxSuccess maxTestCount $
+applicativeOps constr eq t (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
stream <- run ((S.toList . t) ((,) <$> (constr a) <*> (constr b)))
let list = (,) <$> a <*> b
@@ -538,11 +587,11 @@ applicativeOps constr t eq (a, b) = withMaxSuccess maxTestCount $
zipApplicative
:: (IsStream t, Applicative (t IO))
=> ([Int] -> t IO Int)
- -> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
+ -> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([Int], [Int])
-> Property
-zipApplicative constr t eq (a, b) = withMaxSuccess maxTestCount $
+zipApplicative constr eq t (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
stream1 <- run ((S.toList . t) ((,) <$> (constr a) <*> (constr b)))
stream2 <- run ((S.toList . t) (pure (,) <*> (constr a) <*> (constr b)))
@@ -555,11 +604,27 @@ zipApplicative constr t eq (a, b) = withMaxSuccess maxTestCount $
zipMonadic
:: IsStream t
=> ([Int] -> t IO Int)
+ -> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> (t IO (Int, Int) -> SerialT IO (Int, Int))
+ -> ([Int], [Int])
+ -> Property
+zipMonadic constr eq t (a, b) = withMaxSuccess maxTestCount $
+ monadicIO $ do
+ stream1 <-
+ run
+ ((S.toList . t)
+ (S.zipWithM (\x y -> return (x, y)) (constr a) (constr b)))
+ let list = getZipList $ (,) <$> ZipList a <*> ZipList b
+ equals eq stream1 list
+
+zipAsyncMonadic
+ :: IsStream t
+ => ([Int] -> t IO Int)
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
+ -> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([Int], [Int])
-> Property
-zipMonadic constr t eq (a, b) = withMaxSuccess maxTestCount $
+zipAsyncMonadic constr eq t (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
stream1 <-
run
@@ -576,11 +641,11 @@ zipMonadic constr t eq (a, b) = withMaxSuccess maxTestCount $
monadThen
:: Monad (t IO)
=> ([Int] -> t IO Int)
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> ([Int], [Int])
-> Property
-monadThen constr t eq (a, b) = withMaxSuccess maxTestCount $ monadicIO $ do
+monadThen constr eq t (a, b) = withMaxSuccess maxTestCount $ monadicIO $ do
stream <- run ((S.toList . t) ((constr a) >> (constr b)))
let list = a >> b
equals eq stream list
@@ -588,11 +653,11 @@ monadThen constr t eq (a, b) = withMaxSuccess maxTestCount $ monadicIO $ do
monadBind
:: Monad (t IO)
=> ([Int] -> t IO Int)
- -> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
+ -> (t IO Int -> SerialT IO Int)
-> ([Int], [Int])
-> Property
-monadBind constr t eq (a, b) = withMaxSuccess maxTestCount $
+monadBind constr eq t (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
stream <-
run
@@ -601,239 +666,326 @@ monadBind constr t eq (a, b) = withMaxSuccess maxTestCount $
let list = a >>= \x -> b >>= return . (+ x)
equals eq stream list
-constructionConcurrent :: Int -> Int -> Spec
-constructionConcurrent thr buf = do
- describe (" threads = " ++ show thr ++ "buffer = " ++ show buf) $ do
- prop "asyncly replicateM" $ constructWithReplicateM asyncly thr buf
- prop "wAsyncly replicateM" $ constructWithReplicateM wAsyncly thr buf
- prop "parallely replicateM" $ constructWithReplicateM parallely thr buf
- prop "aheadly replicateM" $ constructWithReplicateM aheadly thr buf
-
--- XXX test all concurrent ops for all these combinations
-concurrentAll :: String -> (Int -> Int -> Spec) -> Spec
-concurrentAll desc f = do
- describe desc $ do
- f 0 0 -- default
- f 0 1 -- single buffer
- f 1 0 -- single thread
- f (-1) (-1) -- unbounded threads and buffer
+constructWithIterate :: IsStream t => (t IO Int -> SerialT IO Int) -> Spec
+constructWithIterate t = do
+ it "iterate" $
+ (S.toList . t . (S.take 100) $ (S.iterate (+ 1) (0 :: Int)))
+ `shouldReturn` (take 100 $ iterate (+ 1) 0)
+ it "iterateM" $ do
+ let addM = (\ y -> return (y + 1))
+ S.toList . t . (S.take 100) $ S.iterateM addM (0 :: Int)
+ `shouldReturn` (take 100 $ iterate (+ 1) 0)
main :: IO ()
-main = hspec $ do
+main = hspec
+ $ H.parallel
+#ifdef COVERAGE_BUILD
+ $ modifyMaxSuccess (const 10)
+#endif
+ $ do
let folded :: IsStream t => [a] -> t IO a
folded = serially . (\xs ->
case xs of
[x] -> return x -- singleton stream case
_ -> foldMapWith (<>) return xs
)
+
+ let makeOps t =
+ [ ("default", t)
+#ifndef COVERAGE_BUILD
+ , ("rate AvgRate 10000", t . avgRate 10000)
+ , ("rate Nothing", t . rate Nothing)
+ , ("maxBuffer 0", t . maxBuffer 0)
+ , ("maxBuffer 1", t . maxBuffer 1)
+ , ("maxThreads 0", t . maxThreads 0)
+ , ("maxThreads 1", t . maxThreads 1)
+ , ("maxThreads -1", t . maxThreads (-1))
+#endif
+ ]
+
+ let mapOps spec = mapM_ (\(desc, f) -> describe desc $ spec f)
+ let serialOps :: IsStream t => ((SerialT IO a -> t IO a) -> Spec) -> Spec
+ serialOps spec = mapOps spec $ (makeOps serially)
+#ifndef COVERAGE_BUILD
+ ++ [("rate AvgRate 0.00000001", serially . avgRate 0.00000001)]
+ ++ [("maxBuffer -1", serially . maxBuffer (-1))]
+#endif
+ let wSerialOps :: IsStream t => ((WSerialT IO a -> t IO a) -> Spec) -> Spec
+ wSerialOps spec = mapOps spec $ makeOps wSerially
+#ifndef COVERAGE_BUILD
+ ++ [("rate AvgRate 0.00000001", wSerially . avgRate 0.00000001)]
+ ++ [("maxBuffer (-1)", wSerially . maxBuffer (-1))]
+#endif
+ let asyncOps :: IsStream t => ((AsyncT IO a -> t IO a) -> Spec) -> Spec
+ asyncOps spec = mapOps spec $ makeOps asyncly
+#ifndef COVERAGE_BUILD
+ ++ [("maxBuffer (-1)", asyncly . maxBuffer (-1))]
+#endif
+ let wAsyncOps :: IsStream t => ((WAsyncT IO a -> t IO a) -> Spec) -> Spec
+ wAsyncOps spec = mapOps spec $ makeOps wAsyncly
+#ifndef COVERAGE_BUILD
+ ++ [("maxBuffer (-1)", wAsyncly . maxBuffer (-1))]
+#endif
+ let aheadOps :: IsStream t => ((AheadT IO a -> t IO a) -> Spec) -> Spec
+ aheadOps spec = mapOps spec $ makeOps aheadly
+#ifndef COVERAGE_BUILD
+ ++ [("maxBuffer (-1)", aheadly . maxBuffer (-1))]
+#endif
+ let parallelOps :: IsStream t => ((ParallelT IO a -> t IO a) -> Spec) -> Spec
+ parallelOps spec = mapOps spec $ makeOps parallely
+#ifndef COVERAGE_BUILD
+ ++ [("rate AvgRate 0.00000001", parallely . avgRate 0.00000001)]
+ ++ [("maxBuffer (-1)", parallely . maxBuffer (-1))]
+#endif
+ let zipSerialOps :: IsStream t => ((ZipSerialM IO a -> t IO a) -> Spec) -> Spec
+ zipSerialOps spec = mapOps spec $ makeOps zipSerially
+#ifndef COVERAGE_BUILD
+ ++ [("rate AvgRate 0.00000001", zipSerially . avgRate 0.00000001)]
+ ++ [("maxBuffer (-1)", zipSerially . maxBuffer (-1))]
+#endif
+    -- Note: the "pure" of applicative Zip streams generates an infinite
+    -- stream and therefore maxBuffer (-1) must not be used for that case.
+ let zipAsyncOps :: IsStream t => ((ZipAsyncM IO a -> t IO a) -> Spec) -> Spec
+ zipAsyncOps spec = mapOps spec $ makeOps zipAsyncly
+
describe "Construction" $ do
- prop "serially replicateM" $ constructWithReplicateM serially 0 0
- it "iterate" $
- (S.toList . serially . (S.take 100) $ (S.iterate (+ 1) (0 :: Int)))
- `shouldReturn` (take 100 $ iterate (+ 1) 0)
+ serialOps $ prop "serially replicateM" . constructWithReplicateM
+ wSerialOps $ prop "wSerially replicateM" . constructWithReplicateM
+ aheadOps $ prop "aheadly replicateM" . constructWithReplicateM
+ asyncOps $ prop "asyncly replicateM" . constructWithReplicateM
+ wAsyncOps $ prop "wAsyncly replicateM" . constructWithReplicateM
+ parallelOps $ prop "parallely replicateM" . constructWithReplicateM
-- XXX test for all types of streams
- it "iterateM" $ do
- let addM = (\ y -> return (y + 1))
- S.toList . serially . (S.take 100) $ S.iterateM addM (0 :: Int)
- `shouldReturn` (take 100 $ iterate (+ 1) 0)
- concurrentAll "Construction" constructionConcurrent
+ constructWithIterate serially
describe "Functor operations" $ do
- functorOps S.fromFoldable "serially" serially (==)
- functorOps folded "serially folded" serially (==)
- functorOps S.fromFoldable "wSerially" wSerially (==)
- functorOps folded "wSerially folded" wSerially (==)
- functorOps S.fromFoldable "aheadly" aheadly (==)
- functorOps folded "aheadly folded" aheadly (==)
- functorOps S.fromFoldable "asyncly" asyncly sortEq
- functorOps folded "asyncly folded" asyncly sortEq
- functorOps S.fromFoldable "wAsyncly" wAsyncly sortEq
- functorOps folded "wAsyncly folded" wAsyncly sortEq
- functorOps S.fromFoldable "parallely" parallely sortEq
- functorOps folded "parallely folded" parallely sortEq
- functorOps S.fromFoldable "zipSerially" zipSerially (==)
- functorOps folded "zipSerially folded" zipSerially (==)
- functorOps S.fromFoldable "zipAsyncly" zipAsyncly (==)
- functorOps folded "zipAsyncly folded" zipAsyncly (==)
+ serialOps $ functorOps S.fromFoldable "serially" (==)
+ serialOps $ functorOps folded "serially folded" (==)
+ wSerialOps $ functorOps S.fromFoldable "wSerially" (==)
+ wSerialOps $ functorOps folded "wSerially folded" (==)
+ aheadOps $ functorOps S.fromFoldable "aheadly" (==)
+ aheadOps $ functorOps folded "aheadly folded" (==)
+ asyncOps $ functorOps S.fromFoldable "asyncly" sortEq
+ asyncOps $ functorOps folded "asyncly folded" sortEq
+ wAsyncOps $ functorOps S.fromFoldable "wAsyncly" sortEq
+ wAsyncOps $ functorOps folded "wAsyncly folded" sortEq
+ parallelOps $ functorOps S.fromFoldable "parallely" sortEq
+ parallelOps $ functorOps folded "parallely folded" sortEq
+ zipSerialOps $ functorOps S.fromFoldable "zipSerially" (==)
+ zipSerialOps $ functorOps folded "zipSerially folded" (==)
+ zipAsyncOps $ functorOps S.fromFoldable "zipAsyncly" (==)
+ zipAsyncOps $ functorOps folded "zipAsyncly folded" (==)
describe "Semigroup operations" $ do
- semigroupOps "serially" serially (==)
- semigroupOps "wSerially" wSerially (==)
- semigroupOps "aheadly" aheadly (==)
- semigroupOps "asyncly" asyncly sortEq
- semigroupOps "wAsyncly" wAsyncly sortEq
- semigroupOps "parallely" parallely sortEq
- semigroupOps "zipSerially" zipSerially (==)
- semigroupOps "zipAsyncly" zipAsyncly (==)
+ serialOps $ semigroupOps "serially" (==)
+ wSerialOps $ semigroupOps "wSerially" (==)
+ aheadOps $ semigroupOps "aheadly" (==)
+ asyncOps $ semigroupOps "asyncly" sortEq
+ wAsyncOps $ semigroupOps "wAsyncly" sortEq
+ parallelOps $ semigroupOps "parallely" sortEq
+ zipSerialOps $ semigroupOps "zipSerially" (==)
+ zipAsyncOps $ semigroupOps "zipAsyncly" (==)
describe "Applicative operations" $ do
-- The tests using sorted equality are weaker tests
-- We need to have stronger unit tests for all those
-- XXX applicative with three arguments
- prop "serially applicative" $ applicativeOps S.fromFoldable serially (==)
- prop "serially applicative folded" $ applicativeOps folded serially (==)
- prop "aheadly applicative" $ applicativeOps S.fromFoldable aheadly (==)
- prop "aheadly applicative folded" $ applicativeOps folded aheadly (==)
- prop "wSerially applicative" $ applicativeOps S.fromFoldable wSerially sortEq
- prop "wSerially applicative folded" $ applicativeOps folded wSerially sortEq
- prop "asyncly applicative" $ applicativeOps S.fromFoldable asyncly sortEq
- prop "asyncly applicative folded" $ applicativeOps folded asyncly sortEq
- prop "wAsyncly applicative folded" $ applicativeOps folded wAsyncly sortEq
- prop "parallely applicative folded" $ applicativeOps folded parallely sortEq
+ serialOps $ prop "serially applicative" . applicativeOps S.fromFoldable (==)
+ serialOps $ prop "serially applicative folded" . applicativeOps folded (==)
+ wSerialOps $ prop "wSerially applicative" . applicativeOps S.fromFoldable sortEq
+ wSerialOps $ prop "wSerially applicative folded" . applicativeOps folded sortEq
+ aheadOps $ prop "aheadly applicative" . applicativeOps S.fromFoldable (==)
+ aheadOps $ prop "aheadly applicative folded" . applicativeOps folded (==)
+ asyncOps $ prop "asyncly applicative" . applicativeOps S.fromFoldable sortEq
+ asyncOps $ prop "asyncly applicative folded" . applicativeOps folded sortEq
+ wAsyncOps $ prop "wAsyncly applicative" . applicativeOps S.fromFoldable sortEq
+ wAsyncOps $ prop "wAsyncly applicative folded" . applicativeOps folded sortEq
+ parallelOps $ prop "parallely applicative folded" . applicativeOps folded sortEq
describe "Zip operations" $ do
- prop "zipSerially applicative" $ zipApplicative S.fromFoldable zipSerially (==)
- prop "zipSerially applicative folded" $ zipApplicative folded zipSerially (==)
- prop "zipAsyncly applicative" $ zipApplicative S.fromFoldable zipAsyncly (==)
- prop "zipAsyncly applicative folded" $ zipApplicative folded zipAsyncly (==)
-
- prop "zip monadic serially" $ zipMonadic S.fromFoldable serially (==)
- prop "zip monadic serially folded" $ zipMonadic folded serially (==)
- prop "zip monadic aheadly" $ zipMonadic S.fromFoldable aheadly (==)
- prop "zip monadic aheadly folded" $ zipMonadic folded aheadly (==)
- prop "zip monadic wSerially" $ zipMonadic S.fromFoldable wSerially (==)
- prop "zip monadic wSerially folded" $ zipMonadic folded wSerially (==)
- prop "zip monadic asyncly" $ zipMonadic S.fromFoldable asyncly (==)
- prop "zip monadic asyncly folded" $ zipMonadic folded asyncly (==)
- prop "zip monadic wAsyncly" $ zipMonadic S.fromFoldable wAsyncly (==)
- prop "zip monadic wAsyncly folded" $ zipMonadic folded wAsyncly (==)
- prop "zip monadic parallely" $ zipMonadic S.fromFoldable parallely (==)
- prop "zip monadic parallely folded" $ zipMonadic folded parallely (==)
+ zipSerialOps $ prop "zipSerially applicative" . zipApplicative S.fromFoldable (==)
+ zipSerialOps $ prop "zipSerially applicative folded" . zipApplicative folded (==)
+ zipAsyncOps $ prop "zipAsyncly applicative" . zipApplicative S.fromFoldable (==)
+ zipAsyncOps $ prop "zipAsyncly applicative folded" . zipApplicative folded (==)
+
+ -- We test only the serial zip with serial streams and the parallel
+ -- stream, because the rate setting in these streams can slow down
+ -- zipAsync.
+ serialOps $ prop "zip monadic serially" . zipMonadic S.fromFoldable (==)
+ serialOps $ prop "zip monadic serially folded" . zipMonadic folded (==)
+ wSerialOps $ prop "zip monadic wSerially" . zipMonadic S.fromFoldable (==)
+ wSerialOps $ prop "zip monadic wSerially folded" . zipMonadic folded (==)
+ aheadOps $ prop "zip monadic aheadly" . zipAsyncMonadic S.fromFoldable (==)
+ aheadOps $ prop "zip monadic aheadly folded" . zipAsyncMonadic folded (==)
+ asyncOps $ prop "zip monadic asyncly" . zipAsyncMonadic S.fromFoldable (==)
+ asyncOps $ prop "zip monadic asyncly folded" . zipAsyncMonadic folded (==)
+ wAsyncOps $ prop "zip monadic wAsyncly" . zipAsyncMonadic S.fromFoldable (==)
+ wAsyncOps $ prop "zip monadic wAsyncly folded" . zipAsyncMonadic folded (==)
+ parallelOps $ prop "zip monadic parallely" . zipMonadic S.fromFoldable (==)
+ parallelOps $ prop "zip monadic parallely folded" . zipMonadic folded (==)
describe "Monad operations" $ do
- prop "serially monad then" $ monadThen S.fromFoldable serially (==)
- prop "aheadly monad then" $ monadThen S.fromFoldable aheadly (==)
- prop "wSerially monad then" $ monadThen S.fromFoldable wSerially sortEq
- prop "asyncly monad then" $ monadThen S.fromFoldable asyncly sortEq
- prop "wAsyncly monad then" $ monadThen S.fromFoldable wAsyncly sortEq
- prop "parallely monad then" $ monadThen S.fromFoldable parallely sortEq
-
- prop "serially monad then folded" $ monadThen folded serially (==)
- prop "aheadly monad then folded" $ monadThen folded aheadly (==)
- prop "wSerially monad then folded" $ monadThen folded wSerially sortEq
- prop "asyncly monad then folded" $ monadThen folded asyncly sortEq
- prop "wAsyncly monad then folded" $ monadThen folded wAsyncly sortEq
- prop "parallely monad then folded" $ monadThen folded parallely sortEq
-
- prop "serially monad bind" $ monadBind S.fromFoldable serially (==)
- prop "aheadly monad bind" $ monadBind S.fromFoldable aheadly (==)
- prop "wSerially monad bind" $ monadBind S.fromFoldable wSerially sortEq
- prop "asyncly monad bind" $ monadBind S.fromFoldable asyncly sortEq
- prop "wAsyncly monad bind" $ monadBind S.fromFoldable wAsyncly sortEq
- prop "parallely monad bind" $ monadBind S.fromFoldable parallely sortEq
+ serialOps $ prop "serially monad then" . monadThen S.fromFoldable (==)
+ wSerialOps $ prop "wSerially monad then" . monadThen S.fromFoldable sortEq
+ aheadOps $ prop "aheadly monad then" . monadThen S.fromFoldable (==)
+ asyncOps $ prop "asyncly monad then" . monadThen S.fromFoldable sortEq
+ wAsyncOps $ prop "wAsyncly monad then" . monadThen S.fromFoldable sortEq
+ parallelOps $ prop "parallely monad then" . monadThen S.fromFoldable sortEq
+
+ serialOps $ prop "serially monad then folded" . monadThen folded (==)
+ wSerialOps $ prop "wSerially monad then folded" . monadThen folded sortEq
+ aheadOps $ prop "aheadly monad then folded" . monadThen folded (==)
+ asyncOps $ prop "asyncly monad then folded" . monadThen folded sortEq
+ wAsyncOps $ prop "wAsyncly monad then folded" . monadThen folded sortEq
+ parallelOps $ prop "parallely monad then folded" . monadThen folded sortEq
+
+ serialOps $ prop "serially monad bind" . monadBind S.fromFoldable (==)
+ wSerialOps $ prop "wSerially monad bind" . monadBind S.fromFoldable sortEq
+ aheadOps $ prop "aheadly monad bind" . monadBind S.fromFoldable (==)
+ asyncOps $ prop "asyncly monad bind" . monadBind S.fromFoldable sortEq
+ wAsyncOps $ prop "wAsyncly monad bind" . monadBind S.fromFoldable sortEq
+ parallelOps $ prop "parallely monad bind" . monadBind S.fromFoldable sortEq
+
+ serialOps $ prop "serially monad bind folded" . monadBind folded (==)
+ wSerialOps $ prop "wSerially monad bind folded" . monadBind folded sortEq
+ aheadOps $ prop "aheadly monad bind folded" . monadBind folded (==)
+ asyncOps $ prop "asyncly monad bind folded" . monadBind folded sortEq
+ wAsyncOps $ prop "wAsyncly monad bind folded" . monadBind folded sortEq
+ parallelOps $ prop "parallely monad bind folded" . monadBind folded sortEq
describe "Stream transform operations" $ do
- transformOps S.fromFoldable "serially" serially (==)
- transformOps S.fromFoldable "aheadly" aheadly (==)
- transformOps S.fromFoldable "wSerially" wSerially (==)
- transformOps S.fromFoldable "zipSerially" zipSerially (==)
- transformOps S.fromFoldable "zipAsyncly" zipAsyncly (==)
- transformOps S.fromFoldable "asyncly" asyncly sortEq
- transformOps S.fromFoldable "wAsyncly" wAsyncly sortEq
- transformOps S.fromFoldable "parallely" parallely sortEq
-
- transformOps folded "serially folded" serially (==)
- transformOps folded "aheadly folded" aheadly (==)
- transformOps folded "wSerially folded" wSerially (==)
- transformOps folded "zipSerially folded" zipSerially (==)
- transformOps folded "zipAsyncly folded" zipAsyncly (==)
- transformOps folded "asyncly folded" asyncly sortEq
- transformOps folded "wAsyncly folded" wAsyncly sortEq
- transformOps folded "parallely folded" parallely sortEq
-
- transformOpsWord8 S.fromFoldable "serially" serially
- transformOpsWord8 S.fromFoldable "aheadly" aheadly
- transformOpsWord8 S.fromFoldable "wSerially" wSerially
- transformOpsWord8 S.fromFoldable "zipSerially" zipSerially
- transformOpsWord8 S.fromFoldable "zipAsyncly" zipAsyncly
- transformOpsWord8 S.fromFoldable "asyncly" asyncly
- transformOpsWord8 S.fromFoldable "wAsyncly" wAsyncly
- transformOpsWord8 S.fromFoldable "parallely" parallely
-
- transformOpsWord8 folded "serially folded" serially
- transformOpsWord8 folded "aheadly folded" aheadly
- transformOpsWord8 folded "wSerially folded" wSerially
- transformOpsWord8 folded "zipSerially folded" zipSerially
- transformOpsWord8 folded "zipAsyncly folded" zipAsyncly
- transformOpsWord8 folded "asyncly folded" asyncly
- transformOpsWord8 folded "wAsyncly folded" wAsyncly
- transformOpsWord8 folded "parallely folded" parallely
-
- -- XXX add tests with outputQueue size set to 1
+ serialOps $ transformOps S.fromFoldable "serially" (==)
+ wSerialOps $ transformOps S.fromFoldable "wSerially" (==)
+ aheadOps $ transformOps S.fromFoldable "aheadly" (==)
+ asyncOps $ transformOps S.fromFoldable "asyncly" sortEq
+ wAsyncOps $ transformOps S.fromFoldable "wAsyncly" sortEq
+ parallelOps $ transformOps S.fromFoldable "parallely" sortEq
+ zipSerialOps $ transformOps S.fromFoldable "zipSerially" (==)
+ zipAsyncOps $ transformOps S.fromFoldable "zipAsyncly" (==)
+
+ serialOps $ transformOps folded "serially folded" (==)
+ wSerialOps $ transformOps folded "wSerially folded" (==)
+ aheadOps $ transformOps folded "aheadly folded" (==)
+ asyncOps $ transformOps folded "asyncly folded" sortEq
+ wAsyncOps $ transformOps folded "wAsyncly folded" sortEq
+ parallelOps $ transformOps folded "parallely folded" sortEq
+ zipSerialOps $ transformOps folded "zipSerially folded" (==)
+ zipAsyncOps $ transformOps folded "zipAsyncly folded" (==)
+
+ serialOps $ transformOpsWord8 S.fromFoldable "serially"
+ wSerialOps $ transformOpsWord8 S.fromFoldable "wSerially"
+ aheadOps $ transformOpsWord8 S.fromFoldable "aheadly"
+ asyncOps $ transformOpsWord8 S.fromFoldable "asyncly"
+ wAsyncOps $ transformOpsWord8 S.fromFoldable "wAsyncly"
+ parallelOps $ transformOpsWord8 S.fromFoldable "parallely"
+ zipSerialOps $ transformOpsWord8 S.fromFoldable "zipSerially"
+ zipAsyncOps $ transformOpsWord8 S.fromFoldable "zipAsyncly"
+
+ serialOps $ transformOpsWord8 folded "serially folded"
+ wSerialOps $ transformOpsWord8 folded "wSerially folded"
+ aheadOps $ transformOpsWord8 folded "aheadly folded"
+ asyncOps $ transformOpsWord8 folded "asyncly folded"
+ wAsyncOps $ transformOpsWord8 folded "wAsyncly folded"
+ parallelOps $ transformOpsWord8 folded "parallely folded"
+ zipSerialOps $ transformOpsWord8 folded "zipSerially folded"
+ zipAsyncOps $ transformOpsWord8 folded "zipAsyncly folded"
+
+    -- These tests won't work with maxBuffer or maxThreads set to 1, so we
+    -- exclude those cases here.
+ let mkOps t =
+ [ ("default", t)
+#ifndef COVERAGE_BUILD
+ , ("rate Nothing", t . rate Nothing)
+ , ("maxBuffer 0", t . maxBuffer 0)
+ , ("maxThreads 0", t . maxThreads 0)
+            , ("maxThreads -1", t . maxThreads (-1))
+#endif
+ ]
+
+ let forOps ops spec = forM_ ops (\(desc, f) -> describe desc $ spec f)
describe "Stream concurrent operations" $ do
- concurrentOps S.fromFoldable "aheadly" aheadly (==)
- concurrentOps S.fromFoldable "asyncly" asyncly sortEq
- concurrentOps S.fromFoldable "wAsyncly" wAsyncly sortEq
- concurrentOps S.fromFoldable "parallely" parallely sortEq
-
- concurrentOps folded "aheadly folded" aheadly (==)
- concurrentOps folded "asyncly folded" asyncly sortEq
- concurrentOps folded "wAsyncly folded" wAsyncly sortEq
- concurrentOps folded "parallely folded" parallely sortEq
-
- prop "concurrent application" $ withMaxSuccess maxTestCount $
- concurrentApplication
+ forOps (mkOps aheadly) $ concurrentOps S.fromFoldable "aheadly" (==)
+ forOps (mkOps asyncly) $ concurrentOps S.fromFoldable "asyncly" sortEq
+ forOps (mkOps wAsyncly) $ concurrentOps S.fromFoldable "wAsyncly" sortEq
+ forOps (mkOps parallely) $ concurrentOps S.fromFoldable "parallely" sortEq
+
+ forOps (mkOps aheadly) $ concurrentOps folded "aheadly folded" (==)
+ forOps (mkOps asyncly) $ concurrentOps folded "asyncly folded" sortEq
+ forOps (mkOps wAsyncly) $ concurrentOps folded "wAsyncly folded" sortEq
+ forOps (mkOps parallely) $ concurrentOps folded "parallely folded" sortEq
+
+ describe "Concurrent application" $ do
+ serialOps $ prop "serial" . concurrentApplication (==)
+ asyncOps $ prop "async" . concurrentApplication sortEq
+ aheadOps $ prop "ahead" . concurrentApplication (==)
+ parallelOps $ prop "parallel" . concurrentApplication sortEq
+
prop "concurrent foldr application" $ withMaxSuccess maxTestCount $
concurrentFoldrApplication
prop "concurrent foldl application" $ withMaxSuccess maxTestCount $
concurrentFoldlApplication
-- These tests are specifically targeted towards detecting illegal sharing
- -- of SVar across conurrent streams.
+    -- of SVar across concurrent streams. All transform ops must be added here.
describe "Stream transform and combine operations" $ do
- transformCombineOpsCommon S.fromFoldable "serially" serially (==)
- transformCombineOpsCommon S.fromFoldable "aheadly" aheadly (==)
- transformCombineOpsCommon S.fromFoldable "wSerially" wSerially sortEq
- transformCombineOpsCommon S.fromFoldable "zipSerially" zipSerially (==)
- transformCombineOpsCommon S.fromFoldable "zipAsyncly" zipAsyncly (==)
- transformCombineOpsCommon S.fromFoldable "asyncly" asyncly sortEq
- transformCombineOpsCommon S.fromFoldable "wAsyncly" wAsyncly sortEq
- transformCombineOpsCommon S.fromFoldable "parallely" parallely sortEq
-
- transformCombineOpsCommon folded "serially" serially (==)
- transformCombineOpsCommon folded "aheadly" aheadly (==)
- transformCombineOpsCommon folded "wSerially" wSerially sortEq
- transformCombineOpsCommon folded "zipSerially" zipSerially (==)
- transformCombineOpsCommon folded "zipAsyncly" zipAsyncly (==)
- transformCombineOpsCommon folded "asyncly" asyncly sortEq
- transformCombineOpsCommon folded "wAsyncly" wAsyncly sortEq
- transformCombineOpsCommon folded "parallely" parallely sortEq
-
- transformCombineOpsOrdered S.fromFoldable "serially" serially (==)
- transformCombineOpsOrdered S.fromFoldable "serially" aheadly (==)
- transformCombineOpsOrdered S.fromFoldable "zipSerially" zipSerially (==)
- transformCombineOpsOrdered S.fromFoldable "zipAsyncly" zipAsyncly (==)
+ serialOps $ transformCombineOpsCommon S.fromFoldable "serially" (==)
+ wSerialOps $ transformCombineOpsCommon S.fromFoldable "wSerially" sortEq
+ aheadOps $ transformCombineOpsCommon S.fromFoldable "aheadly" (==)
+ asyncOps $ transformCombineOpsCommon S.fromFoldable "asyncly" sortEq
+ wAsyncOps $ transformCombineOpsCommon S.fromFoldable "wAsyncly" sortEq
+ parallelOps $ transformCombineOpsCommon S.fromFoldable "parallely" sortEq
+ zipSerialOps $ transformCombineOpsCommon S.fromFoldable "zipSerially" (==)
+ zipAsyncOps $ transformCombineOpsCommon S.fromFoldable "zipAsyncly" (==)
+
+ serialOps $ transformCombineOpsCommon folded "serially" (==)
+ wSerialOps $ transformCombineOpsCommon folded "wSerially" sortEq
+ aheadOps $ transformCombineOpsCommon folded "aheadly" (==)
+ asyncOps $ transformCombineOpsCommon folded "asyncly" sortEq
+ wAsyncOps $ transformCombineOpsCommon folded "wAsyncly" sortEq
+ parallelOps $ transformCombineOpsCommon folded "parallely" sortEq
+ zipSerialOps $ transformCombineOpsCommon folded "zipSerially" (==)
+ zipAsyncOps $ transformCombineOpsCommon folded "zipAsyncly" (==)
+
+ serialOps $ transformCombineOpsOrdered S.fromFoldable "serially" (==)
+ aheadOps $ transformCombineOpsOrdered S.fromFoldable "aheadly" (==)
+ zipSerialOps $ transformCombineOpsOrdered S.fromFoldable "zipSerially" (==)
+ zipAsyncOps $ transformCombineOpsOrdered S.fromFoldable "zipAsyncly" (==)
+
+ serialOps $ transformCombineOpsOrdered folded "serially" (==)
+ aheadOps $ transformCombineOpsOrdered folded "aheadly" (==)
+ zipSerialOps $ transformCombineOpsOrdered folded "zipSerially" (==)
+ zipAsyncOps $ transformCombineOpsOrdered folded "zipAsyncly" (==)
describe "Stream elimination operations" $ do
- eliminationOps S.fromFoldable "serially" serially
- eliminationOps S.fromFoldable "aheadly" aheadly
- eliminationOps S.fromFoldable "wSerially" wSerially
- eliminationOps S.fromFoldable "zipSerially" zipSerially
- eliminationOps S.fromFoldable "zipAsyncly" zipAsyncly
- eliminationOps S.fromFoldable "asyncly" asyncly
- eliminationOps S.fromFoldable "wAsyncly" wAsyncly
- eliminationOps S.fromFoldable "parallely" parallely
-
- eliminationOps folded "serially folded" serially
- eliminationOps folded "aheadly folded" aheadly
- eliminationOps folded "wSerially folded" wSerially
- eliminationOps folded "zipSerially folded" zipSerially
- eliminationOps folded "zipAsyncly folded" zipAsyncly
- eliminationOps folded "asyncly folded" asyncly
- eliminationOps folded "wAsyncly folded" wAsyncly
- eliminationOps folded "parallely folded" parallely
+ serialOps $ eliminationOps S.fromFoldable "serially"
+ wSerialOps $ eliminationOps S.fromFoldable "wSerially"
+ aheadOps $ eliminationOps S.fromFoldable "aheadly"
+ asyncOps $ eliminationOps S.fromFoldable "asyncly"
+ wAsyncOps $ eliminationOps S.fromFoldable "wAsyncly"
+ parallelOps $ eliminationOps S.fromFoldable "parallely"
+ zipSerialOps $ eliminationOps S.fromFoldable "zipSerially"
+ zipAsyncOps $ eliminationOps S.fromFoldable "zipAsyncly"
+
+ serialOps $ eliminationOps folded "serially folded"
+ wSerialOps $ eliminationOps folded "wSerially folded"
+ aheadOps $ eliminationOps folded "aheadly folded"
+ asyncOps $ eliminationOps folded "asyncly folded"
+ wAsyncOps $ eliminationOps folded "wAsyncly folded"
+ parallelOps $ eliminationOps folded "parallely folded"
+ zipSerialOps $ eliminationOps folded "zipSerially folded"
+ zipAsyncOps $ eliminationOps folded "zipAsyncly folded"
-- XXX Add a test where we chain all transformation APIs and make sure that
-- the state is being passed through all of them.
describe "Stream serial elimination operations" $ do
- serialEliminationOps S.fromFoldable "serially" serially
- serialEliminationOps S.fromFoldable "aheadly" aheadly
- serialEliminationOps S.fromFoldable "wSerially" wSerially
- serialEliminationOps S.fromFoldable "zipSerially" zipSerially
- serialEliminationOps S.fromFoldable "zipAsyncly" zipAsyncly
-
- serialEliminationOps folded "serially folded" serially
- serialEliminationOps folded "aheadly folded" aheadly
- serialEliminationOps folded "wSerially folded" wSerially
- serialEliminationOps folded "zipSerially folded" zipSerially
- serialEliminationOps folded "zipAsyncly folded" zipAsyncly
+ serialOps $ serialEliminationOps S.fromFoldable "serially"
+ wSerialOps $ serialEliminationOps S.fromFoldable "wSerially"
+ aheadOps $ serialEliminationOps S.fromFoldable "aheadly"
+ zipSerialOps $ serialEliminationOps S.fromFoldable "zipSerially"
+ zipAsyncOps $ serialEliminationOps S.fromFoldable "zipAsyncly"
+
+ serialOps $ serialEliminationOps folded "serially folded"
+ wSerialOps $ serialEliminationOps folded "wSerially folded"
+ aheadOps $ serialEliminationOps folded "aheadly folded"
+ zipSerialOps $ serialEliminationOps folded "zipSerially folded"
+ zipAsyncOps $ serialEliminationOps folded "zipAsyncly folded"
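
The refactor above flips the argument order of every property helper (e.g. `semigroupOps desc eq t`) so that the stream-type transformer `t` comes last; a partially applied property can then be mapped over a list of (description, transform) pairs by `mapOps`/`makeOps`. Below is a minimal standalone sketch of that pattern, assuming only base; all names are illustrative stand-ins, not the actual streamly test API:

```haskell
-- Stand-in for a stream-type transformer such as `serially . maxBuffer 1`.
type Transform = [Int] -> [Int]

-- A "property" that takes the transform as its LAST argument, so it can be
-- partially applied before the configuration is chosen.
checkIdentity :: String -> Transform -> IO ()
checkIdentity name t
    | t [1, 2, 3] == [1, 2, 3] = putStrLn (name ++ ": ok")
    | otherwise                = error (name ++ ": failed")

-- Mirror of `mapOps` in the diff: run one check under every configuration.
mapOps :: (Transform -> IO ()) -> [(String, Transform)] -> IO ()
mapOps spec = mapM_ (\(desc, f) -> putStrLn ("== " ++ desc) >> spec f)

main :: IO ()
main = mapOps (checkIdentity "identity")
    [ ("default", id)
    , ("double reverse", reverse . reverse)
    ]
```

With the transform last, adding a new configuration (say, a new `maxBuffer` setting) means adding one pair to the list rather than editing every test line, which is exactly what the `makeOps`/`serialOps`/`asyncOps` helpers in the diff exploit.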