author     harendra <>  2018-06-19 17:52:00 (GMT)
committer  hdiff <hdiff@hdiff.luite.com>  2018-06-19 17:52:00 (GMT)
commit     c4c86471690da861df0215567cf6da5b29b2a59c (patch)
tree       32abb7b43d38a1bec66e41e2830d9b3fa18b9586
parent     d5e7e088186f1857539128e34749d0c89d00e13a (diff)
version 0.3.0
-rw-r--r--  Changelog.md              28
-rw-r--r--  README.md                126
-rwxr-xr-x  bench.sh                   1
-rw-r--r--  benchmark/ChartLinear.hs  23
-rw-r--r--  benchmark/Linear.hs      142
-rw-r--r--  benchmark/LinearOps.hs    76
-rw-r--r--  benchmark/Nested.hs       43
-rw-r--r--  benchmark/NestedOps.hs    85
-rw-r--r--  examples/ListDir.hs        8
-rw-r--r--  examples/SearchQuery.hs   17
-rw-r--r--  src/Streamly.hs           61
-rw-r--r--  src/Streamly/Core.hs    1141
-rw-r--r--  src/Streamly/Prelude.hs  217
-rw-r--r--  src/Streamly/Streams.hs  416
-rw-r--r--  src/Streamly/Tutorial.hs 404
-rw-r--r--  stack-7.10.yaml            1
-rw-r--r--  stack-8.0.yaml            16
-rw-r--r--  stack.yaml                 2
-rw-r--r--  streamly.cabal           103
-rw-r--r--  test/Main.hs             100
-rw-r--r--  test/Prop.hs             305
21 files changed, 2611 insertions, 704 deletions
diff --git a/Changelog.md b/Changelog.md
index 932ab83..7ed99af 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,3 +1,31 @@
+## 0.3.0
+
+### Breaking changes
+
+* Some prelude functions, to which concurrency capability has been added, will
+ now require a `MonadAsync` constraint.
+
+### Bug Fixes
+
+* Fixed a race that could, in a rare case, cause us to block indefinitely on
+ an MVar due to a lost wakeup.
+* Fixed an issue in adaptive concurrency. A race could cause us to stop
+ creating more worker threads in some cases. This bug did not cause any
+ functional issue but could reduce concurrency.
+
+### Enhancements
+
+* Added a concurrent lookahead stream type `Ahead`.
+* Added `fromFoldableM` API that creates a stream from a container of monadic
+ actions.
+* Monadic stream generation functions `consM`, `|:`, `unfoldrM`, `replicateM`,
+ `repeatM`, `iterateM` and `fromFoldableM` can now generate streams
+ concurrently when used with concurrent stream types.
+* Monad transformation functions `mapM` and `sequence` can now map actions
+ concurrently when used at appropriate stream types.
+* Added concurrent function application operators to run the stages of a
+ stream processing pipeline concurrently.
+* Added `mapMaybe` and `mapMaybeM`.
+
## 0.2.1
### Bug Fixes
diff --git a/README.md b/README.md
index 867a43d..2cfba90 100644
--- a/README.md
+++ b/README.md
@@ -2,56 +2,55 @@
## Stream`ing` `Concurrent`ly
-Streamly, short for streaming concurrently, is a simple yet powerful streaming
-library with concurrent merging and concurrent nested looping support. A stream
-is just like a list except that it is a list of monadic actions rather than
-pure values. Streamly streams can be generated, consumed, combined, or
-transformed serially or concurrently. We can loop over a stream serially or
-concurrently. We can also have serial or concurrent nesting of loops. For
-those familiar with the list transformer concept streamly is a concurrent list
-transformer. Streamly uses standard composition abstractions. Concurrent
-composition is just the same as serial composition except that we use a simple
-combinator to request a concurrent composition instead of serial. The
-programmer does not have to be aware of threads, locking or synchronization to
-write scalable concurrent programs.
-
-Streamly provides functionality that is equivalent to streaming libraries
-like [pipes](https://hackage.haskell.org/package/pipes) and
-[conduit](https://hackage.haskell.org/package/conduit) but with a list like
-API. The streaming API of streamly is close to the monadic streams API of the
-[vector](https://hackage.haskell.org/package/vector) package and similar in
-concept to the [streaming](https://hackage.haskell.org/package/streaming)
-package. In addition to providing streaming functionality, streamly subsumes
+Streamly, short for streaming concurrently, provides monadic streams with a
+simple API that is almost identical to standard lists, and built-in support
+for concurrency. Using stream-style combinators, streams can be generated,
+merged, chained, mapped, zipped, and consumed concurrently, providing a
+generalized high level programming framework that unifies streaming and
+concurrency. Controlled concurrency allows even infinite streams to be
+evaluated concurrently. Concurrency is auto-scaled based on feedback from the
+stream consumer. The programmer does not have to be aware of threads, locking
+or synchronization to write scalable concurrent programs.
+
+The basic streaming functionality of streamly is equivalent to that provided by
+streaming libraries like
+[vector](https://hackage.haskell.org/package/vector),
+[streaming](https://hackage.haskell.org/package/streaming),
+[pipes](https://hackage.haskell.org/package/pipes), and
+[conduit](https://hackage.haskell.org/package/conduit).
+In addition to providing streaming functionality, streamly subsumes
the functionality of list transformer libraries like `pipes` or
-[list-t](https://hackage.haskell.org/package/list-t) and also the logic
+[list-t](https://hackage.haskell.org/package/list-t), and also the logic
programming library [logict](https://hackage.haskell.org/package/logict). On
the concurrency side, it subsumes the functionality of the
[async](https://hackage.haskell.org/package/async) package. Because it supports
streaming with concurrency we can write FRP applications similar in concept to
[Yampa](https://hackage.haskell.org/package/Yampa) or
-[reflex](https://hackage.haskell.org/package/reflex). To understand the
-streaming library ecosystem and where streamly fits in you may want to read
-[streaming libraries](https://github.com/composewell/streaming-benchmarks#streaming-libraries)
-as well. Also see the [Comparison with Existing
-Packages](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
-section in the streamly tutorial.
+[reflex](https://hackage.haskell.org/package/reflex).
Why use streamly?
- * Simple list like streaming API, if you know how to use lists then you know
- how to use streamly.
- * Powerful yet simple and scalable concurrency. Concurrency is not intrusive,
- concurrent programs are written exactly the same way as non-concurrent
- ones. There is no other package that provides such high level, simple and
- flexible concurrency support.
- * It is a general programming framework providing you all the necessary tools
- to solve a wide range of programming problems, unifying the functionality
- provided by several disparate packages in a concise and simple API.
- * Best in class performance. See
+ * _Simplicity_: Simple list-like streaming API; if you know how to use lists
+ then you know how to use streamly. This library is built with simplicity
+ and ease of use as a design goal.
+ * _Concurrency_: Simple, powerful, and scalable concurrency. Concurrency is
+ built-in and not intrusive; concurrent programs are written exactly the
+ same way as non-concurrent ones.
+ * _Generality_: Unifies functionality provided by several disparate packages
+ (streaming, concurrency, list transformer, logic programming, reactive
+ programming) in a concise API.
+ * _Performance_: Streamly is designed for high performance. See
[streaming-benchmarks](https://github.com/composewell/streaming-benchmarks)
for a comparison of popular streaming libraries on micro-benchmarks.
- For more information, see:
+For more details on the streaming library ecosystem and where streamly fits in,
+please see
+[streaming libraries](https://github.com/composewell/streaming-benchmarks#streaming-libraries).
+Also, see the [Comparison with Existing
+Packages](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
+section in the streamly tutorial.
+
+For more information on streamly, see:
* [Streamly.Tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html) module in the haddock documentation for a detailed introduction
* [examples](https://github.com/composewell/streamly/tree/master/examples) directory in the package for some simple practical examples
@@ -83,6 +82,51 @@ main = runStream $
& S.mapM print
```
+## Concurrent Stream Generation
+
+Monadic construction and generation functions such as `consM`, `unfoldrM`,
+`replicateM`, `repeatM`, `iterateM` and `fromFoldableM` work concurrently
+when used with an appropriate stream type combinator.
+
+The following code finishes in 3 seconds (6 seconds when serial):
+
+```
+> let p n = threadDelay (n * 1000000) >> return n
+> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
+[3,2,1]
+
+> S.toList $ parallely $ p 3 |: p 2 |: p 1 |: S.nil
+[1,2,3]
+```
+
+The following finishes in 10 seconds (100 seconds when serial):
+
+```
+runStream $ asyncly $ S.replicateM 10 $ p 10
+```
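+
+For illustration, `fromFoldableM` works the same way; a minimal sketch using
+the same `p` as above (it finishes in about 3 seconds):
+
+```
+> S.toList $ aheadly $ S.fromFoldableM [p 3, p 2, p 1]
+[3,2,1]
+```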
+
+## Concurrent Streaming Pipelines
+
+Use `|&` or `|$` to apply stream processing functions concurrently. In the
+following example "hello" is printed every second; if you use `&` instead of
+`|&` you will see that the delay doubles to 2 seconds because of serial
+application.
+
+```
+main = runStream $
+ S.repeatM (threadDelay 1000000 >> return "hello")
+ |& S.mapM (\x -> threadDelay 1000000 >> putStrLn x)
+```
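+
+Equivalently, the same pipeline can be written with `|$`, which applies the
+stream processing function from the left; a minimal sketch with the same
+imports:
+
+```
+main = runStream $
+      S.mapM (\x -> threadDelay 1000000 >> putStrLn x)
+   |$ S.repeatM (threadDelay 1000000 >> return "hello")
+```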
+
+## Mapping Concurrently
+
+We can use `mapM` or `sequence` concurrently on a stream.
+
+```
+> let p n = threadDelay (n * 1000000) >> return n
+> runStream $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ S.repeatM (p 1))
+```
+
## Serial and Concurrent Merging
Semigroup and Monoid instances can be used to fold streams serially or
@@ -214,7 +258,7 @@ import Path.IO (listDir, getCurrentDir)
import Streamly
import qualified Streamly.Prelude as S
-main = runStream $ asyncly $ getCurrentDir >>= readdir
+main = runStream $ aheadly $ getCurrentDir >>= readdir
where readdir d = do
(dirs, files) <- S.once $ listDir d
S.once $ mapM_ putStrLn $ map show files
@@ -224,8 +268,8 @@ main = runStream $ asyncly $ getCurrentDir >>= readdir
In the above examples we do not think in terms of threads, locking or
synchronization, rather we think in terms of what can run in parallel, the rest
-is taken care of automatically. When using `asyncly` the programmer does
-not have to worry about how many threads are to be created they are
+is taken care of automatically. When using `aheadly` the programmer does
+not have to worry about how many threads are to be created; they are
automatically adjusted based on the demand of the consumer.
The concurrency facilities provided by streamly can be compared with
diff --git a/bench.sh b/bench.sh
index 0162078..81d0714 100755
--- a/bench.sh
+++ b/bench.sh
@@ -63,6 +63,7 @@ if test "$GRAPH" != "0"
then
CHART_PROG="chart-$BENCHMARK"
prog=$($STACK exec which $CHART_PROG)
+ hash -r
if test ! -x "$prog"
then
echo "Building charting executable"
diff --git a/benchmark/ChartLinear.hs b/benchmark/ChartLinear.hs
index b4d8cbb..15676e2 100644
--- a/benchmark/ChartLinear.hs
+++ b/benchmark/ChartLinear.hs
@@ -6,7 +6,7 @@ module Main where
import Data.List
import Data.List.Split
import BenchGraph (bgraph, defaultConfig, Config(..), ComparisonStyle(..))
-import Control.Exception (handle, catch, SomeException, ErrorCall)
+import Control.Exception (handle, catch, SomeException, ErrorCall(..))
main :: IO ()
main = do
@@ -15,8 +15,8 @@ main = do
, comparisonStyle = CompareDelta
}
- ignoringErr a = catch a (\(_ :: ErrorCall) ->
- putStrLn "Failed. Skipping.")
+ ignoringErr a = catch a (\(ErrorCall err :: ErrorCall) ->
+ putStrLn $ "Failed with error:\n" ++ err ++ "\nSkipping.")
-- bgraph <input> <output> <field in csv file to be plotted>
-- other interesting fields to plot are:
-- allocated
@@ -26,18 +26,29 @@ main = do
ignoringErr $ bgraph "charts/results.csv" "operations" "time" $ cfg
{ chartTitle = Just "Streamly operations (time)"
, classifyBenchmark = \b ->
- if "compose" `isPrefixOf` b || "/concat" `isSuffixOf` b
+ if (not $ "serially/" `isPrefixOf` b)
+ || "/generation" `isInfixOf` b
+ || "/compose" `isInfixOf` b
+ || "/concat" `isSuffixOf` b
then Nothing
else Just ("Streamly", last $ splitOn "/" b)
}
+ ignoringErr $ bgraph "charts/results.csv" "generation" "time" $ cfg
+ { chartTitle = Just "Stream generation (time)"
+ , classifyBenchmark = \b ->
+ if "serially/generation" `isPrefixOf` b
+ then Just ("Streamly", last $ splitOn "/" b)
+ else Nothing
+ }
+
ignoringErr $ bgraph "charts/results.csv" "composition" "time" $ cfg
{ chartTitle = Just "Streamly composition performance (time)"
- , classifyBenchmark = fmap ("Streamly",) . stripPrefix "compose/"
+ , classifyBenchmark = fmap ("Streamly",) . stripPrefix "serially/compose/"
}
ignoringErr $ bgraph "charts/results.csv" "composition-scaling" "time"
$ cfg
{ chartTitle = Just "Streamly composition scaling (time)"
- , classifyBenchmark = fmap ("Streamly",) . stripPrefix "compose-"
+ , classifyBenchmark = fmap ("Streamly",) . stripPrefix "serially/compose-"
}
diff --git a/benchmark/Linear.hs b/benchmark/Linear.hs
index 9e6606d..75a4c71 100644
--- a/benchmark/Linear.hs
+++ b/benchmark/Linear.hs
@@ -6,7 +6,7 @@
-- Maintainer : harendra.kumar@gmail.com
import Control.DeepSeq (NFData)
-import Data.Functor.Identity (Identity, runIdentity)
+-- import Data.Functor.Identity (Identity, runIdentity)
import System.Random (randomRIO)
import qualified LinearOps as Ops
@@ -15,58 +15,108 @@ import Gauge
-- We need a monadic bind here to make sure that the function f does not get
-- completely optimized out by the compiler in some cases.
-benchIO :: (NFData b) => String -> (Ops.Stream m Int -> IO b) -> Benchmark
+benchIO :: (IsStream t, NFData b) => String -> (t IO Int -> IO b) -> Benchmark
benchIO name f = bench name $ nfIO $ randomRIO (1,1000) >>= f . Ops.source
-benchIOAppend :: (NFData b) => String -> (Int -> IO b) -> Benchmark
-benchIOAppend name f = bench name $ nfIO $ randomRIO (1,1000) >>= f
+benchSrcIO
+ :: (t IO Int -> SerialT IO Int)
+ -> String
+ -> (Int -> t IO Int)
+ -> Benchmark
+benchSrcIO t name f
+ = bench name $ nfIO $ randomRIO (1,1000) >>= Ops.toNull t . f
-_benchId :: (NFData b) => String -> (Ops.Stream m Int -> Identity b) -> Benchmark
+{-
+_benchId :: NFData b => String -> (Ops.Stream m Int -> Identity b) -> Benchmark
_benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)
+-}
main :: IO ()
main = do
defaultMain
- [ bgroup "elimination"
- [ benchIO "toNull" Ops.toNull
- , benchIO "toList" Ops.toList
- , benchIO "fold" Ops.foldl
- , benchIO "last" Ops.last
+ [ bgroup "serially"
+ [ bgroup "generation"
+ [ benchSrcIO serially "unfoldr" $ Ops.sourceUnfoldr
+ , benchSrcIO serially "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO serially "fromFoldable" Ops.sourceFromFoldable
+ , benchSrcIO serially "fromFoldableM" Ops.sourceFromFoldableM
+ , benchSrcIO serially "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO serially "foldMapWithM" Ops.sourceFoldMapWithM
+ ]
+ , bgroup "elimination"
+ [ benchIO "toList" Ops.toList
+ , benchIO "fold" Ops.foldl
+ , benchIO "last" Ops.last
+ ]
+ , bgroup "transformation"
+ [ benchIO "scan" Ops.scan
+ , benchIO "map" Ops.map
+ , benchIO "mapM" (Ops.mapM serially)
+ , benchIO "concat" Ops.concat
+ ]
+ , bgroup "filtering"
+ [ benchIO "filter-even" Ops.filterEven
+ , benchIO "filter-all-out" Ops.filterAllOut
+ , benchIO "filter-all-in" Ops.filterAllIn
+ , benchIO "take-all" Ops.takeAll
+ , benchIO "takeWhile-true" Ops.takeWhileTrue
+ , benchIO "drop-all" Ops.dropAll
+ , benchIO "dropWhile-true" Ops.dropWhileTrue
+ ]
+ , benchIO "zip" $ Ops.zip
+ , bgroup "compose"
+ [ benchIO "mapM" Ops.composeMapM
+ , benchIO "map-with-all-in-filter" Ops.composeMapAllInFilter
+ , benchIO "all-in-filters" Ops.composeAllInFilters
+ , benchIO "all-out-filters" Ops.composeAllOutFilters
+ ]
+ -- Scaling with same operation in sequence
+ , bgroup "compose-scaling"
+ [ benchIO "1" $ Ops.composeScaling 1
+ , benchIO "2" $ Ops.composeScaling 2
+ , benchIO "3" $ Ops.composeScaling 3
+ , benchIO "4" $ Ops.composeScaling 4
+ ]
]
- , bgroup "transformation"
- [ benchIO "scan" Ops.scan
- , benchIO "map" Ops.map
- , benchIO "mapM" Ops.mapM
- , benchIO "concat" Ops.concat
+ , bgroup "asyncly"
+ [ -- benchIO "unfoldr" $ Ops.toNull asyncly
+ -- , benchSrcIO asyncly "fromFoldable" Ops.sourceFromFoldable
+ benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO asyncly "fromFoldableM" Ops.sourceFromFoldableM
+ , benchSrcIO asyncly "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO asyncly "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM asyncly
+ ]
+ , bgroup "wAsyncly"
+ [ -- benchIO "unfoldr" $ Ops.toNull wAsyncly
+ -- , benchSrcIO wAsyncly "fromFoldable" Ops.sourceFromFoldable
+ benchSrcIO wAsyncly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO wAsyncly "fromFoldableM" Ops.sourceFromFoldableM
+ , benchSrcIO wAsyncly "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO wAsyncly "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM wAsyncly
+ ]
+ -- unfoldr and fromFoldable are always serial and therefore the same for
+ -- all stream types.
+ , bgroup "aheadly"
+ [ -- benchIO "unfoldr" $ Ops.toNull aheadly
+ -- , benchSrcIO aheadly "fromFoldable" Ops.sourceFromFoldable
+ benchSrcIO aheadly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO aheadly "fromFoldableM" Ops.sourceFromFoldableM
+ , benchSrcIO aheadly "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO aheadly "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM aheadly
+ ]
+ -- XXX need to use smaller streams to finish in reasonable time
+ , bgroup "parallely"
+ [ --benchIO "unfoldr" $ Ops.toNull parallely
+ --, benchSrcIO parallely "fromFoldable" Ops.sourceFromFoldable
+ benchSrcIO parallely "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO parallely "fromFoldableM" Ops.sourceFromFoldableM
+ , benchSrcIO parallely "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO parallely "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM parallely
+ -- Zip has only one parallel flavor
+ , benchIO "zip" $ Ops.zipAsync
+ ]
]
- , bgroup "filtering"
- [ benchIO "filter-even" Ops.filterEven
- , benchIO "filter-all-out" Ops.filterAllOut
- , benchIO "filter-all-in" Ops.filterAllIn
- , benchIO "take-all" Ops.takeAll
- , benchIO "takeWhile-true" Ops.takeWhileTrue
- , benchIO "drop-all" Ops.dropAll
- , benchIO "dropWhile-true" Ops.dropWhileTrue
- ]
- , benchIO "zip" Ops.zip
- , bgroup "append"
- [ benchIOAppend "serially" $ Ops.append serially
- , benchIOAppend "wSerially" $ Ops.append wSerially
- , benchIOAppend "asyncly" $ Ops.append asyncly
- , benchIOAppend "wAsyncly" $ Ops.append wAsyncly
- , benchIOAppend "parallely" $ Ops.append parallely
- ]
- , bgroup "compose"
- [ benchIO "mapM" Ops.composeMapM
- , benchIO "map-with-all-in-filter" Ops.composeMapAllInFilter
- , benchIO "all-in-filters" Ops.composeAllInFilters
- , benchIO "all-out-filters" Ops.composeAllOutFilters
- ]
- , bgroup "compose-scaling"
- -- Scaling with same operation in sequence
- [ benchIO "1" $ Ops.composeScaling 1
- , benchIO "2" $ Ops.composeScaling 2
- , benchIO "3" $ Ops.composeScaling 3
- , benchIO "4" $ Ops.composeScaling 4
- ]
- ]
diff --git a/benchmark/LinearOps.hs b/benchmark/LinearOps.hs
index fdbb297..abd826b 100644
--- a/benchmark/LinearOps.hs
+++ b/benchmark/LinearOps.hs
@@ -5,18 +5,19 @@
-- License : MIT
-- Maintainer : harendra.kumar@gmail.com
+{-# LANGUAGE FlexibleContexts #-}
+
module LinearOps where
import Prelude
(Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=),
- subtract, undefined, Maybe, Monoid, foldMap)
+ subtract, undefined, Maybe(..))
import qualified Streamly as S
import qualified Streamly.Prelude as S
-value, appendValue, maxValue :: Int
-value = 1000000
-appendValue = 100000
+value, maxValue :: Int
+value = 100000
maxValue = value + 1000
-------------------------------------------------------------------------------
@@ -44,25 +45,68 @@ maxValue = value + 1000
{-# INLINE composeAllInFilters #-}
{-# INLINE composeAllOutFilters #-}
{-# INLINE composeMapAllInFilter #-}
-toNull, scan, map, filterEven, mapM, filterAllOut,
+scan, map, filterEven, filterAllOut,
filterAllIn, takeOne, takeAll, takeWhileTrue, dropAll, dropWhileTrue, zip,
- concat, composeMapM, composeAllInFilters, composeAllOutFilters,
+ concat, composeAllInFilters, composeAllOutFilters,
composeMapAllInFilter
:: Monad m
=> Stream m Int -> m ()
+composeMapM :: S.MonadAsync m => Stream m Int -> m ()
toList :: Monad m => Stream m Int -> m [Int]
foldl :: Monad m => Stream m Int -> m Int
last :: Monad m => Stream m Int -> m (Maybe Int)
+toNull :: Monad m => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
+mapM :: (S.IsStream t, S.MonadAsync m)
+ => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
+zipAsync :: S.MonadAsync m => Stream m Int -> m ()
+
-------------------------------------------------------------------------------
-- Stream generation and elimination
-------------------------------------------------------------------------------
type Stream m a = S.SerialT m a
-source :: Int -> Stream m Int
-source n = S.fromFoldable [n..n+value]
+{-# INLINE source #-}
+source :: (S.MonadAsync m, S.IsStream t) => Int -> t m Int
+source n = S.serially $ sourceUnfoldrM n
+
+{-# INLINE sourceFromFoldable #-}
+sourceFromFoldable :: S.IsStream t => Int -> t m Int
+sourceFromFoldable n = S.fromFoldable [n..n+value]
+
+{-# INLINE sourceFromFoldableM #-}
+sourceFromFoldableM :: (S.IsStream t, S.MonadAsync m) => Int -> t m Int
+sourceFromFoldableM n = S.fromFoldableM (Prelude.fmap return [n..n+value])
+
+{-# INLINE sourceFoldMapWith #-}
+sourceFoldMapWith :: (S.IsStream t, Monad (t m), S.Semigroup (t m Int))
+ => Int -> t m Int
+sourceFoldMapWith n = S.foldMapWith (S.<>) return [n..n+value]
+
+{-# INLINE sourceFoldMapWithM #-}
+sourceFoldMapWithM :: (S.IsStream t, Monad m, S.Semigroup (t m Int))
+ => Int -> t m Int
+sourceFoldMapWithM n = S.foldMapWith (S.<>) (S.once . return) [n..n+value]
+
+{-# INLINE sourceUnfoldr #-}
+sourceUnfoldr :: S.IsStream t => Int -> t m Int
+sourceUnfoldr n = S.unfoldr step n
+ where
+ step cnt =
+ if cnt > n + value
+ then Nothing
+ else (Just (cnt, cnt + 1))
+
+{-# INLINE sourceUnfoldrM #-}
+sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> t m Int
+sourceUnfoldrM n = S.unfoldrM step n
+ where
+ step cnt =
+ if cnt > n + value
+ then return Nothing
+ else return (Just (cnt, cnt + 1))
{-# INLINE runStream #-}
runStream :: Monad m => Stream m a -> m ()
@@ -72,7 +116,7 @@ runStream = S.runStream
-- Elimination
-------------------------------------------------------------------------------
-toNull = runStream
+toNull t = runStream . t
toList = S.toList
foldl = S.foldl' (+) 0
last = S.last
@@ -87,7 +131,7 @@ transform = runStream
scan = transform . S.scanl' (+) 0
map = transform . fmap (+1)
-mapM = transform . S.mapM return
+mapM t = transform . t . S.mapM return
filterEven = transform . S.filter even
filterAllOut = transform . S.filter (> maxValue)
filterAllIn = transform . S.filter (<= maxValue)
@@ -102,19 +146,10 @@ dropWhileTrue = transform . S.dropWhile (<= maxValue)
-------------------------------------------------------------------------------
zip src = transform $ (S.zipWith (,) src src)
+zipAsync src = transform $ (S.zipAsyncWith (,) src src)
concat _n = return ()
-------------------------------------------------------------------------------
--- Append
--------------------------------------------------------------------------------
-
-{-# INLINE append #-}
-append
- :: (Monoid (t m Int), Monad m, Monad (t m))
- => (t m Int -> S.SerialT m Int) -> Int -> m ()
-append t n = runStream $ t $ foldMap return [n..n+appendValue]
-
--------------------------------------------------------------------------------
-- Composition
-------------------------------------------------------------------------------
@@ -127,6 +162,7 @@ composeAllInFilters = compose (S.filter (<= maxValue))
composeAllOutFilters = compose (S.filter (> maxValue))
composeMapAllInFilter = compose (S.filter (<= maxValue) . fmap (subtract 1))
+{-# INLINABLE composeScaling #-}
composeScaling :: Monad m => Int -> Stream m Int -> m ()
composeScaling m =
case m of
diff --git a/benchmark/Nested.hs b/benchmark/Nested.hs
index 4924fa4..f627af7 100644
--- a/benchmark/Nested.hs
+++ b/benchmark/Nested.hs
@@ -22,16 +22,8 @@ main :: IO ()
main = do
-- TBD Study scaling with 10, 100, 1000 loop iterations
defaultMain
- [ bgroup "linear"
- [ benchIO "toNullLinear" Ops.toNullLinear
- , benchIO "toListLinear" Ops.toListLinear
- ]
-
- , bgroup "serially"
- [ benchIO "append" $ Ops.append serially
- , benchIO "toNull0" $ Ops.toNull0 serially
- , benchIO "toList0" $ Ops.toList0 serially
- , benchIO "toNull" $ Ops.toNull serially
+ [ bgroup "serially"
+ [ benchIO "toNull" $ Ops.toNull serially
, benchIO "toList" $ Ops.toList serially
, benchIO "toListSome" $ Ops.toListSome serially
, benchIO "filterAllOut" $ Ops.filterAllOut serially
@@ -41,10 +33,7 @@ main = do
]
, bgroup "wSerially"
- [ benchIO "append" $ Ops.append wSerially
- , benchIO "toNull0" $ Ops.toNull0 wSerially
- , benchIO "toList0" $ Ops.toList0 wSerially
- , benchIO "toNull" $ Ops.toNull wSerially
+ [ benchIO "toNull" $ Ops.toNull wSerially
, benchIO "toList" $ Ops.toList wSerially
, benchIO "toListSome" $ Ops.toListSome wSerially
, benchIO "filterAllOut" $ Ops.filterAllOut wSerially
@@ -53,11 +42,19 @@ main = do
, benchIO "breakAfterSome" $ Ops.breakAfterSome wSerially
]
+ , bgroup "aheadly"
+ [ benchIO "toNull" $ Ops.toNull aheadly
+ , benchIO "toList" $ Ops.toList aheadly
+ , benchIO "toListSome" $ Ops.toListSome aheadly
+ , benchIO "filterAllOut" $ Ops.filterAllOut aheadly
+ , benchIO "filterAllIn" $ Ops.filterAllIn aheadly
+ -- this hangs, need to investigate
+ , benchIO "filterSome" $ Ops.filterSome aheadly
+ , benchIO "breakAfterSome" $ Ops.breakAfterSome aheadly
+ ]
+
, bgroup "asyncly"
- [ benchIO "append" $ Ops.append asyncly
- , benchIO "toNull0" $ Ops.toNull0 asyncly
- , benchIO "toList0" $ Ops.toList0 asyncly
- , benchIO "toNull" $ Ops.toNull asyncly
+ [ benchIO "toNull" $ Ops.toNull asyncly
, benchIO "toList" $ Ops.toList asyncly
, benchIO "toListSome" $ Ops.toListSome asyncly
, benchIO "filterAllOut" $ Ops.filterAllOut asyncly
@@ -67,10 +64,7 @@ main = do
]
, bgroup "wAsyncly"
- [ benchIO "append" $ Ops.append wAsyncly
- , benchIO "toNull0" $ Ops.toNull0 wAsyncly
- , benchIO "toList0" $ Ops.toList0 wAsyncly
- , benchIO "toNull" $ Ops.toNull wAsyncly
+ [ benchIO "toNull" $ Ops.toNull wAsyncly
, benchIO "toList" $ Ops.toList wAsyncly
, benchIO "toListSome" $ Ops.toListSome wAsyncly
, benchIO "filterAllOut" $ Ops.filterAllOut wAsyncly
@@ -80,10 +74,7 @@ main = do
]
, bgroup "parallely"
- [ benchIO "append" $ Ops.append parallely
- , benchIO "toNull0" $ Ops.toNull0 parallely
- , benchIO "toList0" $ Ops.toList0 parallely
- , benchIO "toNull" $ Ops.toNull parallely
+ [ benchIO "toNull" $ Ops.toNull parallely
, benchIO "toList" $ Ops.toList parallely
, benchIO "toListSome" $ Ops.toListSome parallely
, benchIO "filterAllOut" $ Ops.filterAllOut parallely
diff --git a/benchmark/NestedOps.hs b/benchmark/NestedOps.hs
index c2e6381..df7195e 100644
--- a/benchmark/NestedOps.hs
+++ b/benchmark/NestedOps.hs
@@ -5,6 +5,7 @@
-- License : MIT
-- Maintainer : harendra.kumar@gmail.com
+{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module NestedOps where
@@ -28,8 +29,26 @@ prodCount = 1000
type Stream m a = S.SerialT m a
{-# INLINE source #-}
-source :: S.IsStream t => Int -> Int -> t m Int
-source start n = S.fromFoldable [start..start+n]
+source :: (S.MonadAsync m, S.IsStream t) => Int -> Int -> t m Int
+source = sourceUnfoldrM
+
+{-# INLINE sourceUnfoldrM #-}
+sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int
+sourceUnfoldrM n value = S.serially $ S.unfoldrM step n
+ where
+ step cnt =
+ if cnt > n + value
+ then return Nothing
+ else return (Just (cnt, cnt + 1))
+
+{-# INLINE sourceUnfoldr #-}
+sourceUnfoldr :: S.IsStream t => Int -> Int -> t m Int
+sourceUnfoldr start n = S.unfoldr step start
+ where
+ step cnt =
+ if cnt > start + n
+ then Nothing
+ else (Just (cnt, cnt + 1))
{-# INLINE runStream #-}
runStream :: Monad m => Stream m a -> m ()
@@ -43,98 +62,66 @@ runToList = S.toList
-- Benchmark ops
-------------------------------------------------------------------------------
-{-# INLINE toNullLinear #-}
-toNullLinear :: Monad m => Int -> m ()
-toNullLinear start = runStream $ source start sumCount
-
-{-# INLINE toListLinear #-}
-toListLinear :: Monad m => Int -> m [Int]
-toListLinear start = runToList $ source start sumCount
-
-{-# INLINE append #-}
-append
- :: (Monoid (t m Int), Monad m, Monad (t m))
- => (t m Int -> S.SerialT m Int) -> Int -> m ()
-append t start = runStream $ t $ foldMap return [start..start+sumCount]
-
-{-# INLINE toNull0 #-}
-toNull0
- :: (S.IsStream t, Monad m, Monad (t m))
- => (t m (Int, Int) -> S.SerialT m (Int, Int)) -> Int -> m ()
-toNull0 t start = runStream . t $ do
- x <- source start prodCount
- y <- source start prodCount
- return (x,y)
-
-{-# INLINE toList0 #-}
-toList0
- :: (S.IsStream t, Monad m, Monad (t m))
- => (t m (Int, Int) -> S.SerialT m (Int, Int)) -> Int -> m [(Int, Int)]
-toList0 t start = runToList . t $ do
- x <- source start prodCount
- y <- source start prodCount
- return (x,y)
-
{-# INLINE toNull #-}
toNull
- :: (S.IsStream t, Monad m, Monad (t m))
+ :: (S.IsStream t, S.MonadAsync m, Monad (t m))
=> (t m Int -> S.SerialT m Int) -> Int -> m ()
toNull t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
- return $ x * x + y * y
+ return $ x + y
{-# INLINE toList #-}
toList
- :: (S.IsStream t, Monad m, Monad (t m))
+ :: (S.IsStream t, S.MonadAsync m, Monad (t m))
=> (t m Int -> S.SerialT m Int) -> Int -> m [Int]
toList t start = runToList . t $ do
x <- source start prodCount
y <- source start prodCount
- return $ x * x + y * y
+ return $ x + y
{-# INLINE toListSome #-}
toListSome
- :: (S.IsStream t, Monad m, Monad (t m))
+ :: (S.IsStream t, S.MonadAsync m, Monad (t m))
=> (t m Int -> S.SerialT m Int) -> Int -> m [Int]
toListSome t start =
runToList . t $ S.take 1000 $ do
x <- source start prodCount
y <- source start prodCount
- return $ x * x + y * y
+ return $ x + y
{-# INLINE filterAllOut #-}
filterAllOut
- :: (S.IsStream t, Monad m, Monad (t m))
+ :: (S.IsStream t, S.MonadAsync m, Monad (t m))
=> (t m Int -> S.SerialT m Int) -> Int -> m ()
filterAllOut t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
- let s = x * x + y * y
- if (s < 1)
+ let s = x + y
+ if (s < 0)
then return s
else S.nil
{-# INLINE filterAllIn #-}
filterAllIn
- :: (S.IsStream t, Monad m, Monad (t m))
+ :: (S.IsStream t, S.MonadAsync m, Monad (t m))
=> (t m Int -> S.SerialT m Int) -> Int -> m ()
filterAllIn t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
- let s = x * x + y * y
- if (s > 1)
+ let s = x + y
+ if (s > 0)
then return s
else S.nil
{-# INLINE filterSome #-}
filterSome
- :: (S.IsStream t, Monad m, Monad (t m))
+ :: (S.IsStream t, S.MonadAsync m, Monad (t m))
=> (t m Int -> S.SerialT m Int) -> Int -> m ()
filterSome t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
- let s = x * x + y * y
+ let s = x + y
if (s > 1100000)
then return s
else S.nil
@@ -147,7 +134,7 @@ breakAfterSome t start = do
(_ :: Either ErrorCall ()) <- try $ runStream . t $ do
x <- source start prodCount
y <- source start prodCount
- let s = x * x + y * y
+ let s = x + y
if (s > 1100000)
then error "break"
else return s
diff --git a/examples/ListDir.hs b/examples/ListDir.hs
index 446d037..fb535ea 100644
--- a/examples/ListDir.hs
+++ b/examples/ListDir.hs
@@ -1,18 +1,18 @@
import Control.Monad.IO.Class (liftIO)
import Path.IO (listDir, getCurrentDir)
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
-import Streamly (runStream, asyncly)
+import Streamly (runStream, aheadly)
-- | List the current directory recursively using concurrent processing
--
-- This example demonstrates that there is little difference between regular
-- IO code and concurrent streamly code. You can just remove
--- 'runStream . asyncly' and this becomes your regular IO code.
+-- 'runStream . aheadly' and this becomes your regular IO code.
main :: IO ()
main = do
hSetBuffering stdout LineBuffering
- runStream . asyncly $ getCurrentDir >>= readdir
+ runStream . aheadly $ getCurrentDir >>= readdir
where readdir d = do
- (ds, fs) <- liftIO $ listDir d
+ (ds, fs) <- listDir d
liftIO $ mapM_ putStrLn $ map show fs ++ map show ds
foldMap readdir ds
diff --git a/examples/SearchQuery.hs b/examples/SearchQuery.hs
index 401657f..c729080 100644
--- a/examples/SearchQuery.hs
+++ b/examples/SearchQuery.hs
@@ -1,5 +1,5 @@
import Streamly
-import Streamly.Prelude (once)
+import Streamly.Prelude (nil, once, (|:))
import Network.HTTP.Simple
-- | Runs three search engine queries in parallel and prints the search engine
@@ -9,17 +9,20 @@ import Network.HTTP.Simple
--
main :: IO ()
main = do
- putStrLn "Using parallel semigroup composition"
- runStream . parallely $ google <> bing <> duckduckgo
+ putStrLn "Using parallel stream construction"
+ runStream . parallely $ google |: bing |: duckduckgo |: nil
+
+ putStrLn "\nUsing parallel semigroup composition"
+ runStream . parallely $ once google <> once bing <> once duckduckgo
putStrLn "\nUsing parallel applicative zip"
- runStream . zipAsyncly $ (,,) <$> google <*> bing <*> duckduckgo
+ runStream . zipAsyncly $ (,,) <$> once google <*> once bing <*> once duckduckgo
where
- get :: IsStream t => String -> t IO ()
- get s = once (httpNoBody (parseRequest_ s) >> putStrLn (show s))
+ get :: String -> IO ()
+ get s = httpNoBody (parseRequest_ s) >> putStrLn (show s)
- google, bing, duckduckgo :: IsStream t => t IO ()
+ google, bing, duckduckgo :: IO ()
google = get "https://www.google.com/search?q=haskell"
bing = get "https://www.bing.com/search?q=haskell"
duckduckgo = get "https://www.duckduckgo.com/?q=haskell"
diff --git a/src/Streamly.hs b/src/Streamly.hs
index c6728f3..61a80d4 100644
--- a/src/Streamly.hs
+++ b/src/Streamly.hs
@@ -65,7 +65,11 @@ module Streamly
, SerialT
, WSerialT
- -- ** Parallel Streams
+ -- ** Concurrent Lookahead Streams
+ -- $lookahead
+ , AheadT
+
+ -- ** Concurrent Asynchronous Streams
-- $async
, AsyncT
, WAsyncT
@@ -76,21 +80,40 @@ module Streamly
, ZipSerialM
, ZipAsyncM
- -- * Polymorphic Sum Operations
+ -- * Running Streams
+ , runStream
+
+ -- * Parallel Function Application
+ -- $application
+ , (|$)
+ , (|&)
+ , (|$.)
+ , (|&.)
+ , mkAsync
+
+ -- * Merging Streams
-- $sum
, serial
, wSerial
+ , ahead
, async
, wAsync
, parallel
+ -- * Folding Containers of Streams
+ -- $foldutils
+ , foldWith
+ , foldMapWith
+ , forEachWith
+
-- * Stream Type Adapters
-- $adapters
- , IsStream
+ , IsStream ()
, serially
, wSerially
, asyncly
+ , aheadly
, wAsyncly
, parallely
, zipSerially
@@ -100,24 +123,13 @@ module Streamly
-- * IO Streams
, Serial
, WSerial
+ , Ahead
, Async
, WAsync
, Parallel
, ZipSerial
, ZipAsync
- -- * Running Streams
- , runStream
-
- -- * Transformation
- , mkAsync
-
- -- * Polymorphic Fold Utilities
- -- $foldutils
- , foldWith
- , foldMapWith
- , forEachWith
-
-- * Re-exports
, Semigroup (..)
-- * Deprecated
@@ -146,7 +158,7 @@ import Data.Semigroup (Semigroup(..))
-- $serial
--
-- Serial streams compose serially or non-concurrently. In a composed stream,
--- each action is executed only after the prvious action has finished. The two
+-- each action is executed only after the previous action has finished. The two
-- serial stream types 'SerialT' and 'WSerialT' differ in how they traverse the
-- streams in a 'Semigroup' or 'Monad' composition.
@@ -167,6 +179,23 @@ import Data.Semigroup (Semigroup(..))
-- corresponding elements of two streams together. Note that these types are
-- not monads.
+-- $application
+--
+-- Stream processing functions can be composed in a chain using function
+-- application with or without the '$' operator, or with reverse function
+-- application operator '&'. Streamly provides concurrent versions of these
+-- operators applying stream processing functions such that each stage of the
+-- stream can run in parallel. The operators start with a @|@; we can read '|$'
+-- as "@parallel dollar@" to remember that @|@ comes before '$'.
+--
+-- Imports for the code snippets below:
+--
+-- @
+-- import Streamly
+-- import qualified Streamly.Prelude as S
+-- import Control.Concurrent
+-- @
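+--
+-- A minimal usage sketch, assuming the imports above:
+--
+-- @
+-- main = runStream $ S.mapM print |$ S.replicateM 3 (return 1)
+-- @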
+
-- $sum
-- The 'Semigroup' operation '<>' of each stream type combines two streams in a
-- type specific manner. This section provides polymorphic versions of '<>'
diff --git a/src/Streamly/Core.hs b/src/Streamly/Core.hs
index 7471d36..986f9b9 100644
--- a/src/Streamly/Core.hs
+++ b/src/Streamly/Core.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
@@ -25,66 +26,70 @@ module Streamly.Core
-- * Streams
, Stream (..)
- -- * Construction
+ -- * Construction (pure)
+ , nil
+ , cons
, singleton
, once
- , cons
- , consM
, repeat
- , nil
+
+ -- * Construction (monadic)
+ , consM
+ , consMAhead
+ , consMAsync
+ , consMWAsync
+ , consMParallel
-- * Semigroup Style Composition
, serial
, wSerial
+ , ahead
, async
, wAsync
, parallel
- -- * Alternative
- , alt
+ -- * applications
+ , applyWith
+ , runWith
-- * zip
, zipWith
, zipAsyncWith
- -- * Transformers
- , withLocal
- , withCatchError
-
-- * Concurrent Stream Vars (SVars)
, SVar
- , SVarSched (..)
- , SVarTag (..)
, SVarStyle (..)
- , newEmptySVar
, newStreamVar1
- , newStreamVar2
- , joinStreamVar2
, fromStreamVar
, toStreamVar
)
where
-import Control.Concurrent (ThreadId, myThreadId, threadDelay)
-import Control.Concurrent.MVar (MVar, newEmptyMVar, tryTakeMVar,
- tryPutMVar, takeMVar, readMVar)
-import Control.Exception (SomeException (..))
-import qualified Control.Exception.Lifted as EL
+import Control.Concurrent (ThreadId, myThreadId,
+ threadDelay, getNumCapabilities)
+import Control.Concurrent.MVar (MVar, newEmptyMVar,
+ tryPutMVar, takeMVar)
+import Control.Exception (SomeException (..), catch, mask)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow, throwM)
-import Control.Monad.Error.Class (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
-import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
-import Control.Monad.Trans.Control (MonadBaseControl, liftBaseWith)
+import Control.Monad.Trans.Control (MonadBaseControl, control)
import Data.Atomics (casIORef, readForCAS, peekTicket
- ,atomicModifyIORefCAS_)
+ ,atomicModifyIORefCAS_
+ ,writeBarrier,storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, pushL,
tryPopR, nullQ)
import Data.Functor (void)
+import Data.Heap (Heap, Entry(..))
+import qualified Data.Heap as H
import Data.IORef (IORef, modifyIORef, newIORef,
- readIORef, atomicModifyIORef)
-import Data.Maybe (isNothing, fromJust)
+ readIORef, atomicModifyIORef
+#ifdef DIAGNOSTICS
+ , writeIORef
+#endif
+ )
+import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
import Data.Set (Set)
import qualified Data.Set as S
@@ -94,6 +99,19 @@ import GHC.Exts
import GHC.Conc (ThreadId(..))
import GHC.IO (IO(..))
+-- MVar diagnostics has some overhead (around 5% on the asyncly null
+-- benchmark). We can keep it on in production to debug problems quickly if
+-- and when they happen, but it may result in unexpected output when threads
+-- are left hanging until they are GCed because the consumer went away.
+
+#ifdef DIAGNOSTICS
+import Control.Concurrent.MVar (tryTakeMVar)
+import Control.Exception (catches, throwIO, Handler(..),
+ BlockedIndefinitelyOnMVar(..),
+ BlockedIndefinitelyOnSTM(..))
+import System.IO (hPutStrLn, stderr)
+#endif
+
------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------
@@ -103,24 +121,24 @@ data ChildEvent a =
ChildYield a
| ChildStop ThreadId (Maybe SomeException)
+-- | Sorting out-of-turn outputs in a heap for Ahead style streams
+data AheadHeapEntry m a =
+ AheadEntryPure a
+ | AheadEntryStream (Stream m a)
+
------------------------------------------------------------------------------
-- State threaded around the monad for thread management
------------------------------------------------------------------------------
--- | Conjunction is used for monadic/product style composition. Disjunction is
--- used for fold/sum style composition. We need to distinguish the two types of
--- SVars so that the scheduling of the two is independent.
-data SVarTag = Conjunction | Disjunction deriving Eq
-
-data SVarSched =
- LIFO -- depth first concurrent
- | FIFO -- breadth first concurrent
- | Par -- all parallel
- deriving Eq
-
+-- XXX use a separate data structure for each type of SVar
-- | Identify the type of the SVar. Two computations using the same style can
-- be scheduled on the same SVar.
-data SVarStyle = SVarStyle SVarTag SVarSched deriving Eq
+data SVarStyle =
+ AsyncVar -- depth first concurrent
+ | WAsyncVar -- breadth first concurrent
+ | ParallelVar -- all parallel
+ | AheadVar -- Concurrent look ahead
+ deriving (Eq, Show)
-- | An SVar or a Stream Var is a conduit to the output from multiple streams
-- running concurrently and asynchronously. An SVar can be thought of as an
@@ -144,20 +162,116 @@ data SVarStyle = SVarStyle SVarTag SVarSched deriving Eq
--
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
--- already enqueued computations get evaluated. See 'joinStreamVar2'.
+-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.
--
data SVar m a =
- SVar { outputQueue :: IORef ([ChildEvent a], Int)
- , doorBell :: MVar () -- signal the consumer about output
- , siren :: MVar () -- hooter for workers to begin work
+ SVar {
+ -- Read only state
+ svarStyle :: SVarStyle
+
+ -- Shared output queue (events, length)
+ , outputQueue :: IORef ([ChildEvent a], Int)
+ , doorBell :: MVar () -- signal the consumer about output
+
+ -- Output synchronization mechanism for Ahead streams (Ahead and
+ -- wAhead). We maintain a heap of outputs that were generated ahead of
+ -- time, out of sequence, along with the sequence number of the task
+ -- that is currently at the head of the stream. Tasks executed ahead of
+ -- time, with a sequence number greater than that of the task at the
+ -- head, should add their output to the heap.
+ , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry m a))
+ , Int
+ )
+
+ -- Shared work queue (stream, seqNo)
+ , workQueue :: IORef ([Stream m a], Int)
, enqueue :: Stream m a -> IO ()
+ , queueEmpty :: m Bool
+ , waitingForWork :: IORef Bool
, runqueue :: m ()
+
+ -- Shared, thread tracking
, runningThreads :: IORef (Set ThreadId)
- , queueEmpty :: m Bool
, activeWorkers :: IORef Int
- , svarStyle :: SVarStyle
+#ifdef DIAGNOSTICS
+ , totalDispatches :: IORef Int
+ , maxWorkers :: IORef Int
+ , maxOutQSize :: IORef Int
+ , maxHeapSize :: IORef Int
+ , maxWorkQSize :: IORef Int
+#endif
}
+#ifdef DIAGNOSTICS
+{-# NOINLINE dumpSVar #-}
+dumpSVar :: SVar m a -> IO String
+dumpSVar sv = do
+ tid <- myThreadId
+ (oqList, oqLen) <- readIORef $ outputQueue sv
+ db <- tryTakeMVar $ doorBell sv
+ aheadDump <-
+ if svarStyle sv == AheadVar
+ then do
+ (oheap, oheapSeq) <- readIORef $ outputHeap sv
+ (wq, wqSeq) <- readIORef $ workQueue sv
+ maxHp <- readIORef $ maxHeapSize sv
+ return $ unlines
+ [ "heap length = " ++ show (H.size oheap)
+ , "heap seqeunce = " ++ show oheapSeq
+ , "work queue length = " ++ show (length wq)
+ , "work queue sequence = " ++ show wqSeq
+ , "heap max size = " ++ show maxHp
+ ]
+ else return []
+
+ waiting <- readIORef $ waitingForWork sv
+ rthread <- readIORef $ runningThreads sv
+ workers <- readIORef $ activeWorkers sv
+ maxWrk <- readIORef $ maxWorkers sv
+ dispatches <- readIORef $ totalDispatches sv
+ maxOq <- readIORef $ maxOutQSize sv
+ -- XXX queueEmpty should be made IO return type
+
+ return $ unlines
+ [ "tid = " ++ show tid
+ , "style = " ++ show (svarStyle sv)
+ , "outputQueue length computed = " ++ show (length oqList)
+ , "outputQueue length maintained = " ++ show oqLen
+ , "output doorBell = " ++ show db
+ , "total dispatches = " ++ show dispatches
+ , "max workers = " ++ show maxWrk
+ , "max outQSize = " ++ show maxOq
+ ]
+ ++ aheadDump ++ unlines
+ [ "waitingForWork = " ++ show waiting
+ , "running threads = " ++ show rthread
+ , "running thread count = " ++ show workers
+ ]
+
+{-# NOINLINE mvarExcHandler #-}
+mvarExcHandler :: SVar m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
+mvarExcHandler sv label e@BlockedIndefinitelyOnMVar = do
+ svInfo <- dumpSVar sv
+ hPutStrLn stderr $ label ++ " " ++ "BlockedIndefinitelyOnMVar\n" ++ svInfo
+ throwIO e
+
+{-# NOINLINE stmExcHandler #-}
+stmExcHandler :: SVar m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
+stmExcHandler sv label e@BlockedIndefinitelyOnSTM = do
+ svInfo <- dumpSVar sv
+ hPutStrLn stderr $ label ++ " " ++ "BlockedIndefinitelyOnSTM\n" ++ svInfo
+ throwIO e
+
+withDBGMVar :: SVar m a -> String -> IO () -> IO ()
+withDBGMVar sv label action =
+ action `catches` [ Handler (mvarExcHandler sv label)
+ , Handler (stmExcHandler sv label)
+ ]
+#else
+withDBGMVar :: SVar m a -> String -> IO () -> IO ()
+withDBGMVar _ _ action = action
+#endif
+
-- Slightly faster version of CAS. Gained some improvement by avoiding the use
-- of "evaluate" because we know we do not have exceptions in fn.
{-# INLINE atomicModifyIORefCAS #-}
@@ -168,7 +282,7 @@ atomicModifyIORefCAS ref fn = do
where
- retries = 30 :: Int
+ retries = 25 :: Int
loop _ 0 = atomicModifyIORef ref fn
loop old tries = do
let (new, result) = fn $ peekTicket old
@@ -209,21 +323,22 @@ newtype Stream m a =
nil :: Stream m a
nil = Stream $ \_ stp _ _ -> stp
-once :: Monad m => m a -> Stream m a
-once m = Stream $ \_ _ single _ -> m >>= single
+-- | faster than consM because there is no bind.
+cons :: a -> Stream m a -> Stream m a
+cons a r = Stream $ \_ _ _ yld -> yld a r
-{-# INLINE singleton #-}
--- | Same as @once . return@
+-- | Same as @once . return@ but may be faster because there is no bind
singleton :: a -> Stream m a
singleton a = Stream $ \_ _ single _ -> single a
+{-# INLINE once #-}
+once :: Monad m => m a -> Stream m a
+once m = Stream $ \_ _ single _ -> m >>= single
+
+{-# INLINE consM #-}
consM :: Monad m => m a -> Stream m a -> Stream m a
consM m r = Stream $ \_ _ _ yld -> m >>= \a -> yld a r
--- | Same as @consM . return@
-cons :: a -> Stream m a -> Stream m a
-cons a r = Stream $ \_ _ _ yld -> yld a r
-
repeat :: a -> Stream m a
repeat a = let x = cons a x in x
@@ -233,6 +348,7 @@ repeat a = let x = cons a x in x
-- | Concatenates two streams sequentially i.e. the first stream is
-- exhausted completely before yielding any element from the second stream.
+{-# INLINE serial #-}
serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
where
@@ -257,6 +373,7 @@ instance Monoid (Stream m a) where
-- Interleave
------------------------------------------------------------------------------
+{-# INLINE wSerial #-}
wSerial :: Stream m a -> Stream m a -> Stream m a
wSerial m1 m2 = Stream $ \_ stp sng yld -> do
let stop = (runStream m2) Nothing stp sng yld
@@ -288,28 +405,34 @@ rawForkIO action = IO $ \ s ->
{-# INLINE doFork #-}
doFork :: MonadBaseControl IO m
=> m ()
- -> (SomeException -> m ())
+ -> (SomeException -> IO ())
-> m ThreadId
doFork action exHandler =
- EL.mask $ \restore ->
- liftBaseWith $ \runInIO -> rawForkIO $ do
- _ <- runInIO $ EL.catch (restore action) exHandler
- -- XXX restore state here?
- return ()
+ control $ \runInIO ->
+ mask $ \restore -> do
+ tid <- rawForkIO $ catch (restore $ void $ runInIO action)
+ exHandler
+ runInIO (return tid)
-- XXX exception safety of all atomic/MVar operations
-- TBD Each worker can have their own queue and the consumer can empty one
-- queue at a time, that way contention can be reduced.
-{-# INLINE send #-}
-send :: MonadIO m => SVar m a -> ChildEvent a -> m ()
-send sv msg = liftIO $ do
+
+maxOutputQLen :: Int
+maxOutputQLen = 1500
+
+-- | This function is used by the producer threads to queue output for the
+-- consumer thread to consume. Returns whether the queue has more space.
+{-# NOINLINE send #-}
+send :: SVar m a -> ChildEvent a -> IO Bool
+send sv msg = do
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
((msg : es, n + 1), n)
- if (len <= 0) then do
- -- XXX need a memory barrier? The wake up must happen only after the
- -- store has finished otherwise we can have lost wakeup problems.
- --
+ when (len <= 0) $ do
+ -- The wake up must happen only after the store has finished, otherwise
+ -- we can have lost wakeup problems.
+ writeBarrier
-- Since multiple workers can try this at the same time, it is possible
-- that we may put a spurious MVar after the consumer has already seen
-- the output. But that's harmless, at worst it may cause the consumer
@@ -317,36 +440,36 @@ send sv msg = liftIO $ do
-- The important point is that the consumer is guaranteed to receive a
-- doorbell if something was added to the queue after it empties it.
void $ tryPutMVar (doorBell sv) ()
- -- The first worker who notices the output queue was emptied puts the
- -- siren off.
- void $ tryTakeMVar (siren sv)
- else if (len + 1 >= 1500) then do
- -- We are guaranteed to receive the siren if the consumer reads the
- -- queue because the consumer puts the siren on before reading the
- -- queue.
- --
- -- We may get the siren between the siren being swicthed on and the
- -- queue getting read but that's harmless, at amost everyone will go
- -- back to work and will have to sleep again if queue was still not
- -- emptied.
- --
- -- If even before a worker could read the MVar, the queue gets emptied
- -- and another worker queuing on it switches off the siren, then we may
- -- sleep here. In that case we are guaranteed to be woken up on the
- -- next siren. The next siren is guaranteed as we send a doorbell
- -- before switching off the siren, and the consumer switches on the
- -- siren after receiving the doorbell.
- readMVar (siren sv)
- else return ()
-
-{-# INLINE sendStop #-}
-sendStop :: MonadIO m => SVar m a -> m ()
-sendStop sv = liftIO myThreadId >>= \tid -> send sv (ChildStop tid Nothing)
-
--- Note: Left associated compositions can grow this queue to a large size
+ return (len < maxOutputQLen)
+
+{-# NOINLINE sendStop #-}
+sendStop :: SVar m a -> IO ()
+sendStop sv = do
+ liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n - 1
+ myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
+
+-------------------------------------------------------------------------------
+-- Async
+-------------------------------------------------------------------------------
+
+-- Note: For purely right associated expressions this queue should have at most
+-- one element. It grows to more than one when we have left associated
+-- expressions. Large left associated compositions can grow this to a
+-- large size.
{-# INLINE enqueueLIFO #-}
-enqueueLIFO :: IORef [Stream m a] -> Stream m a -> IO ()
-enqueueLIFO q m = atomicModifyIORefCAS_ q $ \ ms -> m : ms
+enqueueLIFO :: SVar m a -> IORef [Stream m a] -> Stream m a -> IO ()
+enqueueLIFO sv q m = do
+ atomicModifyIORefCAS_ q $ \ms -> m : ms
+ storeLoadBarrier
+ w <- readIORef $ waitingForWork sv
+ when w $ do
+ -- Note: the sequence of operations is important for correctness here.
+ -- We need to set the flag to false strictly before sending the
+ -- doorBell, otherwise the doorBell may get processed too early and
+ -- then we may end up setting the flag to False later, making the
+ -- consumer lose the flag even without receiving a doorBell.
+ atomicModifyIORefCAS_ (waitingForWork sv) (const False)
+ void $ tryPutMVar (doorBell sv) ()
runqueueLIFO :: MonadIO m => SVar m a -> IORef [Stream m a] -> m ()
runqueueLIFO sv q = run
@@ -356,20 +479,44 @@ runqueueLIFO sv q = run
run = do
work <- dequeue
case work of
- Nothing -> sendStop sv
+ Nothing -> liftIO $ sendStop sv
Just m -> (runStream m) (Just sv) run single yield
- sendit a = send sv (ChildYield a)
- single a = sendit a >> run
- yield a r = sendit a >> (runStream r) (Just sv) run single yield
+ single a = do
+ res <- liftIO $ send sv (ChildYield a)
+ if res then run else liftIO $ sendStop sv
+ yield a r = do
+ res <- liftIO $ send sv (ChildYield a)
+ if res
+ then (runStream r) (Just sv) run single yield
+ else liftIO $ enqueueLIFO sv q r >> sendStop sv
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
x : xs -> (xs, Just x)
+-------------------------------------------------------------------------------
+-- WAsync
+-------------------------------------------------------------------------------
+
+-- XXX we can use the Ahead style sequence/heap mechanism to make the best
+-- effort to always try to finish the streams on the left side of an expression
+-- first as long as possible.
+
{-# INLINE enqueueFIFO #-}
-enqueueFIFO :: LinkedQueue (Stream m a) -> Stream m a -> IO ()
-enqueueFIFO = pushL
+enqueueFIFO :: SVar m a -> LinkedQueue (Stream m a) -> Stream m a -> IO ()
+enqueueFIFO sv q m = do
+ pushL q m
+ storeLoadBarrier
+ w <- readIORef $ waitingForWork sv
+ when w $ do
+ -- Note: the sequence of operations is important for correctness here.
+ -- We need to set the flag to false strictly before sending the
+ -- doorBell, otherwise the doorBell may get processed too early and
+ -- then we may end up setting the flag to False later, making the
+ -- consumer lose the flag even without receiving a doorBell.
+ atomicModifyIORefCAS_ (waitingForWork sv) (const False)
+ void $ tryPutMVar (doorBell sv) ()
runqueueFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m ()
runqueueFIFO sv q = run
@@ -379,24 +526,250 @@ runqueueFIFO sv q = run
run = do
work <- dequeue
case work of
- Nothing -> sendStop sv
+ Nothing -> liftIO $ sendStop sv
Just m -> (runStream m) (Just sv) run single yield
dequeue = liftIO $ tryPopR q
- sendit a = send sv (ChildYield a)
- single a = sendit a >> run
- yield a r = sendit a >> liftIO (enqueueFIFO q r) >> run
+ single a = do
+ res <- liftIO $ send sv (ChildYield a)
+ if res then run else liftIO $ sendStop sv
+ yield a r = do
+ res <- liftIO $ send sv (ChildYield a)
+ liftIO (enqueueFIFO sv q r)
+ if res then run else liftIO $ sendStop sv
-{-# INLINE runOne #-}
+-------------------------------------------------------------------------------
+-- Parallel
+-------------------------------------------------------------------------------
+
+{-# NOINLINE runOne #-}
runOne :: MonadIO m => SVar m a -> Stream m a -> m ()
runOne sv m = (runStream m) (Just sv) stop single yield
where
- stop = sendStop sv
- sendit a = send sv (ChildYield a)
+ stop = liftIO $ sendStop sv
+ sendit a = liftIO $ send sv (ChildYield a)
single a = sendit a >> stop
- yield a r = sendit a >> runOne sv r
+ -- XXX there is no flow control in the parallel case. We should perhaps use
+ -- a queue, queue the work back on it, and exit the thread when the outputQueue
+ -- overflows. Parallel is dangerous because it can accumulate unbounded
+ -- output in the buffer.
+ yield a r = void (sendit a) >> runOne sv r
+
+-------------------------------------------------------------------------------
+-- Ahead
+-------------------------------------------------------------------------------
+
+-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
+-- but always serve them in the same order as they appear in the stream. To
+-- implement lookahead streams efficiently we assign a sequence number to each
+-- task when the task is picked up for execution. When the task finishes, the
+-- output is tagged with the same sequence number and we rearrange the outputs
+-- in sequence based on that number.
+--
+-- To explain the mechanism imagine that the current task at the head of the
+-- stream has a "token" to yield to the outputQueue. The ownership of the token
+-- is determined by the current sequence number, which is maintained in the
+-- outputHeap. A sequence number is assigned when a task is queued. When a
+-- thread dequeues a task it picks up the sequence number as well, and when
+-- the output is ready it uses the sequence number to queue the output to
+-- the outputQueue.
+--
+-- The thread with the current sequence number sends the output directly to the
+-- outputQueue. Other threads push the output to the outputHeap. When the task
+-- being queued on the heap is a stream of many elements we evaluate only the
+-- first element and keep the rest of the unevaluated computation in the heap.
+-- When such a task gets the "token" for outputQueue it evaluates and directly
+-- yields all the elements to the outputQueue without checking for the
+-- "token".
+--
+-- Note that no two outputs in the heap can have the same sequence number and
+-- therefore we do not need a stable heap. We have also separated the buffer
+-- for the current task (outputQueue) from the pending tasks (outputHeap) so
+-- that the pending tasks cannot interfere with the current task. Note that for
+-- a single task just the outputQueue is enough, and for the case of many
+-- threads just a heap is good enough; we balance between these two cases so
+-- that both are efficient.
+--
+-- For bigger streams it may make sense to have a separate buffer for each
+-- stream, though for singleton streams that would be inefficient. On the
+-- other hand, without separate buffers, streams that come later in the
+-- sequence may hog the buffer and hinder the streams ahead of them. For this
+-- reason we limit the streams being executed in advance to a single buffered
+-- element each.
+--
+-- This scheme works quite efficiently, with less than 40% extra overhead
+-- compared to the Async streams, where we do not do any sequencing of the
+-- outputs. It is especially devised so that we are most efficient when we
+-- have short tasks and need just a single thread. Also, when a thread yields
+-- many items it can retain lock-free access to the outputQueue and yield them
+-- efficiently.
+--
+-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
+-- that they do not hog the resources and hinder the progress of the threads in
+-- front of them.
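+--
+-- As a minimal, self-contained sketch of the reordering idea (an
+-- illustration only, not the implementation below), using the same Entry,
+-- insert and uncons operations from the heaps package:
+--
+-- @
+-- import qualified Data.Heap as H
+-- import Data.Heap (Entry(..))
+--
+-- -- Release outputs tagged with sequence numbers in sequence order; outputs
+-- -- arriving out of order are parked in the heap until their turn comes.
+-- reorder :: Int -> H.Heap (Entry Int a) -> [(Int, a)] -> [a]
+-- reorder next hp arrivals =
+--     case H.uncons hp of
+--         Just (Entry s x, hp') | s == next -> x : reorder (next + 1) hp' arrivals
+--         _ -> case arrivals of
+--             [] -> []
+--             (s, x) : rest
+--                 | s == next -> x : reorder (next + 1) hp rest
+--                 | otherwise -> reorder next (H.insert (Entry s x) hp) rest
+--
+-- -- reorder 0 H.empty [(2,20),(0,0),(1,10)] == [0,10,20]
+-- @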
+
+-- Left associated ahead expressions are expensive; we start a new SVar for
+-- each left associative expression. The queue is used only for right
+-- associated expressions: we queue the right expression and execute the left.
+-- Therefore the queue never has more than one item in it.
+{-# INLINE enqueueAhead #-}
+enqueueAhead :: SVar m a -> IORef ([Stream m a], Int) -> Stream m a -> IO ()
+enqueueAhead sv q m = do
+ atomicModifyIORefCAS_ q $ \ case
+ ([], n) -> ([m], n + 1) -- increment sequence
+ _ -> error "not empty"
+ storeLoadBarrier
+ w <- readIORef $ waitingForWork sv
+ when w $ do
+ -- Note: the sequence of operations is important for correctness here.
+ -- We must reset the flag to False strictly before sending the doorBell.
+ -- Otherwise the doorBell may get processed too early; the consumer may
+ -- then register again by setting the flag to True, and our delayed reset
+ -- to False would wipe out that registration, making the consumer lose the
+ -- flag without ever receiving a doorBell for it.
+ atomicModifyIORefCAS_ (waitingForWork sv) (const False)
+ void $ tryPutMVar (doorBell sv) ()
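+
+-- A concrete bad interleaving (hypothetical, with the two producer steps
+-- reversed) that the ordering above rules out:
+--
+-- @
+--  producer                          consumer
+--  tryPutMVar doorBell ()
+--                                    takeMVar doorBell; finds no output
+--                                    writeIORef waitingForWork True; sleeps
+--  writeIORef waitingForWork False
+-- @
+--
+-- The consumer's fresh registration is now lost; the next enqueue sees the
+-- flag as False and never rings the doorBell, leaving the consumer asleep
+-- even though work is pending.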
+
+-- Normally the thread that has the token never goes away; the token gets
+-- handed over to another thread, so some thread always holds it. However, if
+-- the task that has the token finds that the outputQueue is full, it may go
+-- away without handing the token over to another thread. In that case it sets
+-- the next sequence number in the heap to its own sequence number before
+-- leaving. To handle this case, any task that does not have the token tries to
+-- dequeue from the heap first, before dequeuing from the work queue. If it
+-- finds that the task at the top of the heap is the one that owns the current
+-- sequence number then it grabs the token and starts with that.
+--
+-- XXX instead of queueing just the head element and the remaining computation
+-- on the heap, evaluate as many elements as we can and place them on the heap.
+-- But we need to give higher priority to the lower sequence numbers so that
+-- lower priority tasks do not fill up the heap and make higher priority tasks
+-- block on a full heap. Maybe we can have a weighted space for them in the
+-- heap, with the weight inversely proportional to the sequence number.
+--
+-- XXX review for livelock
+--
+runqueueAhead :: MonadIO m => SVar m a -> IORef ([Stream m a], Int) -> m ()
+runqueueAhead sv q = runHeap
+
+ where
+
+ maxHeap = 1500
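+ -- Limit on the number of outputs buffered ahead of time in the heap; when
+ -- it is exceeded the worker stops instead of piling up more results.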
+
+ toHeap seqNo ent = do
+ hp <- liftIO $ atomicModifyIORefCAS (outputHeap sv) $ \(h, snum) ->
+ ((H.insert (Entry seqNo ent) h, snum), h)
+ if H.size hp <= maxHeap
+ then runHeap
+ else liftIO $ sendStop sv
+
+ singleToHeap seqNo a = toHeap seqNo (AheadEntryPure a)
+ yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `cons` r))
+
+ singleOutput seqNo a = do
+ continue <- liftIO $ send sv (ChildYield a)
+ if continue
+ then runQueueToken seqNo
+ else liftIO $ do
+ atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) -> (h, seqNo + 1)
+ sendStop sv
+
+ yieldOutput seqNo a r = do
+ continue <- liftIO $ send sv (ChildYield a)
+ if continue
+ then (runStream r) (Just sv) (runQueueToken seqNo)
+ (singleOutput seqNo)
+ (yieldOutput seqNo)
+ else liftIO $ do
+ atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) ->
+ (H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo)
+ sendStop sv
+
+ {-# INLINE runQueueToken #-}
+ runQueueToken prevSeqNo = do
+ work <- dequeue
+ case work of
+ Nothing -> do
+ liftIO $ atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) ->
+ (h, prevSeqNo + 1)
+ runHeap
+ Just (m, seqNo) -> do
+ if seqNo == prevSeqNo + 1
+ then
+ (runStream m) (Just sv) (runQueueToken seqNo)
+ (singleOutput seqNo)
+ (yieldOutput seqNo)
+ else do
+ liftIO $ atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) ->
+ (h, prevSeqNo + 1)
+ (runStream m) (Just sv) runHeap
+ (singleToHeap seqNo)
+ (yieldToHeap seqNo)
+ runQueueNoToken = do
+ work <- dequeue
+ case work of
+ Nothing -> runHeap
+ Just (m, seqNo) -> do
+ if seqNo == 0
+ then
+ (runStream m) (Just sv) (runQueueToken seqNo)
+ (singleOutput seqNo)
+ (yieldOutput seqNo)
+ else
+ (runStream m) (Just sv) runHeap
+ (singleToHeap seqNo)
+ (yieldToHeap seqNo)
+
+ {-# NOINLINE runHeap #-}
+ runHeap = do
+#ifdef DIAGNOSTICS
+ liftIO $ do
+ maxHp <- readIORef (maxHeapSize sv)
+ (hp, _) <- readIORef (outputHeap sv)
+ when (H.size hp > maxHp) $ writeIORef (maxHeapSize sv) (H.size hp)
+#endif
+ ent <- liftIO $ dequeueFromHeap (outputHeap sv)
+ case ent of
+ Nothing -> do
+ done <- queueEmpty sv
+ if done
+ then liftIO $ sendStop sv
+ else runQueueNoToken
+ Just (Entry seqNo hent) -> do
+ case hent of
+ AheadEntryPure a -> singleOutput seqNo a
+ AheadEntryStream r ->
+ (runStream r) (Just sv) (runQueueToken seqNo)
+ (singleOutput seqNo)
+ (yieldOutput seqNo)
+
+ dequeue = liftIO $ do
+ atomicModifyIORefCAS q $ \case
+ ([], n) -> (([], n), Nothing)
+ (x : [], n) -> (([], n), Just (x, n))
+ _ -> error "more than one item on queue"
+
+ dequeueFromHeap
+ :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Int)
+ -> IO (Maybe (Entry Int (AheadHeapEntry m a)))
+ dequeueFromHeap hpRef = do
+ atomicModifyIORefCAS hpRef $ \hp@(h, snum) -> do
+ let r = H.uncons h
+ case r of
+ Nothing -> (hp, Nothing)
+ Just (ent@(Entry seqNo _ev), hp') ->
+ if (seqNo == snum)
+ then ((hp', seqNo), Just ent)
+ else (hp, Nothing)
+
+-------------------------------------------------------------------------------
+-- WAhead
+-------------------------------------------------------------------------------
+
+-- XXX To be implemented. Use a linked queue like WAsync and put back the
+-- remaining computation at the back of the queue instead of the heap, and
+-- increment the sequence number.
-- Thread tracking is needed for two reasons:
--
@@ -411,17 +784,17 @@ addThread :: MonadIO m => SVar m a -> ThreadId -> m ()
addThread sv tid =
liftIO $ modifyIORef (runningThreads sv) (S.insert tid)
-{-
+-- This is cheaper than modifyThread because we do not have to send a doorBell.
+-- It can make a difference when many workers are being dispatched.
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar m a -> ThreadId -> m ()
delThread sv tid =
liftIO $ modifyIORef (runningThreads sv) $ (\s -> S.delete tid s)
--}
-- If present then delete else add. This takes care of out of order add and
-- delete i.e. a delete arriving before we even added a thread.
-- This occurs when the forked thread is done even before the 'addThread' right
--- after the fork is executed.
+-- after the fork gets a chance to run.
{-# INLINE modifyThread #-}
modifyThread :: MonadIO m => SVar m a -> ThreadId -> m ()
modifyThread sv tid = do
@@ -430,7 +803,9 @@ modifyThread sv tid = do
then let new = (S.delete tid old) in (new, new)
else let new = (S.insert tid old) in (new, old)
if null changed
- then liftIO $ void $ tryPutMVar (doorBell sv) ()
+ then liftIO $ do
+ writeBarrier
+ void $ tryPutMVar (doorBell sv) ()
else return ()
-- | This is safe even if we are adding more threads concurrently because if
@@ -441,15 +816,27 @@ allThreadsDone :: MonadIO m => SVar m a -> m Bool
allThreadsDone sv = liftIO $ S.null <$> readIORef (runningThreads sv)
{-# NOINLINE handleChildException #-}
-handleChildException :: MonadIO m => SVar m a -> SomeException -> m ()
+handleChildException :: SVar m a -> SomeException -> IO ()
handleChildException sv e = do
- tid <- liftIO myThreadId
- send sv (ChildStop tid (Just e))
+ tid <- myThreadId
+ void $ send sv (ChildStop tid (Just e))
+
+#ifdef DIAGNOSTICS
+recordMaxWorkers :: MonadIO m => SVar m a -> m ()
+recordMaxWorkers sv = liftIO $ do
+ active <- readIORef (activeWorkers sv)
+ maxWrk <- readIORef (maxWorkers sv)
+ when (active > maxWrk) $ writeIORef (maxWorkers sv) active
+ modifyIORef (totalDispatches sv) (+1)
+#endif
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => SVar m a -> m ()
pushWorker sv = do
liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n + 1
+#ifdef DIAGNOSTICS
+ recordMaxWorkers sv
+#endif
doFork (runqueue sv) (handleChildException sv) >>= addThread sv
-- | In contrast to pushWorker which always happens only from the consumer
@@ -459,102 +846,248 @@ pushWorker sv = do
-- using a CAS based modification.
{-# NOINLINE pushWorkerPar #-}
pushWorkerPar :: MonadAsync m => SVar m a -> Stream m a -> m ()
-pushWorkerPar sv m =
+pushWorkerPar sv m = do
+ -- We do not use activeWorkers in case of ParallelVar but still there is no
+ -- harm in maintaining it correctly.
+#ifdef DIAGNOSTICS
+ liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n + 1
+ recordMaxWorkers sv
+#endif
doFork (runOne sv m) (handleChildException sv) >>= modifyThread sv
--- XXX When the queue is LIFO we can put a limit on the number of dispatches.
--- Also, if a worker blocks on the output queue we can decide if we want to
--- block or make it go away entirely, depending on the number of workers and
--- the type of the queue.
-{-# INLINE sendWorkerWait #-}
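+-- There is no more work to dispatch when the work queue is empty and, for the
+-- Ahead style, the output heap has been fully drained as well.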
+{-# INLINE workDone #-}
+workDone :: MonadIO m => SVar m a -> m Bool
+workDone sv = do
+ heapDone <-
+ if (svarStyle sv == AheadVar)
+ then do
+ (hp, _) <- liftIO $ readIORef (outputHeap sv)
+ return (H.size hp <= 0)
+ else return True
+ queueDone <- queueEmpty sv
+ return $ queueDone && heapDone
+
+maxWorkerLimit :: Int
+maxWorkerLimit = 1500
+
+dispatchWorker :: MonadAsync m => SVar m a -> m ()
+dispatchWorker sv = do
+ done <- workDone sv
+ when (not done) $ do
+ -- Note that the worker count is only decremented during event
+ -- processing in fromStreamVar and therefore it is safe to read and
+ -- use it without a lock.
+ cnt <- liftIO $ readIORef $ activeWorkers sv
+ -- Note that we may deadlock if the earlier workers (tasks in the
+ -- stream) wait for or depend on the execution of future workers (tasks
+ -- in the stream). In that case we should either configure a higher
+ -- maxWorker count or use the parallel style instead of the ahead or
+ -- async style.
+ when (cnt < maxWorkerLimit) $ pushWorker sv
+
+{-# NOINLINE sendWorkerWait #-}
sendWorkerWait :: MonadAsync m => SVar m a -> m ()
sendWorkerWait sv = do
- -- When there is no output seen we dispatch more workers to help out if
- -- there is work pending in the work queue. But we wait a little while
- -- and check the output again so that we are not too aggressive.
- -- If there is no output pending to process and there is no worker to be
- -- sent then we block, so that we do not keep looping fruitlessly.
+ -- Note that we are guaranteed to have at least one outstanding worker when
+ -- we enter this function. So if we sleep we are guaranteed to be woken up
+ -- by a doorBell, when the worker exits.
+
+ -- XXX we need a better way to handle this than hardcoded delays. The
+ -- delays may be different for different systems.
+ ncpu <- liftIO $ getNumCapabilities
+ if ncpu <= 1
+ then
+ if (svarStyle sv == AheadVar)
+ then liftIO $ threadDelay 100
+ else liftIO $ threadDelay 25
+ else
+ if (svarStyle sv == AheadVar)
+ then liftIO $ threadDelay 100
+ else liftIO $ threadDelay 10
- liftIO $ threadDelay 200
(_, n) <- liftIO $ readIORef (outputQueue sv)
when (n <= 0) $ do
- done <- queueEmpty sv
- if not done
+ -- The queue may be empty temporarily if the worker has dequeued the
+ -- work item but has not enqueued the remaining part yet. For the same
+ -- reason, a worker may come back if it tries to dequeue and finds the
+ -- queue empty, even though the whole work has not finished yet.
+
+ -- The queue may have been only temporarily empty at the time we
+ -- checked it. If that is the case we might sleep indefinitely unless
+ -- the active workers produce some output. We may even deadlock,
+ -- especially if the output from the active workers depends on future
+ -- workers that we may never send. So, in case the queue was only
+ -- temporarily empty, set a flag asking the enqueuer to send us a
+ -- doorBell.
+
+ -- Note that this is just a best effort mechanism to avoid a
+ -- deadlock. Deadlocks may still happen if for some weird reason
+ -- the consuming computation shares an MVar or some other resource
+ -- with the producing computation and gets blocked on that resource
+ -- and therefore cannot call pushWorker to add more threads to
+ -- the producer. In such cases the programmer should use a parallel
+ -- style so that all the producers are scheduled immediately and
+ -- unconditionally. We can also use a separate monitor thread to
+ -- push workers instead of pushing them from the consumer, but then
+ -- we are no longer using pull based concurrency rate adaptation.
+ --
+ -- XXX update this in the tutorial.
+
+ -- register for the doorBell before we check the queue so that if we
+ -- sleep because the queue was empty we are guaranteed to get a
+ -- doorbell on the next enqueue.
+
+ liftIO $ atomicModifyIORefCAS_ (waitingForWork sv) $ const True
+ liftIO $ storeLoadBarrier
+ dispatchWorker sv
+
+ -- XXX test for the case when we miss sending a worker when the worker
+ -- count is more than 1500.
+ --
+ -- XXX Assert here that if the heap is not empty then there is at
+ -- least one outstanding worker. Otherwise we could be sleeping
+ -- forever.
+
+ done <- workDone sv
+ if done
then do
- cnt <- liftIO $ readIORef $ activeWorkers sv
- if (cnt < 1500)
- then do
- pushWorker sv
- sendWorkerWait sv
- else liftIO $ takeMVar (doorBell sv)
- else liftIO $ takeMVar (doorBell sv)
+ liftIO $ withDBGMVar sv "sendWorkerWait: nothing to do"
+ $ takeMVar (doorBell sv)
+ (_, len) <- liftIO $ readIORef (outputQueue sv)
+ when (len <= 0) $ sendWorkerWait sv
+ else sendWorkerWait sv
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar m a -> Stream m a
fromStreamVar sv = Stream $ \_ stp sng yld -> do
- let SVarStyle _ sched = svarStyle sv
- if sched == Par
- then liftIO $ takeMVar (doorBell sv)
- else do
- res <- liftIO $ tryTakeMVar (doorBell sv)
- when (isNothing res) $ sendWorkerWait sv
+ (list, _) <-
+ -- XXX we can set this in SVar
+ if svarStyle sv == ParallelVar
+ then do
+ liftIO $ withDBGMVar sv "fromStreamVar: doorbell"
+ $ takeMVar (doorBell sv)
+ readOutputQ sv
+ else do
+ res@(_, len) <- readOutputQ sv
+ -- When there is no output seen we dispatch more workers to help
+ -- out if there is work pending in the work queue.
+ if len <= 0
+ then blockingRead
+ else do
+ -- Send a worker proactively, if needed, even before we start
+ -- processing the output. This may degrade single-processor
+ -- performance but improves multi-processor performance because of
+ -- the increased parallelism.
+ sendWorker
+ return res
- void $ liftIO $ tryPutMVar (siren sv) ()
- (list, _) <- liftIO $ atomicModifyIORefCAS (outputQueue sv) $ \x -> (([],0), x)
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
-- streams.
- (runStream $ processEvents (reverse list)) Nothing stp sng yld
+ runStream (processEvents $ reverse list) Nothing stp sng yld
where
- handleException e tid = do
- liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n - 1
- modifyThread sv tid
- -- XXX implement kill async exception handling
- -- liftIO $ readIORef (runningThreads sv) >>= mapM_ killThread
- throwM e
+ {-# INLINE readOutputQ #-}
+ readOutputQ svr = liftIO $ do
+ (list, len) <- atomicModifyIORefCAS (outputQueue svr) $
+ \x -> (([],0), x)
+#ifdef DIAGNOSTICS
+ oqLen <- readIORef (maxOutQSize svr)
+ when (len > oqLen) $ writeIORef (maxOutQSize svr) len
+#endif
+ return (list, len)
+
+ sendWorker = do
+ cnt <- liftIO $ readIORef $ activeWorkers sv
+ when (cnt <= 0) $ do
+ done <- workDone sv
+ when (not done) $ pushWorker sv
+
+ {-# INLINE blockingRead #-}
+ blockingRead = do
+ sendWorkerWait sv
+ readOutputQ sv
+
+ allDone stp = do
+#ifdef DIAGNOSTICS
+#ifdef DIAGNOSTICS_VERBOSE
+ svInfo <- liftIO $ dumpSVar sv
+ liftIO $ hPutStrLn stderr $ "fromStreamVar done\n" ++ svInfo
+#endif
+#endif
+ stp
{-# INLINE processEvents #-}
processEvents [] = Stream $ \_ stp sng yld -> do
- done <- allThreadsDone sv
- if not done
- then (runStream (fromStreamVar sv)) Nothing stp sng yld
- else stp
+ workersDone <- allThreadsDone sv
+ done <-
+ -- XXX we can set this in SVar
+ if svarStyle sv == ParallelVar
+ then return workersDone
+ else
+ -- There may still be work pending even if there are no
+ -- workers left, because all the workers may return when the
+ -- outputQueue becomes full. In that case send off a worker to
+ -- kickstart the work again.
+ if workersDone
+ then do
+ r <- workDone sv
+ when (not r) $ pushWorker sv
+ return r
+ else return False
+
+ if done
+ then allDone stp
+ else runStream (fromStreamVar sv) Nothing stp sng yld
processEvents (ev : es) = Stream $ \_ stp sng yld -> do
- let continue = (runStream (processEvents es)) Nothing stp sng yld
- yield a = yld a (processEvents es)
-
+ let rest = processEvents es
case ev of
- ChildYield a -> yield a
- ChildStop tid e ->
+ ChildYield a -> yld a rest
+ ChildStop tid e -> do
+ if svarStyle sv == ParallelVar
+ then modifyThread sv tid
+ else delThread sv tid
case e of
- Nothing -> do
- let active = activeWorkers sv
- liftIO $ atomicModifyIORefCAS_ active $ \n -> n - 1
- modifyThread sv tid >> continue
- Just ex -> handleException ex tid
+ Nothing -> runStream rest Nothing stp sng yld
+ Just ex -> throwM ex
getFifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getFifoSVar ctype = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
- hooter <- newEmptyMVar
active <- newIORef 0
+ wfw <- newIORef False
running <- newIORef S.empty
q <- newQ
+#ifdef DIAGNOSTICS
+ disp <- newIORef 0
+ maxWrk <- newIORef 0
+ maxOq <- newIORef 0
+ maxHs <- newIORef 0
+ maxWq <- newIORef 0
+#endif
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
- , siren = hooter
+ , outputHeap = undefined
, runningThreads = running
+ , workQueue = undefined
, runqueue = runqueueFIFO sv q
- , enqueue = pushL q
+ , enqueue = enqueueFIFO sv q
, queueEmpty = liftIO $ nullQ q
+ , waitingForWork = wfw
, svarStyle = ctype
, activeWorkers = active
+#ifdef DIAGNOSTICS
+ , totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+#endif
}
in return sv
@@ -562,21 +1095,37 @@ getLifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getLifoSVar ctype = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
- hooter <- newEmptyMVar
active <- newIORef 0
+ wfw <- newIORef False
running <- newIORef S.empty
q <- newIORef []
+#ifdef DIAGNOSTICS
+ disp <- newIORef 0
+ maxWrk <- newIORef 0
+ maxOq <- newIORef 0
+ maxHs <- newIORef 0
+ maxWq <- newIORef 0
+#endif
let checkEmpty = null <$> liftIO (readIORef q)
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
- , siren = hooter
+ , outputHeap = undefined
, runningThreads = running
+ , workQueue = undefined
, runqueue = runqueueLIFO sv q
- , enqueue = enqueueLIFO q
+ , enqueue = enqueueLIFO sv q
, queueEmpty = checkEmpty
+ , waitingForWork = wfw
, svarStyle = ctype
, activeWorkers = active
+#ifdef DIAGNOSTICS
+ , maxWorkers = maxWrk
+ , totalDispatches = disp
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+#endif
}
in return sv
@@ -584,19 +1133,79 @@ getParSVar :: SVarStyle -> IO (SVar m a)
getParSVar style = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
- hooter <- newEmptyMVar
active <- newIORef 0
+ wfw <- newIORef False
running <- newIORef S.empty
+#ifdef DIAGNOSTICS
+ disp <- newIORef 0
+ maxWrk <- newIORef 0
+ maxOq <- newIORef 0
+ maxHs <- newIORef 0
+ maxWq <- newIORef 0
+#endif
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
- , siren = hooter
+ , outputHeap = undefined
, runningThreads = running
+ , workQueue = undefined
, runqueue = undefined
, enqueue = undefined
, queueEmpty = undefined
+ , waitingForWork = wfw
+ , svarStyle = style
+ , activeWorkers = active
+#ifdef DIAGNOSTICS
+ , totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+#endif
+ }
+ in return sv
+
+getAheadSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
+getAheadSVar style = do
+ outQ <- newIORef ([], 0)
+ outH <- newIORef (H.empty, 0)
+ outQMv <- newEmptyMVar
+ active <- newIORef 0
+ wfw <- newIORef False
+ running <- newIORef S.empty
+ q <- newIORef ([], -1)
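+ -- The sequence number starts at -1 so that the first 'enqueueAhead' bumps
+ -- it to 0, which is the sequence number that owns the initial output token.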
+
+#ifdef DIAGNOSTICS
+ disp <- newIORef 0
+ maxWrk <- newIORef 0
+ maxOq <- newIORef 0
+ maxHs <- newIORef 0
+ maxWq <- newIORef 0
+#endif
+
+ let checkEmpty = liftIO $ do
+ (xs, _) <- readIORef q
+ return $ null xs
+ let sv =
+ SVar { outputQueue = outQ
+ , doorBell = outQMv
+ , outputHeap = outH
+ , runningThreads = running
+ , workQueue = q
+ , runqueue = runqueueAhead sv q
+ , enqueue = undefined
+ , queueEmpty = checkEmpty
+ , waitingForWork = wfw
, svarStyle = style
, activeWorkers = active
+
+#ifdef DIAGNOSTICS
+ , totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+#endif
}
in return sv
@@ -605,39 +1214,35 @@ newEmptySVar :: MonadAsync m => SVarStyle -> m (SVar m a)
newEmptySVar style = do
liftIO $
case style of
- SVarStyle _ FIFO -> getFifoSVar style
- SVarStyle _ LIFO -> getLifoSVar style
- SVarStyle _ Par -> getParSVar style
+ WAsyncVar -> getFifoSVar style
+ AsyncVar -> getLifoSVar style
+ ParallelVar -> getParSVar style
+ AheadVar -> getAheadSVar style
-- | Create a new SVar and enqueue one stream computation on it.
+{-# INLINABLE newStreamVar1 #-}
newStreamVar1 :: MonadAsync m => SVarStyle -> Stream m a -> m (SVar m a)
newStreamVar1 style m = do
sv <- newEmptySVar style
-- Note: We must have all the work on the queue before sending the
-- pushworker, otherwise the pushworker may exit before we even get a
-- chance to push.
- liftIO $ (enqueue sv) m
- pushWorker sv
+ if style == ParallelVar
+ then pushWorkerPar sv m
+ else do
+ liftIO $ (enqueue sv) m
+ pushWorker sv
return sv
--- | Create a new SVar and enqueue two stream computations on it.
-newStreamVar2 :: MonadAsync m
- => SVarStyle -> Stream m a -> Stream m a -> m (SVar m a)
-newStreamVar2 style m1 m2 = do
+-- | Create a new SVar and enqueue one stream computation on it.
+{-# INLINABLE newStreamVarAhead #-}
+newStreamVarAhead :: MonadAsync m => Stream m a -> m (SVar m a)
+newStreamVarAhead m = do
+ sv <- newEmptySVar AheadVar
-- Note: We must have all the work on the queue before sending the
-- pushworker, otherwise the pushworker may exit before we even get a
-- chance to push.
- sv <- liftIO $
- case style of
- SVarStyle _ FIFO -> do
- c <- getFifoSVar style
- (enqueue c) m1 >> (enqueue c) m2
- return c
- SVarStyle _ LIFO -> do
- c <- getLifoSVar style
- (enqueue c) m2 >> (enqueue c) m1
- return c
- SVarStyle _ Par -> undefined
+ liftIO $ enqueueAhead sv (workQueue sv) m
pushWorker sv
return sv
@@ -694,9 +1299,9 @@ toStreamVar sv m = do
-- execution. When we are using parallel composition, an SVar is passed around
-- as a state variable. We try to schedule a new parallel computation on the
-- SVar passed to us. The first time, when no SVar exists, a new SVar is
--- created. Subsequently, 'joinStreamVar2' may get called when a computation
+-- created. Subsequently, 'joinStreamVarAsync' may get called when a computation
-- already scheduled on the SVar is further evaluated. For example, when (a
--- `parallel` b) is evaluated it calls a 'joinStreamVar2' to put 'a' and 'b' on
+-- `parallel` b) is evaluated it calls a 'joinStreamVarAsync' to put 'a' and 'b' on
-- the current scheduler queue.
--
-- The 'SVarStyle' required by the current composition context is passed as one
@@ -715,21 +1320,32 @@ toStreamVar sv m = do
-- * When the stream is switching from disjunctive composition to conjunctive
-- composition and vice-versa we create a new SVar to isolate the scheduling
-- of the two.
---
-{-# INLINE joinStreamVar2 #-}
-joinStreamVar2 :: MonadAsync m
+
+forkSVarAsync :: MonadAsync m => SVarStyle -> Stream m a -> Stream m a -> Stream m a
+forkSVarAsync style m1 m2 = Stream $ \_ stp sng yld -> do
+ sv <- newStreamVar1 style (concurrently m1 m2)
+ (runStream (fromStreamVar sv)) Nothing stp sng yld
+ where
+ concurrently ma mb = Stream $ \svr stp sng yld -> do
+ liftIO $ enqueue (fromJust svr) mb
+ (runStream ma) svr stp sng yld
+
+{-# INLINE joinStreamVarAsync #-}
+joinStreamVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
-joinStreamVar2 style m1 m2 = Stream $ \svr stp sng yld ->
+joinStreamVarAsync style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
Just sv | svarStyle sv == style ->
liftIO ((enqueue sv) m2) >> (runStream m1) svr stp sng yld
- _ -> do
- sv <- newStreamVar1 style (concurrently m1 m2)
- (runStream (fromStreamVar sv)) Nothing stp sng yld
- where
- concurrently ma mb = Stream $ \svr stp sng yld -> do
- liftIO $ (enqueue (fromJust svr)) mb
- (runStream ma) svr stp sng yld
+ _ -> runStream (forkSVarAsync style m1 m2) Nothing stp sng yld
+
+{-# NOINLINE forkSVarPar #-}
+forkSVarPar :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
+forkSVarPar m r = Stream $ \_ stp sng yld -> do
+ sv <- newEmptySVar ParallelVar
+ pushWorkerPar sv m
+ pushWorkerPar sv r
+ (runStream (fromStreamVar sv)) Nothing stp sng yld
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
@@ -738,11 +1354,34 @@ joinStreamVarPar style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
Just sv | svarStyle sv == style -> do
pushWorkerPar sv m1 >> (runStream m2) svr stp sng yld
- _ -> do
- sv <- newEmptySVar style
- pushWorkerPar sv m1
- pushWorkerPar sv m2
- (runStream (fromStreamVar sv)) Nothing stp sng yld
+ _ -> runStream (forkSVarPar m1 m2) Nothing stp sng yld
+
+forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
+forkSVarAhead m1 m2 = Stream $ \_ stp sng yld -> do
+ sv <- newStreamVarAhead (concurrently m1 m2)
+ (runStream (fromStreamVar sv)) Nothing stp sng yld
+ where
+ concurrently ma mb = Stream $ \svr stp sng yld -> do
+ liftIO $ enqueueAhead (fromJust svr) (workQueue (fromJust svr)) mb
+ (runStream ma) Nothing stp sng yld
+
+{-# INLINE ahead #-}
+ahead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
+ahead m1 m2 = Stream $ \svr stp sng yld -> do
+ case svr of
+ Just sv | svarStyle sv == AheadVar -> do
+ liftIO $ enqueueAhead sv (workQueue sv) m2
+ -- Always run the left side on a new SVar to avoid complexity in
+ -- sequencing results. This means the left side cannot further
+ -- split into more ahead computations on the same SVar.
+ (runStream m1) Nothing stp sng yld
+ _ -> runStream (forkSVarAhead m1 m2) Nothing stp sng yld
+
+-- | XXX we can implement this more efficiently with a direct implementation
+-- instead of combining streams using ahead.
+{-# INLINE consMAhead #-}
+consMAhead :: MonadAsync m => m a -> Stream m a -> Stream m a
+consMAhead m r = once m `ahead` r
------------------------------------------------------------------------------
-- Semigroup and Monoid style compositions for parallel actions
@@ -750,15 +1389,33 @@ joinStreamVarPar style m1 m2 = Stream $ \svr stp sng yld ->
{-# INLINE async #-}
async :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
-async = joinStreamVar2 (SVarStyle Disjunction LIFO)
+async = joinStreamVarAsync AsyncVar
+
+-- | XXX we can implement this more efficiently with a direct implementation
+-- instead of combining streams using async.
+{-# INLINE consMAsync #-}
+consMAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
+consMAsync m r = once m `async` r
{-# INLINE wAsync #-}
wAsync :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
-wAsync = joinStreamVar2 (SVarStyle Disjunction FIFO)
+wAsync = joinStreamVarAsync WAsyncVar
+
+-- | XXX we can implement this more efficiently with a direct implementation
+-- instead of combining streams using wAsync.
+{-# INLINE consMWAsync #-}
+consMWAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
+consMWAsync m r = once m `wAsync` r
{-# INLINE parallel #-}
parallel :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
-parallel = joinStreamVarPar (SVarStyle Disjunction Par)
+parallel = joinStreamVarPar ParallelVar
+
+-- | XXX we can implement this more efficiently with a direct implementation
+-- instead of combining streams using parallel.
+{-# INLINE consMParallel #-}
+consMParallel :: MonadAsync m => m a -> Stream m a -> Stream m a
+consMParallel m r = once m `parallel` r
-------------------------------------------------------------------------------
-- Functor instance is the same for all types
@@ -774,15 +1431,36 @@ instance Monad m => Functor (Stream m) where
-- Alternative & MonadPlus
------------------------------------------------------------------------------
-alt :: Stream m a -> Stream m a -> Stream m a
-alt m1 m2 = Stream $ \_ stp sng yld ->
+_alt :: Stream m a -> Stream m a -> Stream m a
+_alt m1 m2 = Stream $ \_ stp sng yld ->
let stop = runStream m2 Nothing stp sng yld
in runStream m1 Nothing stop sng yld
------------------------------------------------------------------------------
+-- Stream to stream function application
+------------------------------------------------------------------------------
+
+applyWith :: MonadAsync m
+ => SVarStyle -> (Stream m a -> Stream m b) -> Stream m a -> Stream m b
+applyWith style f m = Stream $ \svr stp sng yld -> do
+ sv <- newStreamVar1 style m
+ runStream (f $ fromStreamVar sv) svr stp sng yld
+
+------------------------------------------------------------------------------
+-- Stream runner function application
+------------------------------------------------------------------------------
+
+runWith :: MonadAsync m
+ => SVarStyle -> (Stream m a -> m b) -> Stream m a -> m b
+runWith style f m = do
+ sv <- newStreamVar1 style m
+ f $ fromStreamVar sv
+
+------------------------------------------------------------------------------
-- Zipping
------------------------------------------------------------------------------
+{-# INLINE zipWith #-}
zipWith :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith f m1 m2 = go m1 m2
where
@@ -795,10 +1473,7 @@ zipWith f m1 m2 = go m1 m2
yield1 a ra = merge a ra
(runStream mx) Nothing stp single1 yield1
-mkAsync :: MonadAsync m => Stream m a -> m (Stream m a)
-mkAsync m = newStreamVar1 (SVarStyle Disjunction LIFO) m
- >>= return . fromStreamVar
-
+{-# INLINE zipAsyncWith #-}
zipAsyncWith :: MonadAsync m
=> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWith f m1 m2 = Stream $ \_ stp sng yld -> do
@@ -806,27 +1481,15 @@ zipAsyncWith f m1 m2 = Stream $ \_ stp sng yld -> do
mb <- mkAsync m2
(runStream (zipWith f ma mb)) Nothing stp sng yld
+ where
+
+ mkAsync :: MonadAsync m => Stream m a -> m (Stream m a)
+ mkAsync m = newStreamVar1 AsyncVar m
+ >>= return . fromStreamVar
+
-------------------------------------------------------------------------------
-- Transformers
-------------------------------------------------------------------------------
instance MonadTrans Stream where
lift = once
-
-withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
-withLocal f m =
- Stream $ \_ stp sng yld ->
- let single = local f . sng
- yield a r = local f $ yld a (withLocal f r)
- in (runStream m) Nothing (local f stp) single yield
-
--- XXX handle and test cross thread state transfer
-withCatchError
- :: MonadError e m
- => Stream m a -> (e -> Stream m a) -> Stream m a
-withCatchError m h =
- Stream $ \_ stp sng yld ->
- let run x = runStream x Nothing stp sng yield
- handle r = r `catchError` \e -> run $ h e
- yield a r = yld a (withCatchError r h)
- in handle $ run m
diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs
index f791ec1..247609c 100644
--- a/src/Streamly/Prelude.hs
+++ b/src/Streamly/Prelude.hs
@@ -26,6 +26,10 @@
-- provided for convenience and for consistency with other pure APIs in the
-- @base@ package.
--
+-- Functions having a 'MonadAsync' constraint work concurrently when used with
+-- an appropriate stream type combinator. Please be careful not to use
+-- 'parallely' with infinite streams.
+--
-- Deconstruction and folds accept a 'SerialT' type instead of a polymorphic
-- type to ensure that streams always have a concrete monomorphic type by
-- default, reducing type errors. In case you want to use any other type of
@@ -42,7 +46,7 @@ module Streamly.Prelude
, cons
, (.:)
- -- * General Unfold
+ -- * Generation by Unfolding
, unfoldr
, unfoldrM
@@ -55,11 +59,12 @@ module Streamly.Prelude
, iterate
, iterateM
, fromFoldable
+ , fromFoldableM
-- * Deconstruction
, uncons
- -- * Folding
+ -- * Elimination by Folding
-- ** General Folds
, foldr
, foldrM
@@ -101,6 +106,8 @@ module Streamly.Prelude
-- * Mapping
, mapM
+ , mapMaybe
+ , mapMaybeM
, sequence
-- * Zipping
@@ -124,6 +131,7 @@ where
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Semigroup (Semigroup(..))
+import Data.Maybe (isJust, fromJust)
import Prelude hiding (filter, drop, dropWhile, take,
takeWhile, zipWith, foldr, foldl,
mapM, mapM_, sequence, all, any,
@@ -142,9 +150,24 @@ import Streamly.Streams
-- Construction
------------------------------------------------------------------------------
--- | Build a Stream by unfolding pure steps starting from a seed.
+-- | Build a stream by unfolding a /pure/ step function starting from a seed.
+-- The step function returns the next element in the stream and the next seed
+-- value. When it is done it returns 'Nothing' and the stream ends. For
+-- example,
+--
+-- @
+-- let f b =
+-- if b > 3
+-- then Nothing
+-- else Just (b, b + 1)
+-- in toList $ unfoldr f 0
+-- @
+-- @
+-- [0,1,2,3]
+-- @
--
-- @since 0.1.0
+{-# INLINE unfoldr #-}
unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a
unfoldr step = fromStream . go
where
@@ -153,25 +176,69 @@ unfoldr step = fromStream . go
Nothing -> stp
Just (a, b) -> yld a (go b)
--- | Build a Stream by unfolding monadic steps starting from a seed.
+-- | Build a stream by unfolding a /monadic/ step function starting from a
+-- seed. The step function returns the next element in the stream and the next
+-- seed value. When it is done it returns 'Nothing' and the stream ends. For
+-- example,
--
--- @since 0.1.0
-unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
-unfoldrM step = fromStream . go
+-- @
+-- let f b =
+-- if b > 3
+-- then return Nothing
+-- else print b >> return (Just (b, b + 1))
+-- in runStream $ unfoldrM f 0
+-- @
+-- @
+-- 0
+-- 1
+-- 2
+-- 3
+-- @
+-- When run concurrently, the next unfold step can run concurrently with the
+-- processing of the output of the previous step. Note that more than one step
+-- cannot run concurrently as the next step depends on the output of the
+-- previous step.
+--
+-- @
+-- (asyncly $ S.unfoldrM (\\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
+-- & S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
+-- @
+--
+-- /Concurrent/
+--
+-- /Since: 0.1.0/
+{-# INLINE unfoldrM #-}
+unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM step = go
where
- go s = Stream $ \_ stp _ yld -> do
+ go s = fromStream $ Stream $ \svr stp sng yld -> do
mayb <- step s
case mayb of
Nothing -> stp
- Just (a, b) -> yld a (go b)
+ Just (a, b) ->
+ S.runStream (toStream (return a |: go b)) svr stp sng yld
--- | Construct a stream from a 'Foldable' container.
+-- | Construct a stream from a 'Foldable' containing pure values.
--
-- @since 0.2.0
{-# INLINE fromFoldable #-}
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
fromFoldable = Prelude.foldr cons nil
+-- | Construct a stream from a 'Foldable' containing monadic actions.
+--
+-- @
+-- runStream $ serially $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)
+-- runStream $ asyncly $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)
+-- @
+--
+-- /Concurrent (do not use with 'parallely' on infinite containers)/
+--
+-- @since 0.3.0
+{-# INLINE fromFoldableM #-}
+fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
+fromFoldableM = Prelude.foldr consM nil
+
-- | Same as 'fromFoldable'.
--
-- @since 0.1.0
@@ -195,23 +262,33 @@ once = fromStream . S.once
-- | Generate a stream by performing a monadic action @n@ times.
--
+--
+-- @
+-- runStream $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
+-- runStream $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
+-- @
+--
+-- /Concurrent/
+--
-- @since 0.1.1
-replicateM :: (IsStream t, Monad m) => Int -> m a -> t m a
-replicateM n m = fromStream $ go n
+replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
+replicateM n m = go n
where
- go cnt = Stream $ \_ stp _ yld ->
- if cnt <= 0
- then stp
- else m >>= \a -> yld a (go (cnt - 1))
+ go cnt = if cnt <= 0 then nil else m |: go (cnt - 1)
-- | Generate a stream by repeatedly executing a monadic action forever.
--
+-- @
+-- runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
+-- runStream $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
+-- @
+--
+-- /Concurrent, infinite (do not use with 'parallely')/
+--
-- @since 0.2.0
-repeatM :: (IsStream t, Monad m) => m a -> t m a
-repeatM = fromStream . go
- where
- go m = Stream $ \_ _ _ yld ->
- m >>= \a -> yld a (go m)
+repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
+repeatM = go
+ where go m = m |: go m
-- | Iterate a pure function from a seed value, streaming the results forever.
--
@@ -224,13 +301,28 @@ iterate step = fromStream . go
-- | Iterate a monadic function from a seed value, streaming the results
-- forever.
--
+-- When run concurrently, the next iteration can run concurrently with the
+-- processing of the previous iteration. Note that more than one iteration
+-- cannot run concurrently as the next iteration depends on the output of the
+-- previous iteration.
+--
+-- @
+-- runStream $ serially $ S.take 10 $ S.iterateM
+-- (\\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0
+--
+-- runStream $ asyncly $ S.take 10 $ S.iterateM
+-- (\\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0
+-- @
+--
+-- /Concurrent/
+--
-- @since 0.1.2
-iterateM :: (IsStream t, Monad m) => (a -> m a) -> a -> t m a
-iterateM step = fromStream . go
+iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> a -> t m a
+iterateM step = go
where
- go s = Stream $ \_ _ _ yld -> do
- a <- step s
- yld s (go a)
+ go s = fromStream $ Stream $ \svr stp sng yld -> do
+ next <- step s
+ S.runStream (toStream (return s |: go next)) svr stp sng yld
-- | Read lines from an IO Handle into a stream of Strings.
--
@@ -294,7 +386,8 @@ foldrM step acc m = go (toStream m)
-- @since 0.2.0
{-# INLINE scanx #-}
scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
-scanx step begin done m = cons (done begin) $ fromStream $ go (toStream m) begin
+scanx step begin done m =
+ cons (done begin) $ fromStream $ go (toStream m) begin
where
go m1 !acc = Stream $ \_ stp sng yld ->
let single a = sng (done $ step acc a)
@@ -385,8 +478,8 @@ foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b
foldlM' step begin m = foldxM step (return begin) return m
-- | Decompose a stream into its head and tail. If the stream is empty, returns
--- 'Nothing'. If the stream is non-empty, returns 'Just (a, ma)', where 'a' is
--- the head of the stream and 'ma' its tail.
+-- 'Nothing'. If the stream is non-empty, returns @Just (a, ma)@, where @a@ is
+-- the head of the stream and @ma@ its tail.
--
-- @since 0.1.0
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
@@ -644,21 +737,57 @@ maximum m = go Nothing (toStream m)
-- Transformation
------------------------------------------------------------------------------
--- XXX Parallel variants of these? mapMWith et al. sequenceWith.
-
-- | Replace each element of the stream with the result of a monadic action
-- applied on the element.
--
+-- @
+-- runStream $ S.replicateM 10 (return 1)
+-- & (serially . S.mapM (\\x -> threadDelay 1000000 >> print x))
+--
+-- runStream $ S.replicateM 10 (return 1)
+-- & (asyncly . S.mapM (\\x -> threadDelay 1000000 >> print x))
+-- @
+--
+-- /Concurrent (do not use with 'parallely' on infinite streams)/
+--
-- @since 0.1.0
{-# INLINE mapM #-}
-mapM :: (IsStream t, Monad m) => (a -> m b) -> t m a -> t m b
-mapM f m = fromStream $ go (toStream m)
+mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
+mapM f m = go (toStream m)
where
- go m1 = Stream $ \_ stp sng yld ->
+ go m1 = fromStream $ Stream $ \svr stp sng yld ->
let single a = f a >>= sng
- yield a r = f a >>= \b -> yld b (go r)
+ yield a r = S.runStream (toStream (f a |: (go r))) svr stp sng yld
in (S.runStream m1) Nothing stp single yield
+-- | Map a 'Maybe' returning function to a stream, filter out the 'Nothing'
+-- elements, and return a stream of values extracted from 'Just'.
+--
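+-- For example (an illustrative session, using 'fromFoldable' and 'toList'
+-- from this module):
+--
+-- @
+-- > toList $ mapMaybe (\\x -> if even x then Just (x * 10) else Nothing) $ fromFoldable [1..5]
+-- [20,40]
+-- @
+--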
+-- @since 0.3.0
+{-# INLINE mapMaybe #-}
+mapMaybe :: (IsStream t) => (a -> Maybe b) -> t m a -> t m b
+mapMaybe f m = go (toStream m)
+ where
+ go m1 = fromStream $ Stream $ \_ stp sng yld ->
+ let single a = case f a of
+ Just b -> sng b
+ Nothing -> stp
+ yield a r = case f a of
+ Just b -> yld b (toStream $ go r)
+ Nothing -> (S.runStream r) Nothing stp single yield
+ in (S.runStream m1) Nothing stp single yield
+
+-- | Like 'mapMaybe' but maps a monadic function.
+--
+-- /Concurrent (do not use with 'parallely' on infinite streams)/
+--
+-- @since 0.3.0
+{-# INLINE mapMaybeM #-}
+mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m))
+ => (a -> m (Maybe b)) -> t m a -> t m b
+mapMaybeM f = fmap fromJust . filter isJust . mapM f
+
+-- XXX this can utilize parallel mapping if we implement it as runStream . mapM
-- | Apply a monadic action to each element of the stream and discard the
-- output of the action.
--
@@ -675,13 +804,23 @@ mapM_ f m = go (toStream m)
-- | Reduce a stream of monadic actions to a stream of the output of those
-- actions.
--
+-- @
+-- runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
+-- & (serially . S.sequence)
+--
+-- runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
+-- & (asyncly . S.sequence)
+-- @
+--
+-- /Concurrent (do not use with 'parallely' on infinite streams)/
+--
-- @since 0.1.0
-sequence :: (IsStream t, Monad m) => t m (m a) -> t m a
-sequence m = fromStream $ go (toStream m)
+sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
+sequence m = go (toStream m)
where
- go m1 = Stream $ \_ stp sng yld ->
+ go m1 = fromStream $ Stream $ \svr stp sng yld ->
let single ma = ma >>= sng
- yield ma r = ma >>= \b -> yld b (go r)
+ yield ma r = S.runStream (toStream $ ma |: go r) svr stp sng yld
in (S.runStream m1) Nothing stp single yield
------------------------------------------------------------------------------
diff --git a/src/Streamly/Streams.hs b/src/Streamly/Streams.hs
index e9bd427..2281bc2 100644
--- a/src/Streamly/Streams.hs
+++ b/src/Streamly/Streams.hs
@@ -3,6 +3,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
+{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
@@ -24,19 +25,10 @@ module Streamly.Streams
, Streaming -- deprecated
, S.MonadAsync
- -- * SVars
- , SVarSched (..)
- , SVarTag (..)
- , SVarStyle (..)
- , SVar
- , S.newEmptySVar
-
-- * Construction
, nil
, cons
, (.:)
- , consM
- , (|:)
, streamBuild
, fromCallback
, fromSVar
@@ -49,10 +41,15 @@ module Streamly.Streams
-- * Transformation
, mkAsync
+ , (|$)
+ , (|&)
+ , (|$.)
+ , (|&.)
-- * Merging Streams
, serial
, wSerial
+ , ahead
, async
, wAsync
, parallel
@@ -62,6 +59,7 @@ module Streamly.Streams
-- * IO Streams
, Serial
, WSerial
+ , Ahead
, Async
, WAsync
, Parallel
@@ -73,6 +71,7 @@ module Streamly.Streams
, StreamT -- deprecated
, WSerialT
, InterleavedT -- deprecated
+ , AheadT
, AsyncT
, WAsyncT
, ParallelT
@@ -84,6 +83,7 @@ module Streamly.Streams
, serially -- deprecated
, wSerially
, interleaving -- deprecated
+ , aheadly
, asyncly
, wAsyncly
, parallely
@@ -117,15 +117,17 @@ import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Data.Semigroup (Semigroup(..))
-import Streamly.Core ( MonadAsync
- , SVar, SVarStyle(..)
- , SVarTag(..), SVarSched(..))
+import Streamly.Core ( MonadAsync , SVar,
+ SVarStyle(..))
import qualified Streamly.Core as S
------------------------------------------------------------------------------
-- Types that can behave as a Stream
------------------------------------------------------------------------------
+infixr 5 `consM`
+infixr 5 |:
+
-- | Class of types that can represent a stream of elements of some type 'a' in
-- some monad 'm'.
--
@@ -133,6 +135,42 @@ import qualified Streamly.Core as S
class IsStream t where
toStream :: t m a -> S.Stream m a
fromStream :: S.Stream m a -> t m a
+ -- | Constructs a stream by adding a monadic action at the head of an
+ -- existing stream. For example:
+ --
+ -- @
+ -- > toList $ getLine \`consM` getLine \`consM` nil
+ -- hello
+ -- world
+ -- ["hello","world"]
+ -- @
+ --
+ -- /Concurrent (do not use 'parallely' to construct infinite streams)/
+ --
+ -- @since 0.2.0
+ consM :: MonadAsync m => m a -> t m a -> t m a
+ -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
+ -- to remember that @|@ comes before ':'.
+ --
+ -- @
+ -- > toList $ getLine |: getLine |: nil
+ -- hello
+ -- world
+ -- ["hello","world"]
+ -- @
+ --
+ -- @
+ -- let delay = threadDelay 1000000 >> print 1
+ -- runStream $ serially $ delay |: delay |: delay |: nil
+ -- runStream $ parallely $ delay |: delay |: delay |: nil
+ -- @
+ --
+ -- /Concurrent (do not use 'parallely' to construct infinite streams)/
+ --
+ -- @since 0.2.0
+ (|:) :: MonadAsync m => m a -> t m a -> t m a
+ -- We can define (|:) simply as 'consM', but it is defined explicitly for
+ -- each type because we want to use the SPECIALIZE pragma on the definition.
-- | Same as 'IsStream'.
--
@@ -155,8 +193,6 @@ type Streaming = IsStream
nil :: IsStream t => t m a
nil = fromStream S.nil
-infixr 5 `consM`
-
-- | Constructs a stream by adding a monadic action at the head of an existing
-- stream. For example:
--
@@ -168,28 +204,13 @@ infixr 5 `consM`
-- @
--
-- @since 0.2.0
-consM :: (IsStream t, Monad m) => m a -> t m a -> t m a
-consM m r = fromStream $ S.consM m (toStream r)
-
-infixr 5 |:
-
--- | Operator equivalent of 'consM'.
---
--- @
--- > toList $ getLine |: getLine |: nil
--- hello
--- world
--- ["hello","world"]
--- @
---
--- @since 0.2.0
-(|:) :: (IsStream t, Monad m) => m a -> t m a -> t m a
-(|:) = consM
+consMSerial :: (IsStream t, Monad m) => m a -> t m a -> t m a
+consMSerial m r = fromStream $ S.consM m (toStream r)
infixr 5 `cons`
-- | Construct a stream by adding a pure value at the head of an existing
--- stream. Same as @consM . return@. For example:
+-- stream. For pure values it can be faster than 'consM'. For example:
--
-- @
-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
@@ -296,9 +317,95 @@ toSVar sv m = S.toStreamVar sv (toStream m)
-- @since 0.2.0
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkAsync m = do
- sv <- S.newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m)
+ sv <- S.newStreamVar1 AsyncVar (toStream m)
return $ fromSVar sv
+{-# INLINE applyWith #-}
+applyWith :: (IsStream t, MonadAsync m)
+ => SVarStyle -> (t m a -> t m b) -> t m a -> t m b
+applyWith style f x = fromStream $
+ S.applyWith style (toStream . f . fromStream) (toStream x)
+
+{-# INLINE runWith #-}
+runWith :: (IsStream t, MonadAsync m)
+ => SVarStyle -> (t m a -> m b) -> t m a -> m b
+runWith style f x = S.runWith style (f . fromStream) (toStream x)
+
+infixr 0 |$
+infixr 0 |$.
+
+infixl 1 |&
+infixl 1 |&.
+
+-- | Parallel function application operator for streams; just like the regular
+-- function application operator '$' except that it is concurrent. The
+-- following code prints a value every second even though each stage adds a 1
+-- second delay.
+--
+--
+-- @
+-- runStream $
+-- S.mapM (\\x -> threadDelay 1000000 >> print x)
+-- |$ S.repeatM (threadDelay 1000000 >> return 1)
+-- @
+--
+-- /Concurrent/
+--
+-- @since 0.3.0
+{-# INLINE (|$) #-}
+(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
+f |$ x = applyWith ParallelVar f x
+
+-- | Parallel reverse function application operator for streams; just like the
+-- regular reverse function application operator '&' except that it is
+-- concurrent.
+--
+-- @
+-- runStream $
+-- S.repeatM (threadDelay 1000000 >> return 1)
+-- |& S.mapM (\\x -> threadDelay 1000000 >> print x)
+-- @
+--
+-- /Concurrent/
+--
+-- @since 0.3.0
+{-# INLINE (|&) #-}
+(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
+x |& f = f |$ x
+
+-- | Parallel function application operator; applies a @run@ or @fold@ function
+-- to a stream such that the fold consumer and the stream producer run in
+-- parallel. A @run@ or @fold@ function reduces the stream to a value in the
+-- underlying monad. The @.@ at the end of the operator is a mnemonic for
+-- termination of the stream.
+--
+-- @
+-- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
+-- |$. S.repeatM (threadDelay 1000000 >> return 1)
+-- @
+--
+-- /Concurrent/
+--
+-- @since 0.3.0
+{-# INLINE (|$.) #-}
+(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
+f |$. x = runWith ParallelVar f x
+
+-- | Parallel reverse function application operator for applying a run or fold
+-- function to a stream; just like '|$.' except that the operands are reversed.
+--
+-- @
+-- S.repeatM (threadDelay 1000000 >> return 1)
+-- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
+-- @
+--
+-- /Concurrent/
+--
+-- @since 0.3.0
+{-# INLINE (|&.) #-}
+(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
+x |&. f = f |$. x
+
------------------------------------------------------------------------------
-- CPP macros for common instances
------------------------------------------------------------------------------
@@ -306,6 +413,26 @@ mkAsync m = do
-- XXX use template haskell instead and include Monoid and IsStream instances
-- as well.
+withLocal :: MonadReader r m => (r -> r) -> S.Stream m a -> S.Stream m a
+withLocal f m =
+ S.Stream $ \_ stp sng yld ->
+ let single = local f . sng
+ yield a r = local f $ yld a (withLocal f r)
+ in (S.runStream m) Nothing (local f stp) single yield
+
+{-
+-- XXX handle and test cross thread state transfer
+withCatchError
+ :: MonadError e m
+ => S.Stream m a -> (e -> S.Stream m a) -> S.Stream m a
+withCatchError m h =
+ S.Stream $ \_ stp sng yld ->
+ let run x = S.runStream x Nothing stp sng yield
+ handle r = r `catchError` \e -> run $ h e
+ yield a r = yld a (withCatchError r h)
+ in handle $ run m
+-}
+
#define MONADPARALLEL , MonadAsync m
#define MONAD_APPLICATIVE_INSTANCE(STREAM,CONSTRAINT) \
@@ -327,12 +454,12 @@ instance (MonadThrow m CONSTRAINT) => MonadThrow (STREAM m) where { \
instance (MonadError e m CONSTRAINT) => MonadError e (STREAM m) where { \
throwError = lift . throwError; \
catchError m h = \
- fromStream $ S.withCatchError (toStream m) (\e -> toStream $ h e) }; \
+ fromStream $ withCatchError (toStream m) (\e -> toStream $ h e) }; \
-} \
\
instance (MonadReader r m CONSTRAINT) => MonadReader r (STREAM m) where { \
ask = lift ask; \
- local f m = fromStream $ S.withLocal f (toStream m) }; \
+ local f m = fromStream $ withLocal f (toStream m) }; \
\
instance (MonadState s m CONSTRAINT) => MonadState s (STREAM m) where { \
get = lift get; \
@@ -349,6 +476,9 @@ instance (MonadState s m CONSTRAINT) => MonadState s (STREAM m) where { \
-- all elements from the second stream.
--
-- @
+-- import Streamly
+-- import qualified "Streamly.Prelude" as S
+--
-- main = ('toList' . 'serially' $ (fromFoldable [1,2]) \<\> (fromFoldable [3,4])) >>= print
-- @
-- @
@@ -361,7 +491,7 @@ instance (MonadState s m CONSTRAINT) => MonadState s (STREAM m) where { \
-- @
-- main = 'runStream' . 'serially' $ do
-- x <- return 1 \<\> return 2
--- liftIO $ print x
+-- S.once $ print x
-- @
-- @
-- 1
@@ -374,7 +504,7 @@ instance (MonadState s m CONSTRAINT) => MonadState s (STREAM m) where { \
-- main = 'runStream' . 'serially' $ do
-- x <- return 1 \<\> return 2
-- y <- return 3 \<\> return 4
--- liftIO $ print (x, y)
+-- S.once $ print (x, y)
-- @
-- @
-- (1,3)
@@ -408,6 +538,16 @@ instance IsStream SerialT where
toStream = getSerialT
fromStream = SerialT
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> SerialT IO a -> SerialT IO a #-}
+ consM :: Monad m => m a -> SerialT m a -> SerialT m a
+ consM = consMSerial
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> SerialT IO a -> SerialT IO a #-}
+ (|:) :: Monad m => m a -> SerialT m a -> SerialT m a
+ (|:) = consMSerial
+
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
@@ -450,6 +590,9 @@ MONAD_COMMON_INSTANCES(SerialT,)
-- two streams, yielding one element from each stream alternately.
--
-- @
+-- import Streamly
+-- import qualified "Streamly.Prelude" as S
+--
-- main = ('toList' . 'wSerially' $ (fromFoldable [1,2]) \<\> (fromFoldable [3,4])) >>= print
-- @
-- @
@@ -464,7 +607,7 @@ MONAD_COMMON_INSTANCES(SerialT,)
-- main = 'runStream' . 'wSerially' $ do
-- x <- return 1 \<\> return 2
-- y <- return 3 \<\> return 4
--- liftIO $ print (x, y)
+-- S.once $ print (x, y)
-- @
-- @
-- (1,3)
@@ -490,6 +633,16 @@ instance IsStream WSerialT where
toStream = getWSerialT
fromStream = WSerialT
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
+ consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
+ consM = consMSerial
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
+ (|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
+ (|:) = consMSerial
+
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
@@ -543,6 +696,122 @@ MONAD_APPLICATIVE_INSTANCE(WSerialT,)
MONAD_COMMON_INSTANCES(WSerialT,)
------------------------------------------------------------------------------
+-- AheadT
+------------------------------------------------------------------------------
+
+-- | Deep ahead composition or ahead composition with depth first traversal.
+-- The semigroup composition of 'AheadT' appends streams in a depth first
+-- manner just like 'SerialT' except that it can produce elements concurrently
+-- ahead of time. It is like 'AsyncT' except that 'AsyncT' produces the output
+-- as it arrives whereas 'AheadT' orders the output in the traversal order.
+--
+-- @
+-- main = ('toList' . 'aheadly' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print
+-- @
+-- @
+-- [1,2,3,4]
+-- @
+--
+-- Any exceptions generated by a constituent stream are propagated to the
+-- output stream.
+--
+-- Similarly, the monad instance of 'AheadT' may run each iteration
+-- concurrently ahead of time but presents the results in the same order as
+-- 'SerialT'.
+--
+-- @
+-- import "Streamly"
+-- import qualified "Streamly.Prelude" as S
+-- import Control.Concurrent
+--
+-- main = 'runStream' . 'aheadly' $ do
+-- n <- return 3 \<\> return 2 \<\> return 1
+-- S.once $ do
+-- threadDelay (n * 1000000)
+-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
+-- @
+-- @
+-- ThreadId 40: Delay 1
+-- ThreadId 39: Delay 2
+-- ThreadId 38: Delay 3
+-- @
+--
+-- All iterations may run in the same thread if they do not block.
+--
+-- Note that ahead composition with depth first traversal can be used to
+-- combine an infinite number of streams as it explores only a bounded number of
+-- streams at a time.
+--
+-- @since 0.3.0
+newtype AheadT m a = AheadT {getAheadT :: S.Stream m a}
+ deriving (Functor, MonadTrans)
+
+instance IsStream AheadT where
+ toStream = getAheadT
+ fromStream = AheadT
+
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
+ consM m r = fromStream $ S.consMAhead m (toStream r)
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> AheadT IO a -> AheadT IO a #-}
+ (|:) = consM
+
+------------------------------------------------------------------------------
+-- Semigroup
+------------------------------------------------------------------------------
+
+-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AheadT'.
+-- Merges two streams sequentially but with concurrent lookahead.
+--
+-- @since 0.3.0
+{-# INLINE ahead #-}
+ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
+ahead m1 m2 = fromStream $ S.ahead (toStream m1) (toStream m2)
+
+instance MonadAsync m => Semigroup (AheadT m a) where
+ (<>) = ahead
+
+------------------------------------------------------------------------------
+-- Monoid
+------------------------------------------------------------------------------
+
+instance MonadAsync m => Monoid (AheadT m a) where
+ mempty = nil
+ mappend = (<>)
+
+------------------------------------------------------------------------------
+-- Monad
+------------------------------------------------------------------------------
+
+{-# INLINE aheadbind #-}
+aheadbind
+ :: MonadAsync m
+ => S.Stream m a
+ -> (a -> S.Stream m b)
+ -> S.Stream m b
+aheadbind m f = go m
+ where
+ go (S.Stream g) =
+ S.Stream $ \ctx stp sng yld ->
+ let run x = (S.runStream x) ctx stp sng yld
+ single a = run $ f a
+ yield a r = run $ f a `S.ahead` go r
+ in g Nothing stp single yield
+
+instance MonadAsync m => Monad (AheadT m) where
+ return = pure
+ (AheadT m) >>= f = AheadT $ aheadbind m (getAheadT . f)
+
+------------------------------------------------------------------------------
+-- Other instances
+------------------------------------------------------------------------------
+
+MONAD_APPLICATIVE_INSTANCE(AheadT,MONADPARALLEL)
+MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)
+
+------------------------------------------------------------------------------
-- AsyncT
------------------------------------------------------------------------------
@@ -573,11 +842,12 @@ MONAD_COMMON_INSTANCES(WSerialT,)
--
-- @
-- import "Streamly"
+-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = 'runStream' . 'asyncly' $ do
-- n <- return 3 \<\> return 2 \<\> return 1
--- liftIO $ do
+-- S.once $ do
-- threadDelay (n * 1000000)
-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
@@ -601,6 +871,14 @@ instance IsStream AsyncT where
toStream = getAsyncT
fromStream = AsyncT
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> AsyncT IO a -> AsyncT IO a #-}
+ consM m r = fromStream $ S.consMAsync m (toStream r)
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> AsyncT IO a -> AsyncT IO a #-}
+ (|:) = consM
+
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
@@ -614,9 +892,6 @@ instance IsStream AsyncT where
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
async m1 m2 = fromStream $ S.async (toStream m1) (toStream m2)
-instance MonadAsync m => Semigroup (AsyncT m a) where
- (<>) = async
-
-- | Same as 'async'.
--
-- @since 0.1.0
@@ -625,6 +900,9 @@ instance MonadAsync m => Semigroup (AsyncT m a) where
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
(<|) = async
+instance MonadAsync m => Semigroup (AsyncT m a) where
+ (<>) = async
+
------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------
@@ -691,11 +969,12 @@ MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
--
-- @
-- import "Streamly"
+-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = 'runStream' . 'wAsyncly' $ do
-- n <- return 3 \<\> return 2 \<\> return 1
--- liftIO $ do
+-- S.once $ do
-- threadDelay (n * 1000000)
-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
@@ -720,6 +999,14 @@ instance IsStream WAsyncT where
toStream = getWAsyncT
fromStream = WAsyncT
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
+ consM m r = fromStream $ S.consMWAsync m (toStream r)
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
+ (|:) = consM
+
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
@@ -829,11 +1116,12 @@ MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)
--
-- @
-- import "Streamly"
+-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = 'runStream' . 'parallely' $ do
-- n <- return 3 \<\> return 2 \<\> return 1
--- liftIO $ do
+-- S.once $ do
-- threadDelay (n * 1000000)
-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
@@ -854,6 +1142,14 @@ instance IsStream ParallelT where
toStream = getParallelT
fromStream = ParallelT
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
+ consM m r = fromStream $ S.consMParallel m (toStream r)
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> ParallelT IO a -> ParallelT IO a #-}
+ (|:) = consM
+
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
@@ -926,6 +1222,16 @@ instance IsStream ZipSerialM where
toStream = getZipSerialM
fromStream = ZipSerialM
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
+ consM :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
+ consM = consMSerial
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
+ (|:) :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
+ (|:) = consMSerial
+
instance Monad m => Applicative (ZipSerialM m) where
pure = ZipSerialM . S.repeat
m1 <*> m2 = fromStream $ S.zipWith id (toStream m1) (toStream m2)
@@ -958,6 +1264,16 @@ instance IsStream ZipAsyncM where
toStream = getZipAsyncM
fromStream = ZipAsyncM
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
+ consM :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
+ consM = consMSerial
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
+ (|:) :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
+ (|:) = consMSerial
+
instance MonadAsync m => Applicative (ZipAsyncM m) where
pure = ZipAsyncM . S.repeat
m1 <*> m2 = fromStream $ S.zipAsyncWith id (toStream m1) (toStream m2)
@@ -991,6 +1307,12 @@ wSerially = adapt
interleaving :: IsStream t => WSerialT m a -> t m a
interleaving = wSerially
+-- | Fix the type of a polymorphic stream as 'AheadT'.
+--
+-- @since 0.3.0
+aheadly :: IsStream t => AheadT m a -> t m a
+aheadly = adapt
+
-- | Fix the type of a polymorphic stream as 'AsyncT'.
--
-- @since 0.1.0
@@ -1097,6 +1419,12 @@ type Serial a = SerialT IO a
-- @since 0.2.0
type WSerial a = WSerialT IO a
+-- | A serial IO stream of elements of type @a@ with concurrent lookahead. See
+-- 'AheadT' documentation for more details.
+--
+-- @since 0.3.0
+type Ahead a = AheadT IO a
+
-- | A demand driven left biased parallely composing IO stream of elements of
-- type @a@. See 'AsyncT' documentation for more details.
--
diff --git a/src/Streamly/Tutorial.hs b/src/Streamly/Tutorial.hs
index 5b71a13..a56fad6 100644
--- a/src/Streamly/Tutorial.hs
+++ b/src/Streamly/Tutorial.hs
@@ -37,7 +37,10 @@ module Streamly.Tutorial
-- * Streams
-- $streams
- -- * Flavors of Streams
+ -- * Concurrent Streams
+ -- $concurrentStreams
+
+ -- * Combining Streams
-- $flavors
-- * Imports and Supporting Code
@@ -46,12 +49,21 @@ module Streamly.Tutorial
-- * Generating Streams
-- $generating
+ -- * Generating Streams Concurrently
+ -- $generatingConcurrently
+
-- * Eliminating Streams
-- $eliminating
+ -- * Concurrent Pipeline Stages
+ -- $concurrentApplication
+
-- * Transforming Streams
-- $transformation
+ -- * Mapping Concurrently
+ -- $concurrentTransformation
+
-- * Merging Streams
-- ** Semigroup Style
@@ -63,6 +75,9 @@ module Streamly.Tutorial
-- *** Wide Serial Composition ('WSerial')
-- $interleaved
+ -- *** Deep Ahead Composition ('Ahead')
+ -- $ahead
+
-- *** Deep Asynchronous Composition ('Async')
-- $async
@@ -90,6 +105,9 @@ module Streamly.Tutorial
-- *** Wide Serial Nesting ('WSerial')
-- $interleavedNesting
+ -- *** Deep Ahead Nesting ('Ahead')
+ -- $aheadNesting
+
-- *** Deep Asynchronous Nesting ('Async')
-- $concurrentNesting
@@ -126,6 +144,9 @@ module Streamly.Tutorial
-- * Reactive Programming
-- $reactive
+ -- * Writing Concurrent Programs
+ -- $programs
+
-- * Performance
-- $performance
@@ -171,51 +192,89 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
--
-- Similarly, the 'Serial' type is almost a drop in replacement for pure lists,
-- pure lists are a special case of monadic streams. If you use 'nil' in place
--- of '[]' and '|:' in place ':' you can replace a list with a 'Serial' The
--- only difference is that the elements must be monadic type and to operate on
--- the streams we must use the corresponding functions from "Streamly.Prelude"
--- instead of using the base "Prelude".
+-- of '[]' and '|:' in place of ':' you can replace a list with a 'Serial' stream.
+-- The only difference is that the elements must be of a monadic type and to operate
+-- on the streams we must use the corresponding functions from
+-- "Streamly.Prelude" instead of using the base "Prelude".
+
+-- $concurrentStreams
+--
+-- Streams can be generated, merged, mapped, and zipped concurrently; even
+-- infinite streams can be generated concurrently using controlled concurrency.
+-- Multiple stages in a streaming pipeline can run concurrently, and in a
+-- monadic composition streams combine like a list transformer, providing
+-- concurrent non-determinism.
+--
+-- There are three basic concurrent stream styles, 'Ahead', 'Async', and
+-- 'Parallel'. The 'Ahead' style streams are similar to 'Serial' except that
+-- they can speculatively execute multiple stream actions concurrently in
+-- advance. 'Ahead' would return exactly the same stream as 'Serial' except
+-- that it may execute the actions concurrently. The 'Async' style streams,
+-- like 'Ahead', speculatively execute multiple stream actions in advance but
+-- return the results in their finishing order rather than in the stream
+-- traversal order. 'Parallel' is like 'Async' except that it provides
+-- unbounded parallelism instead of controlled parallelism.
+--
+-- For easy reference, we can classify the stream types based on /execution order/,
+-- /consumption order/, and /bounded or unbounded/ concurrency.
+-- Execution could be serial (i.e. synchronous) or asynchronous. In serial
+-- execution we execute the next action in the stream only after the previous
+-- one has finished executing. In asynchronous execution multiple actions in
+-- the stream can be executed asynchronously i.e. the next action can start
+-- executing even before the previous one has finished. Consumption order
+-- determines the order in which the outputs generated by the composition are
+-- consumed. Consumption could be serial or asynchronous. In serial
+-- consumption, the outputs are consumed in the traversal order; in
+-- asynchronous consumption the outputs are consumed as they arrive, i.e. in
+-- first come first served order.
+--
+-- @
+-- +------------+--------------+--------------+--------------+
+-- | Type | Execution | Consumption | Concurrency |
+-- +============+==============+==============+==============+
+-- | 'Serial' | Serial | Serial | None |
+-- +------------+--------------+--------------+--------------+
+-- | 'Ahead' | Asynchronous | Serial | bounded |
+-- +------------+--------------+--------------+--------------+
+-- | 'Async' | Asynchronous | Asynchronous | bounded |
+-- +------------+--------------+--------------+--------------+
+-- | 'Parallel' | Asynchronous | Asynchronous | unbounded |
+-- +------------+--------------+--------------+--------------+
+-- @
+--
+-- All these types can be freely inter-converted using type conversion
+-- combinators or type annotations without any cost, to achieve the desired
+-- composition style. To force a particular type of composition we coerce the
+-- stream type using the corresponding type adapting combinator from
+-- 'serially', 'aheadly', 'asyncly', or 'parallely'. The default stream type
+-- is inferred as 'Serial' unless you change it by using one of the combinators
+-- or using a type annotation.
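+--
+-- As a minimal illustrative sketch (the delay helper @p@ below is not part of
+-- the library), the same stream expression can be run under different type
+-- combinators to observe the behavior summarized in the table above:
+--
+-- @
+-- import Streamly
+-- import Streamly.Prelude (nil, (|:))
+-- import qualified Streamly.Prelude as S
+-- import Control.Concurrent (threadDelay)
+--
+-- main = do
+--     let p n = threadDelay (n * 1000000) >> return n
+--     -- serial: runs in about 6 seconds, results in traversal order
+--     S.toList (serially  $ p 3 |: p 2 |: p 1 |: nil) >>= print
+--     -- ahead: actions may run concurrently, results still in traversal order
+--     S.toList (aheadly   $ p 3 |: p 2 |: p 1 |: nil) >>= print
+--     -- async\/parallel: actions may run concurrently, results typically in
+--     -- finishing order
+--     S.toList (asyncly   $ p 3 |: p 2 |: p 1 |: nil) >>= print
+--     S.toList (parallely $ p 3 |: p 2 |: p 1 |: nil) >>= print
+-- @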
-- $flavors
--
--- There are a few more types similar to 'Serial' that all represent streams
--- and differ only in the 'Semigroup', 'Applicative' and 'Monad' compositions
--- of the streams.
+-- Streams can be combined using semigroup or monoid composition to form a
+-- composite stream. Traversal of a composition of streams could be @deep@ or
+-- @wide@. Deep goes depth first i.e. each stream is traversed fully before
+-- we traverse the next stream. Wide goes breadth first i.e. one element from
+-- each stream is traversed before coming back to the first stream again.
+--
+-- Each stream type has a wide traversal variant prefixed by 'W'. The wide
+-- variant differs only in the Semigroup\/Monoid, Applicative\/Monad
+-- compositions of the streams.
+-- The following table summarizes the basic types and the corresponding wide
+-- variants:
--
--- The composition of two or more streams is distinguished based on three
--- characterstics, /traversal order/, /execution order/ and
--- /consumption order/. Traversal of a composition of streams could be @deep@
--- or @wide@. Deep goes depth first i.e. each stream is traversed fully
--- before we traverse the next stream. Wide goes breadth first i.e. one element
--- from each stream is traversed before coming back to the first stream again.
--- Execution could be serial (i.e. synchronous) or asynchronous. In serial
--- execution we execute an action in the next stream only after the first has
--- finished executing. In asynchronous execution actions in both streams can be
--- executed asynchronously i.e. the next action can start executing even before
--- the first one has finished. The third parameter is consumption order that is
--- in what order the output generated by the composition is consumed.
--- Consumption could be serial or asynchronous. In serial consumption, the
--- outputs are consumed in the traversal order, in asynchronous consumption the
--- outputs are consumed as they arrive i.e. first come first serve order.
---
--- The following table summarizes different styles of streams based on how they
--- compose. All these types are monads and they differ in 'Semigroup',
--- 'Applicative' and 'Monad' compositions:
---
--- @
--- +------------+-----------+--------------+--------------+
--- | Type | Traversal | Execution | Consumption |
--- +============+===========+==============+==============+
--- | 'Serial' | Deep | Serial | Serial |
--- +------------+-----------+--------------+--------------+
--- | 'WSerial' | Wide | Serial | Serial |
--- +------------+-----------+--------------+--------------+
--- | 'Async' | Deep | Asynchronous | Asynchronous |
--- +------------+-----------+--------------+--------------+
--- | 'WAsync' | Wide | Asynchronous | Asynchronous |
--- +------------+-----------+--------------+--------------+
--- | 'Parallel' | Parallel | Asynchronous | Asynchronous |
--- +------------+-----------+--------------+--------------+
+-- @
+-- +------------+-----------+
+-- | Deep | Wide |
+-- +============+===========+
+-- | 'Serial' | 'WSerial' |
+-- +------------+-----------+
+-- | 'Ahead' | 'WAhead' |
+-- +------------+-----------+
+-- | 'Async' | 'WAsync' |
+-- +------------+-----------+
-- @
--
-- Other than these types there are also 'ZipSerial' and 'ZipAsync' types that
@@ -223,13 +282,24 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- types are not monads they are only applicatives and they do not differ in
-- 'Semigroup' composition.
--
--- All these types can be freely inter-converted using type conversion
--- combinators or type annotations without any cost, to acheive the desired
--- composition style. To force a particular type of composition we coerce the
--- stream type using the corresponding type adapting combinator from
--- 'serially', 'wSerially', 'asyncly', 'wAsyncly', 'parallely', 'zipSerially'
--- or 'zipAsyncly'. The default stream type is inferred as 'Serial' unless you
--- change it by using one of the combinators or using a type annotation.
+
+-- $programs
+--
+-- When writing concurrent programs it is advisable not to use the concurrent
+-- style stream combinators blindly at the top level. That might create too
+-- much concurrency where it is not even required and can even degrade
+-- performance in some cases. It can also lead to surprising behavior because
+-- code that is supposed to be serial becomes concurrent. Be aware that all
+-- concurrency capable APIs used under the scope of a concurrent stream
+-- combinator will become concurrent. For example, if you have a 'repeatM'
+-- somewhere in your program and you use 'parallely' on top, the 'repeatM'
+-- becomes fully parallel, resulting in an infinite parallel execution.
+-- Instead, follow the /Keep It Serial and Stupid/ principle: start with the
+-- default serial composition and enable concurrent combinators only when and
+-- where necessary.
+-- When you use a concurrent combinator you can use an explicit 'serially'
+-- combinator to suppress any unnecessary concurrency under the scope of that
+-- combinator.
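+--
+-- As an illustrative sketch, the outer loop below runs under 'parallely'
+-- while an explicit 'serially' keeps the inner, otherwise unbounded,
+-- generation serial:
+--
+-- @
+-- import Streamly
+-- import qualified Streamly.Prelude as S
+--
+-- main = runStream . parallely $ do
+--     x <- S.fromFoldable [1, 2, 3]
+--     -- 'serially' suppresses concurrency for the inner 'repeatM' even
+--     -- though the enclosing composition is 'parallely'
+--     y <- serially $ S.take 2 $ S.repeatM (return (x * 10))
+--     S.once $ print (x, y)
+-- @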
-- $monadtransformers
--
@@ -308,6 +378,31 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
--
-- For more ways to construct a stream see the module "Streamly.Prelude".
+-- $generatingConcurrently
+--
+-- Monadic construction and generation functions, e.g. 'consM', 'unfoldrM',
+-- 'replicateM', 'repeatM', 'iterateM' and 'fromFoldableM', work concurrently
+-- when used with an appropriate stream type combinator. The pure versions of
+-- these APIs are not concurrent; however, you can use the monadic versions
+-- even for pure computations, wrapping the pure value in a monad, to get the
+-- concurrent generation capability where required.
+--
+-- The following code finishes in 3 seconds (6 seconds when serial):
+--
+-- @
+-- > let p n = threadDelay (n * 1000000) >> return n
+-- > S.'toList' $ 'parallely' $ p 3 |: p 2 |: p 1 |: S.'nil'
+-- [1,2,3]
+-- > S.'toList' $ 'aheadly' $ p 3 |: p 2 |: p 1 |: S.'nil'
+-- [3,2,1]
+-- @
+-- The following finishes in 10 seconds (100 seconds when serial):
+--
+-- @
+-- > 'runStream' $ 'asyncly' $ S.'replicateM' 10 $ p 10
+-- @
+--
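+-- Similarly, a sketch using 'fromFoldableM' (reusing @p@ from above); with
+-- 'aheadly' the actions may run concurrently while the container order is
+-- preserved:
+--
+-- @
+-- > S.'toList' $ 'aheadly' $ S.'fromFoldableM' [p 3, p 2, p 1]
+-- [3,2,1]
+-- @
+--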
+
-- $eliminating
--
-- We have already seen 'runStream' and 'toList' to eliminate a stream in the
@@ -354,6 +449,35 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- & S.'mapM' putStrLn
-- @
+-- $concurrentTransformation
+--
+-- Monadic transformation functions 'mapM' and 'sequence' work concurrently
+-- when used with appropriate stream type combinators. The pure versions do not
+-- work concurrently; however, you can use the monadic versions even for pure
+-- computations to get the concurrent transformation capability where required.
+--
+-- This would print a value every second (2 seconds when serial):
+--
+-- @
+-- > let p n = threadDelay (n * 1000000) >> return n
+-- > runStream $ aheadly $ S.'mapM' (\\x -> p 1 >> print x) (serially $ repeatM (p 1))
+-- @
+--
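+-- A similar sketch using 'sequence' to run the queued monadic actions
+-- concurrently (reusing @p@ from the snippet above), while the generation of
+-- the actions themselves stays serial:
+--
+-- @
+-- > runStream $ aheadly $ S.'sequence' $ serially $ S.'repeatM' (return $ p 1 >>= print)
+-- @
+--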
+
+-- $concurrentApplication
+--
+-- The concurrent function application operators '|$' and '|&' apply a stream
+-- argument to a stream function concurrently to compose a concurrent pipeline
+-- of stream processing functions:
+--
+-- Because both stages run concurrently, we would see a delay of only 1
+-- second instead of 2 seconds in the following:
+--
+-- @
+-- > let p n = threadDelay (n * 1000000) >> return n
+-- > runStream $ S.'repeatM' (p 1) '|&' S.'mapM' (\\x -> p 1 >> print x)
+-- @
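+--
+-- The same pipeline can be written with '|$', which applies the stream
+-- function on its left to the stream argument on its right (a sketch, again
+-- reusing @p@ from above):
+--
+-- @
+-- > runStream (S.'mapM' (\\x -> p 1 >> print x) '|$' S.'repeatM' (p 1))
+-- @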
+
-- $semigroup
--
-- We can combine two streams into a single stream using semigroup composition
@@ -492,6 +616,39 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- Note that this composition cannot be used to fold infinite number of streams
-- since it requires preserving the state until a stream is finished.
+-- $ahead
+--
+-- The 'Semigroup' operation '<>' of the 'Ahead' type combines two streams in a
+-- /serial depth first/ manner with concurrent lookahead. We use the 'aheadly'
+-- type combinator to effect 'Ahead' style of composition. We can also use an
+-- explicit 'Ahead' type annotation for the stream to achieve the same effect.
+--
+-- When two streams are combined in this manner, they are traversed in a
+-- depth first manner just like 'Serial'; however, the next stream may be
+-- executed concurrently and its results kept ready for when its turn arrives.
+-- Concurrent execution of the next stream(s) is performed if the first stream
+-- blocks or if it cannot produce output at a rate sufficient to meet the
+-- consumer demand. Multiple streams can be executed concurrently to meet the
+-- demand. The following example would print the result in a second even
+-- though each action in each stream takes one second:
+--
+-- @
+-- main = do
+-- xs \<- 'toList' . 'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
+-- print xs
+-- where p n = threadDelay 1000000 >> return n
+-- @
+-- @
+-- [1,2,3,4]
+-- @
+--
+-- Each stream is constructed 'aheadly' and then both streams are merged
+-- 'aheadly'; therefore, all the actions can run concurrently but the result is
+-- presented in serial order.
+--
+-- You can also use the polymorphic combinator 'ahead' in place of '<>' to
+-- compose any type of streams in this manner.
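+--
+-- For example, an illustrative sketch merging two streams with 'ahead'
+-- directly (assuming the same imports and helper conventions as the example
+-- above); here each constituent stream is serial while the merge uses
+-- concurrent lookahead:
+--
+-- @
+-- main = do
+--     xs \<- 'toList' $ 'ahead' (p 1 |: p 2 |: nil) (p 3 |: p 4 |: nil)
+--     print xs
+--   where p n = threadDelay 1000000 >> return n
+-- @
+-- @
+-- [1,2,3,4]
+-- @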
+
-- $async
--
-- The 'Semigroup' operation '<>' of the 'Async' type combines the two
@@ -508,32 +665,33 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- stream(s) is performed if the first stream blocks or if it cannot produce
-- output at the rate that is enough to meet the consumer demand. Multiple
-- streams can be executed concurrently to meet the demand.
--- In the following example the first stream does not block,
--- therefore the first stream is completely exhausted before the second.
+-- In the example below each element in the stream introduces a constant delay
+-- of 1 second; however, it takes just one second to produce all the results.
+-- The results are not guaranteed to be in any particular order:
--
-- @
--- main = 'runStream' . 'asyncly' $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)
+-- main = do
+-- xs \<- 'toList' . 'asyncly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
+-- print xs
+-- where p n = threadDelay 1000000 >> return n
-- @
-- @
--- 1
--- 2
--- 3
--- 4
+-- [4,2,1,3]
-- @
--
--- If the first stream blocks, we can yield from the second. In the example
--- below each yield in the stream has a constant delay of 1 second therefore 1
--- and 3 would be yielded first and then 2 and 4 would be yielded.
+-- Here the constituent streams, as well as their composition, use the 'Async'
+-- style. We can instead compose the constituent streams to run serially; in
+-- that case it would take 2 seconds to produce all the results. The elements
+-- of each serial stream would appear in serial order in the results:
--
-- @
--- main = 'runStream' . 'asyncly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
--- where p n = threadDelay 1000000 >> print n
+-- main = do
+-- xs \<- 'toList' . 'asyncly' $ (serially $ p 1 |: p 2 |: nil) <> (serially $ p 3 |: p 4 |: nil)
+-- print xs
+-- where p n = threadDelay 1000000 >> return n
-- @
-- @
--- 1
--- 3
--- 2
--- 4
+-- [3,1,2,4]
-- @
--
-- In the following example we can see that new threads are started when a
@@ -626,20 +784,20 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- The 'Semigroup' operation '<>' of the 'WAsync' type combines two streams in
-- a concurrent manner using /breadth first traversal/. We use the 'wAsyncly'
-- type combinator to effect 'WAsync' style of composition. We can also use the
--- 'WAsync' type annotation for the stream to acheive the same effect.
+-- 'WAsync' type annotation for the stream to achieve the same effect.
--
-- When streams with multiple elements are combined in this manner, we traverse
-- all the streams concurrently in a breadth first manner i.e. one action from
--- each stream is peformed and yielded to the resulting stream before we come
+-- each stream is performed and yielded to the resulting stream before we come
-- back to the first stream again and so on. Even though we execute the actions
--- in a breadth first order the outputs may be consumed in a different order
--- because they are consumed on a first come first serve basis.
+-- in a breadth first order the outputs are consumed on a first come first
+-- serve basis.
--
-- In the following example we can see that outputs are produced in the breadth
--- first travresal order but this is not guaranteed.
+-- first traversal order but this is not guaranteed.
--
-- @
--- main = 'runStream' . 'wAsyncly' $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)
+-- main = 'runStream' . 'wAsyncly' $ (serially $ print 1 |: print 2 |: nil) <> (serially $ print 3 |: print 4 |: nil)
-- @
-- @
-- 1
@@ -857,6 +1015,44 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- Notice that this is analogous to merging streams of type 'Serial' or merging
-- streams using 'serial'.
+-- $aheadNesting
+--
+-- The 'Monad' composition of 'Ahead' type behaves just like 'Serial' except
+-- that it can speculatively perform a bounded number of next iterations of a
+-- loop concurrently.
+--
+-- The 'aheadly' type combinator can be used to switch to this style of
+-- composition. Alternatively, a type annotation can be used to specify the
+-- type of the stream as 'Ahead'.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude" as S
+--
+-- comp = 'toList' . 'aheadly' $ do
+-- x <- S.'fromFoldable' [3,2,1]
+-- delay x >> return x
+--
+-- main = comp >>= print
+-- @
+-- @
+-- ThreadId 40: Delay 1
+-- ThreadId 39: Delay 2
+-- ThreadId 38: Delay 3
+-- [3,2,1]
+-- @
+--
+-- This code finishes in 3 seconds whereas 'Serial' would take 6 seconds. As
+-- we can see, all three iterations run concurrently, in different threads;
+-- however, the results are returned in the serial order.
+--
+-- Concurrency is demand driven. When multiple streams are composed using this
+-- style, the iterations are executed in a depth first manner just like
+-- 'Serial', i.e. nested iterations are executed before we proceed to the next
+-- outer iteration. The only difference is that we may execute multiple future
+-- iterations concurrently and keep the results ready.
+--
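+-- For instance, an illustrative sketch (assuming the imports from the
+-- supporting code section of this tutorial) nesting two streams under
+-- 'aheadly'; the iterations may run concurrently but the results come out in
+-- the serial, depth first order:
+--
+-- @
+-- comp = 'toList' . 'aheadly' $ do
+--     x <- S.'fromFoldable' [1,2]
+--     y <- S.'fromFoldable' [3,4]
+--     S.'once' $ threadDelay 1000000
+--     return (x, y)
+--
+-- main = comp >>= print
+-- @
+-- @
+-- [(1,3),(1,4),(2,3),(2,4)]
+-- @
+--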
+
-- $concurrentNesting
--
-- The 'Monad' composition of 'Async' type can perform the iterations of a
@@ -1324,13 +1520,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- import qualified Streamly.Prelude as S
-- import qualified Data.Vector.Fusion.Stream.Monadic as V
--
--- main = do
--- -- streamly to vector
--- V.toList (V.unfoldrM S.uncons (S.fromFoldable [1..3])) >>= print
---
--- -- vector to streamly
--- S.toList (S.unfoldrM unconsV (V.fromList [1..3])) >>= print
---
+-- -- | vector to streamly
+-- fromVector :: (IsStream t, Monad m) => V.Stream m a -> t m a
+-- fromVector = S.unfoldrM unconsV
-- where
-- unconsV v = do
-- r <- V.null v
@@ -1339,6 +1531,14 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- else do
-- h <- V.head v
-- return $ Just (h, V.tail v)
+--
+-- -- | streamly to vector
+-- toVector :: Monad m => SerialT m a -> V.Stream m a
+-- toVector = V.unfoldrM (S.uncons . adapt)
+--
+-- main = do
+-- S.toList (fromVector (V.fromList [1..3])) >>= print
+-- V.toList (toVector (S.fromFoldable [1..3])) >>= print
-- @
--
-- Interop with @pipes@:
@@ -1349,19 +1549,23 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- import qualified Pipes as P
-- import qualified Pipes.Prelude as P
--
--- main = do
--- -- streamly to pipe
--- P.toListM (P.unfoldr unconsS (S.'fromFoldable' [1..3])) >>= print
---
--- -- pipe to streamly
--- S.'toList' (S.'unfoldrM' unconsP (P.each [1..3])) >>= print
---
+-- -- | pipes to streamly
+-- fromPipes :: (IsStream t, Monad m) => P.Producer a m r -> t m a
+-- fromPipes = S.'unfoldrM' unconsP
-- where
-- -- Adapt P.next to return a Maybe instead of Either
-- unconsP p = P.next p >>= either (\\_ -> return Nothing) (return . Just)
--
+-- -- | streamly to pipes
+-- toPipes :: Monad m => SerialT m a -> P.Producer a m ()
+-- toPipes = P.unfoldr unconsS
+-- where
-- -- Adapt S.uncons to return an Either instead of Maybe
-- unconsS s = S.'uncons' s >>= maybe (return $ Left ()) (return . Right)
+--
+-- main = do
+-- S.'toList' (fromPipes (P.each [1..3])) >>= print
+-- P.toListM (toPipes (S.'fromFoldable' [1..3])) >>= print
-- @
--
-- Interop with @streaming@:
@@ -1372,17 +1576,20 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- import qualified Streaming as SG
-- import qualified Streaming.Prelude as SG
--
--- main = do
--- -- streamly to streaming
--- SG.toList (SG.unfoldr unconsS (S.'fromFoldable' [1..3])) >>= print
---
--- -- streaming to streamly
--- S.'toList' (S.unfoldrM SG.uncons (SG.each [1..3])) >>= print
+-- -- | streaming to streamly
+-- fromStreaming :: (IsStream t, Monad m) => SG.Stream (SG.Of a) m r -> t m a
+-- fromStreaming = S.unfoldrM SG.uncons
--
+-- -- | streamly to streaming
+-- toStreaming :: Monad m => SerialT m a -> SG.Stream (SG.Of a) m ()
+-- toStreaming = SG.unfoldr unconsS
-- where
---
-- -- Adapt S.uncons to return an Either instead of Maybe
-- unconsS s = S.'uncons' s >>= maybe (return $ Left ()) (return . Right)
+--
+-- main = do
+-- S.toList (fromStreaming (SG.each [1..3])) >>= print
+-- SG.toList (toStreaming (S.fromFoldable [1..3])) >>= print
-- @
--
-- Interop with @conduit@:
@@ -1394,12 +1601,15 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- import qualified Data.Conduit.List as C
-- import qualified Data.Conduit.Combinators as C
--
--- main = do
--- -- streamly to conduit
--- C.runConduit (C.unfoldM S.'uncons' (S.'fromFoldable' [1..3]) C..| C.sinkList) >>= print
+-- -- It seems there is no way out of a conduit as it does not provide an
+-- -- uncons or a tail function. We can convert streamly to conduit though.
+--
+-- -- | streamly to conduit
+-- toConduit :: Monad m => SerialT m a -> C.ConduitT i a m ()
+-- toConduit s = C.unfoldM S.'uncons' s
--
--- -- It seems there is no way out of a conduit as it does not provide an
--- -- uncons or a tail function.
+-- main = do
+-- C.runConduit (toConduit (S.'fromFoldable' [1..3]) C..| C.sinkList) >>= print
-- @
-- $comparison
diff --git a/stack-7.10.yaml b/stack-7.10.yaml
index 323d136..32c30d6 100644
--- a/stack-7.10.yaml
+++ b/stack-7.10.yaml
@@ -2,6 +2,7 @@ resolver: lts-6.35
packages:
- '.'
extra-deps:
+ - QuickCheck-2.10
- lockfree-queue-0.2.3.1
- simple-conduit-0.4.0
- transient-0.5.9.2
diff --git a/stack-8.0.yaml b/stack-8.0.yaml
new file mode 100644
index 0000000..aed11c1
--- /dev/null
+++ b/stack-8.0.yaml
@@ -0,0 +1,16 @@
+resolver: lts-9.20
+packages:
+- '.'
+extra-deps:
+ - QuickCheck-2.10
+ - lockfree-queue-0.2.3.1
+ - simple-conduit-0.6.0
+ - SDL-0.6.5.1
+ - gauge-0.2.1
+ - basement-0.0.4
+flags: {}
+extra-package-dbs: []
+rebuild-ghc-options: true
+# For mac ports installed SDL library on Mac OS X
+#extra-include-dirs:
+#- /opt/local/include
diff --git a/stack.yaml b/stack.yaml
index fd971c1..e87d702 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -8,7 +8,7 @@ extra-deps:
- SDL-0.6.5.1
- git: https://github.com/composewell/bench-graph
- commit: b6d6c69f76c0d951aecd257866cede9467bd166d
+ commit: 268a04061cca7eda448b8f741d8d0aa82cd6be3a
- git: https://github.com/harendra-kumar/hs-gauge
commit: f3bb4a1fc801c581224843759b7e6dabb0aef3dc
diff --git a/streamly.cabal b/streamly.cabal
index 09d4af6..a61e2af 100644
--- a/streamly.cabal
+++ b/streamly.cabal
@@ -1,29 +1,26 @@
name: streamly
-version: 0.2.1
+version: 0.3.0
synopsis: Beautiful Streaming, Concurrent and Reactive Composition
description:
- Streamly, short for streaming concurrently, is a simple yet powerful
- streaming library with concurrent merging and concurrent nested looping
- support. A stream is just like a list except that it is a list of monadic
- actions rather than pure values. Streamly streams can be generated,
- consumed, combined, or transformed serially or concurrently. We can loop over
- a stream serially or concurrently. We can also have serial or concurrent
- nesting of loops. For those familiar with list transformer concept streamly
- is a concurrent list transformer. Streamly uses standard composition
- abstractions. Concurrent composition is just the same as serial composition
- except that we use a simple combinator to request a concurrent composition
- instead of serial. The programmer does not have to be aware of threads,
- locking or synchronization to write scalable concurrent programs.
+ Streamly, short for streaming concurrently, provides monadic streams, with a
+ simple API, almost identical to standard lists, and in-built support for
+ concurrency. By using stream-style combinators on stream composition,
+ streams can be generated, merged, chained, mapped, zipped, and consumed
+ concurrently – providing a generalized high level programming framework
+ unifying streaming and concurrency. Controlled concurrency allows even
+ infinite streams to be evaluated concurrently. Concurrency is auto scaled
+ based on feedback from the stream consumer. The programmer does not have to
+ be aware of threads, locking or synchronization to write scalable concurrent
+ programs.
.
- Streamly provides functionality that is equivalent to streaming libraries
- like <https://hackage.haskell.org/package/pipes pipes> and
- <https://hackage.haskell.org/package/conduit conduit> but with a simple list
- like API. The streaming API of streamly is close to the monadic streams API
- of the <https://hackage.haskell.org/package/vector vector> package and
- similar in concept to the
- <https://hackage.haskell.org/package/streaming streaming> package. In
- addition to the streaming functionality, streamly subsumes the functionality
- of list transformer libraries like @pipes@ or
+ The basic streaming functionality of streamly is equivalent to that provided by
+ streaming libraries like
+ <https://hackage.haskell.org/package/vector vector>,
+ <https://hackage.haskell.org/package/streaming streaming>,
+ <https://hackage.haskell.org/package/pipes pipes>, and
+ <https://hackage.haskell.org/package/conduit conduit>.
+ In addition to providing streaming functionality, streamly subsumes the
+ functionality of list transformer libraries like @pipes@ or
<https://hackage.haskell.org/package/list-t list-t> and also the logic
programming library <https://hackage.haskell.org/package/logict logict>. On
the concurrency side, it subsumes the functionality of the
@@ -32,13 +29,25 @@ description:
concept to <https://hackage.haskell.org/package/Yampa Yampa> or
<https://hackage.haskell.org/package/reflex reflex>.
.
- Streamly has excellent performance, see
- <https://github.com/composewell/streaming-benchmarks streaming-benchmarks>
- for a comparison of popular streaming libraries on micro-benchmarks. For
- file IO, currently the library provides only one API to stream the lines in
- the file as Strings. Future versions will provide better streaming file IO
- options. Streamly interworks with the popular streaming libraries, see the
- interworking section in "Streamly.Tutorial".
+ For file IO, currently the library provides only one API to stream the lines
+ in the file as Strings. Future versions will provide better streaming file
+ IO options. Streamly interworks with the popular streaming libraries, see
+ the interworking section in "Streamly.Tutorial".
+ .
+ Why use streamly?
+ .
+ * /Simplicity/: Simple, list-like streaming API; if you know how to use lists
+ then you know how to use streamly. This library is built with simplicity
+ and ease of use as a primary design goal.
+ * /Concurrency/: Simple, powerful, and scalable concurrency. Concurrency is
+ built-in, and not intrusive, concurrent programs are written exactly the
+ same way as non-concurrent ones.
+ * /Generality/: Unifies functionality provided by several disparate packages
+ (streaming, concurrency, list transformer, logic programming, reactive
+ programming) in a concise API.
+ * /Performance/: Streamly is designed for high performance. See
+ <https://github.com/composewell/streaming-benchmarks streaming-benchmarks>
+ for a comparison of popular streaming libraries on micro-benchmarks.
.
Where to find more information:
.
@@ -50,7 +59,7 @@ homepage: https://github.com/composewell/streamly
bug-reports: https://github.com/composewell/streamly/issues
license: BSD3
license-file: LICENSE
-tested-with: GHC==7.10.3, GHC==8.0.2, GHC==8.2.2, GHC==8.4.2
+tested-with: GHC==7.10.3, GHC==8.0.2, GHC==8.2.2, GHC==8.4.3
author: Harendra Kumar
maintainer: harendra.kumar@gmail.com
copyright: 2017 Harendra Kumar
@@ -64,24 +73,30 @@ extra-source-files:
README.md
bench.sh
stack-7.10.yaml
+ stack-8.0.yaml
stack.yaml
source-repository head
type: git
location: https://github.com/composewell/streamly
+flag diag
+ description: Diagnostics build
+ manual: True
+ default: False
+
flag dev
- description: Build development version
+ description: Development build
manual: True
default: False
flag examples
- description: Build examples
+ description: Build including examples
manual: True
default: False
flag examples-sdl
- description: Include examples that use SDL dependency
+ description: Build including SDL examples
manual: True
default: False
@@ -102,7 +117,11 @@ library
default-language: Haskell2010
ghc-options: -Wall
+ if flag(diag)
+ cpp-options: -DDIAGNOSTICS
+
if flag(dev)
+ cpp-options: -DDIAGNOSTICS
ghc-options: -Wmissed-specialisations
-Wall-missed-specialisations
-fno-ignore-asserts
@@ -117,14 +136,17 @@ library
-Wnoncanonical-monadfail-instances
build-depends: base >= 4.8 && < 5
- , atomic-primops >= 0.8 && < 0.9
, containers >= 0.5 && < 0.6
- , exceptions >= 0.8 && < 0.11
- , lifted-base >= 0.2 && < 0.3
+ , heaps >= 0.3 && < 0.4
+
+ -- concurrency
+ , atomic-primops >= 0.8 && < 0.9
, lockfree-queue >= 0.2.3 && < 0.3
+
+ -- transformers
+ , exceptions >= 0.8 && < 0.11
, monad-control >= 1.0 && < 2
, mtl >= 2.2 && < 3
- , stm >= 2.4.3 && < 2.5
, transformers >= 0.4 && < 0.6
, transformers-base >= 0.4 && < 0.5
@@ -140,7 +162,7 @@ test-suite test
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
- ghc-options: -O0 -Wall
+ ghc-options: -O0 -Wall -threaded -with-rtsopts=-N
if flag(dev)
ghc-options: -Wmissed-specialisations
-Wall-missed-specialisations
@@ -167,8 +189,9 @@ test-suite properties
type: exitcode-stdio-1.0
main-is: Prop.hs
hs-source-dirs: test
- ghc-options: -O0 -Wall
+ ghc-options: -O0 -Wall -threaded -with-rtsopts=-N4
if flag(dev)
+ cpp-options: -DDEVBUILD
ghc-options: -Wmissed-specialisations
-Wall-missed-specialisations
if impl(ghc >= 8.0)
@@ -183,7 +206,7 @@ test-suite properties
build-depends:
streamly
, base >= 4.8 && < 5
- , QuickCheck >= 2.8 && < 2.12
+ , QuickCheck >= 2.10 && < 2.12
, hspec >= 2.0 && < 3
default-language: Haskell2010
diff --git a/test/Main.hs b/test/Main.hs
index c8eb1af..405c52c 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -115,6 +115,7 @@ main = hspec $ do
---------------------------------------------------------------------------
describe "Serial Composition" $ compose serially mempty id
+ describe "Ahead Composition" $ compose aheadly mempty id
describe "WSerial Composition" $ compose wSerially mempty sort
describe "Async Composition" $ compose asyncly mempty sort
describe "WAsync Composition" $ compose wAsyncly mempty sort
@@ -128,7 +129,10 @@ main = hspec $ do
---------------------------------------------------------------------------
-- TBD need more such combinations to be tested.
- describe "<> and <>" $ composeAndComposeSimple serially serially (cycle [[1 .. 9]])
+ describe "serial <> and serial <>" $ composeAndComposeSimple serially serially (cycle [[1 .. 9]])
+ describe "ahead <> and ahead <>" $ composeAndComposeSimple aheadly aheadly (cycle [[1 .. 9]])
+ describe "ahead <> and serial <>" $ composeAndComposeSimple aheadly serially (cycle [[1 .. 9]])
+ describe "serial <> and ahead <>" $ composeAndComposeSimple serially aheadly (cycle [[1 .. 9]])
describe "<> and <=>" $ composeAndComposeSimple
serially
@@ -203,6 +207,7 @@ main = hspec $ do
---------------------------------------------------------------------------
describe "Serial loops" $ loops serially id reverse
+ describe "Ahead loops" $ loops aheadly id reverse
describe "Async parallel loops" $ loops asyncly sort sort
describe "WAsync loops" $ loops wAsyncly sort sort
describe "parallel loops" $ loops parallely sort sort
@@ -216,30 +221,42 @@ main = hspec $ do
describe "Bind and compose Stream 3" $ bindAndComposeSimple serially asyncly
describe "Bind and compose Stream 4" $ bindAndComposeSimple serially wAsyncly
describe "Bind and compose Stream 5" $ bindAndComposeSimple serially parallely
+ describe "Bind and compose Stream 6" $ bindAndComposeSimple serially aheadly
+
+ describe "Bind and compose Ahead Stream 0" $ bindAndComposeSimple aheadly aheadly
+ describe "Bind and compose Ahead Stream 1" $ bindAndComposeSimple aheadly serially
+ describe "Bind and compose Ahead Stream 2" $ bindAndComposeSimple aheadly wSerially
+ describe "Bind and compose Ahead Stream 3" $ bindAndComposeSimple aheadly asyncly
+ describe "Bind and compose Ahead Stream 4" $ bindAndComposeSimple aheadly wAsyncly
+ describe "Bind and compose Ahead Stream 5" $ bindAndComposeSimple aheadly parallely
describe "Bind and compose Costream 1" $ bindAndComposeSimple wSerially serially
describe "Bind and compose Costream 2" $ bindAndComposeSimple wSerially wSerially
describe "Bind and compose Costream 3" $ bindAndComposeSimple wSerially asyncly
describe "Bind and compose Costream 4" $ bindAndComposeSimple wSerially wAsyncly
describe "Bind and compose Costream 5" $ bindAndComposeSimple wSerially parallely
+ describe "Bind and compose Costream 6" $ bindAndComposeSimple wSerially aheadly
describe "Bind and compose Async 1" $ bindAndComposeSimple asyncly serially
describe "Bind and compose Async 2" $ bindAndComposeSimple asyncly wSerially
describe "Bind and compose Async 3" $ bindAndComposeSimple asyncly asyncly
describe "Bind and compose Async 4" $ bindAndComposeSimple asyncly wAsyncly
describe "Bind and compose Async 5" $ bindAndComposeSimple asyncly parallely
+ describe "Bind and compose Async 6" $ bindAndComposeSimple asyncly aheadly
describe "Bind and compose WAsync 1" $ bindAndComposeSimple wAsyncly serially
describe "Bind and compose WAsync 2" $ bindAndComposeSimple wAsyncly wSerially
describe "Bind and compose WAsync 3" $ bindAndComposeSimple wAsyncly asyncly
describe "Bind and compose WAsync 4" $ bindAndComposeSimple wAsyncly wAsyncly
describe "Bind and compose WAsync 5" $ bindAndComposeSimple wAsyncly parallely
+ describe "Bind and compose WAsync 6" $ bindAndComposeSimple wAsyncly aheadly
describe "Bind and compose Parallel 1" $ bindAndComposeSimple parallely serially
describe "Bind and compose Parallel 2" $ bindAndComposeSimple parallely wSerially
describe "Bind and compose Parallel 3" $ bindAndComposeSimple parallely asyncly
describe "Bind and compose Parallel 4" $ bindAndComposeSimple parallely wAsyncly
describe "Bind and compose Parallel 5" $ bindAndComposeSimple parallely parallely
+ describe "Bind and compose Parallel 6" $ bindAndComposeSimple parallely aheadly
let fldr, fldl :: (IsStream t, Semigroup (t IO Int)) => [t IO Int] -> t IO Int
fldr = foldr (<>) nil
@@ -255,6 +272,21 @@ main = hspec $ do
describe "Bind and compose" $ bindAndComposeHierarchy serially wAsyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy serially parallely k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose serially aheadly" $ bindAndComposeHierarchy serially aheadly k
+
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose aheadly serially" $ bindAndComposeHierarchy aheadly serially k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose aheadly wSerially" $ bindAndComposeHierarchy aheadly wSerially k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose aheadly asyncly" $ bindAndComposeHierarchy aheadly asyncly k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose aheadly wAsyncly" $ bindAndComposeHierarchy aheadly wAsyncly k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose aheadly parallely" $ bindAndComposeHierarchy aheadly parallely k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose serially aheadly" $ bindAndComposeHierarchy aheadly aheadly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy wSerially serially k
@@ -266,6 +298,8 @@ main = hspec $ do
describe "Bind and compose" $ bindAndComposeHierarchy wSerially wAsyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy wSerially parallely k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose wserially aheadly" $ bindAndComposeHierarchy wSerially aheadly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy asyncly serially k
@@ -277,6 +311,8 @@ main = hspec $ do
describe "Bind and compose" $ bindAndComposeHierarchy asyncly wAsyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy asyncly parallely k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose asyncly aheadly" $ bindAndComposeHierarchy asyncly aheadly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy wAsyncly serially k
@@ -288,6 +324,8 @@ main = hspec $ do
describe "Bind and compose" $ bindAndComposeHierarchy wAsyncly wAsyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy wAsyncly parallely k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose wAsyncly aheadly" $ bindAndComposeHierarchy wAsyncly aheadly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy parallely serially k
@@ -299,15 +337,19 @@ main = hspec $ do
describe "Bind and compose" $ bindAndComposeHierarchy parallely wAsyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy parallely parallely k
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose parallely aheadly" $ bindAndComposeHierarchy parallely aheadly k
-- Nest two lists using different styles of product compositions
it "Nests two streams using monadic serial composition" nestTwoSerial
+ it "Nests two streams using monadic ahead composition" nestTwoAhead
it "Nests two streams using monadic interleaved composition" nestTwoInterleaved
it "Nests two streams using monadic Async composition" nestTwoAsync
it "Nests two streams using monadic WAsync composition" nestTwoWAsync
it "Nests two streams using monadic parallel composition" nestTwoParallel
it "Nests two streams using applicative serial composition" nestTwoSerialApp
+ it "Nests two streams using applicative ahead composition" nestTwoAheadApp
it "Nests two streams using applicative interleaved composition" nestTwoInterleavedApp
it "Nests two streams using applicative Async composition" nestTwoAsyncApp
it "Nests two streams using applicative WAsync composition" nestTwoWAsyncApp
@@ -319,6 +361,7 @@ main = hspec $ do
-- TBD combine all binds and all compose in one example
describe "Miscellaneous combined examples" mixedOps
+ describe "Miscellaneous combined examples aheadly" mixedOpsAheadly
describe "Simple MonadError and MonadThrow" simpleMonadError
{-
@@ -333,12 +376,18 @@ main = hspec $ do
describe "Composed MonadThrow asyncly" $ composeWithMonadThrow asyncly
describe "Composed MonadThrow wAsyncly" $ composeWithMonadThrow wAsyncly
describe "Composed MonadThrow parallely" $ composeWithMonadThrow parallely
+ describe "Composed MonadThrow aheadly" $ composeWithMonadThrow aheadly
- it "Crosses thread limit (2000 threads)" $
+ it "asyncly crosses thread limit (2000 threads)" $
runStream (asyncly $ fold $
replicate 2000 $ A.once $ threadDelay 1000000)
`shouldReturn` ()
+ it "aheadly crosses thread limit (4000 threads)" $
+ runStream (aheadly $ fold $
+ replicate 4000 $ A.once $ threadDelay 1000000)
+ `shouldReturn` ()
+
-- XXX need to test that we have promptly cleaned up everything after the error
-- XXX We can also check the output that we are expected to get before the
-- error occurs.
@@ -432,6 +481,16 @@ nestTwoSerial =
return (x + y)
) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+nestTwoAhead :: Expectation
+nestTwoAhead =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in (A.toList . aheadly) (do
+ x <- s1
+ y <- s2
+ return (x + y)
+ ) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
nestTwoSerialApp :: Expectation
nestTwoSerialApp =
let s1 = foldMapWith (<>) return [1..4]
@@ -439,6 +498,13 @@ nestTwoSerialApp =
in toListSerial ((+) <$> s1 <*> s2)
`shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+nestTwoAheadApp :: Expectation
+nestTwoAheadApp =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in (A.toList . aheadly) ((+) <$> s1 <*> s2)
+ `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
nestTwoInterleaved :: Expectation
nestTwoInterleaved =
let s1 = foldMapWith (<>) return [1..4]
@@ -706,3 +772,33 @@ mixedOps = do
return (x11 + y11 + z11)
return (x1 + y1 + z1)
return (x + y + z)
+
+mixedOpsAheadly :: Spec
+mixedOpsAheadly = do
+ it "Compose many ops" $
+ (toListSerial composeMixed >>= return . sort)
+ `shouldReturn` ([8,9,9,9,9,9,10,10,10,10,10,10,10,10,10,10,11,11
+ ,11,11,11,11,11,11,11,11,12,12,12,12,12,13
+ ] :: [Int])
+ where
+
+ composeMixed :: SerialT IO Int
+ composeMixed = do
+ A.once $ return ()
+ A.once $ putStr ""
+ x <- return 1
+ y <- return 2
+ z <- do
+ x1 <- wAsyncly $ return 1 <> return 2
+ A.once $ return ()
+ A.once $ putStr ""
+ y1 <- aheadly $ return 1 <> return 2
+ z1 <- do
+ x11 <- return 1 <> return 2
+ y11 <- aheadly $ return 1 <> return 2
+ z11 <- parallely $ return 1 <> return 2
+ A.once $ return ()
+ A.once $ putStr ""
+ return (x11 + y11 + z11)
+ return (x1 + y1 + z1)
+ return (x + y + z)
diff --git a/test/Prop.hs b/test/Prop.hs
index fbd8811..5a6b229 100644
--- a/test/Prop.hs
+++ b/test/Prop.hs
@@ -2,14 +2,19 @@
module Main (main) where
+import Control.Exception (BlockedIndefinitelyOnMVar(..), catches,
+ BlockedIndefinitelyOnSTM(..), Handler(..))
import Control.Monad (when)
import Control.Applicative (ZipList(..))
-import Control.Monad (replicateM)
+import Control.Concurrent (MVar, takeMVar, putMVar, newEmptyMVar)
+import Control.Monad (replicateM, replicateM_)
+import Data.IORef (readIORef, modifyIORef, newIORef)
import Data.List (sort, foldl', scanl')
+import Data.Maybe (mapMaybe)
import GHC.Word (Word8)
import Test.Hspec.QuickCheck (prop)
-import Test.QuickCheck (counterexample, Property)
+import Test.QuickCheck (counterexample, Property, withMaxSuccess)
import Test.QuickCheck.Monadic (run, monadicIO, monitor, assert, PropertyM)
import Test.Hspec
@@ -18,6 +23,13 @@ import Streamly
import Streamly.Prelude ((.:), nil)
import qualified Streamly.Prelude as A
+maxTestCount :: Int
+#ifdef DEVBUILD
+maxTestCount = 100
+#else
+maxTestCount = 10
+#endif
+
singleton :: IsStream t => a -> t m a
singleton a = a .: nil
@@ -59,6 +71,166 @@ transformFromList constr eq listOp op a =
let list = listOp a
equals eq stream list
+mvarExcHandler :: String -> BlockedIndefinitelyOnMVar -> IO ()
+mvarExcHandler label BlockedIndefinitelyOnMVar = do
+ error $ label ++ " " ++ "BlockedIndefinitelyOnMVar\n"
+
+stmExcHandler :: String -> BlockedIndefinitelyOnSTM -> IO ()
+stmExcHandler label BlockedIndefinitelyOnSTM = do
+ error $ label ++ " " ++ "BlockedIndefinitelyOnSTM\n"
+
+dbgMVar :: String -> IO () -> IO ()
+dbgMVar label action =
+ action `catches` [ Handler (mvarExcHandler label)
+ , Handler (stmExcHandler label)
+ ]
+
+-- | The first n actions perform a takeMVar and the last action performs putMVar n times
+mvarSequenceOp :: MVar () -> Word8 -> Word8 -> IO Word8
+mvarSequenceOp mv n x = do
+ let msg = show x ++ "/" ++ show n
+ if x < n
+ then dbgMVar ("take mvarSequenceOp " ++ msg) (takeMVar mv) >> return x
+ else dbgMVar ("put mvarSequenceOp" ++ msg)
+ (replicateM_ (fromIntegral n) (putMVar mv ())) >> return x
+
+concurrentMapM
+ :: ([Word8] -> t IO Word8)
+ -> ([Word8] -> [Word8] -> Bool)
+ -> (Word8 -> MVar () -> t IO Word8 -> SerialT IO Word8)
+ -> Word8
+ -> Property
+concurrentMapM constr eq op n =
+ monadicIO $ do
+ let list = [0..n]
+ stream <- run $ do
+ mv <- newEmptyMVar :: IO (MVar ())
+ (A.toList . (op n mv)) (constr list)
+ equals eq stream list
+
+concurrentFromFoldable
+ :: IsStream t
+ => ([Word8] -> [Word8] -> Bool)
+ -> (t IO Word8 -> SerialT IO Word8)
+ -> Word8
+ -> Property
+concurrentFromFoldable eq op n =
+ monadicIO $ do
+ let list = [0..n]
+ stream <- run $ do
+ mv <- newEmptyMVar :: IO (MVar ())
+ (A.toList . op) (A.fromFoldableM (map (mvarSequenceOp mv n) list))
+ equals eq stream list
+
+sourceUnfoldrM :: IsStream t => MVar () -> Word8 -> t IO Word8
+sourceUnfoldrM mv n = A.unfoldrM step 0
+ where
+ -- argument must be integer to avoid overflow of word8 at 255
+ step :: Int -> IO (Maybe (Word8, Int))
+ step cnt = do
+ let msg = show cnt ++ "/" ++ show n
+ if cnt > fromIntegral n
+ then return Nothing
+ else do
+ dbgMVar ("put sourceUnfoldrM " ++ msg) (putMVar mv ())
+ return (Just (fromIntegral cnt, cnt + 1))
+
+-- Note that this test is not guaranteed to succeed, because there is no
+-- guarantee of parallelism in case of Async/Ahead streams.
+concurrentUnfoldrM
+ :: IsStream t
+ => ([Word8] -> [Word8] -> Bool)
+ -> (t IO Word8 -> SerialT IO Word8)
+ -> Word8
+ -> Property
+concurrentUnfoldrM eq op n =
+ monadicIO $ do
+ -- XXX we should test empty list case as well
+ let list = [0..n]
+ stream <- run $ do
+ -- putStrLn $ "concurrentUnfoldrM: " ++ show n
+ mv <- newEmptyMVar :: IO (MVar ())
+ cnt <- newIORef 0
+ -- since unfoldr happens in parallel with the stream processing we
+ -- can do two takeMVar in one iteration. If it is not parallel then
+ -- this will not work and the test will fail.
+ A.toList $ do
+ x <- op (sourceUnfoldrM mv n)
+ -- results may not be yielded in order, in case of
+ -- Async/WAsync/Parallel. So we use an increasing count
+ -- instead.
+ i <- A.once $ readIORef cnt
+ A.once $ modifyIORef cnt (+1)
+ let msg = show i ++ "/" ++ show n
+ A.once $ do
+ if even i
+ then do
+ dbgMVar ("first take concurrentUnfoldrM " ++ msg)
+ (takeMVar mv)
+ if n > i
+ then do
+ dbgMVar ("second take concurrentUnfoldrM " ++ msg)
+ (takeMVar mv)
+ else return ()
+ else return ()
+ return x
+ equals eq stream list
+
+concurrentApplication :: Word8 -> Property
+concurrentApplication n =
+ monadicIO $ do
+ -- XXX we should test empty list case as well
+ let list = [0..n]
+ stream <- run $ do
+ -- putStrLn $ "concurrentApplication: " ++ show n
+ mv <- newEmptyMVar :: IO (MVar ())
+ -- since unfoldr happens in parallel with the stream processing we
+ -- can do two takeMVar in one iteration. If it is not parallel then
+ -- this will not work and the test will fail.
+ A.toList $ do
+ sourceUnfoldrM mv n |&
+ (A.mapM $ \x -> do
+ let msg = show x ++ "/" ++ show n
+ if even x
+ then do
+ dbgMVar ("first take concurrentApp " ++ msg)
+ (takeMVar mv)
+ if n > x
+ then dbgMVar ("second take concurrentApp " ++ msg)
+ (takeMVar mv)
+ else return ()
+ else return ()
+ return x)
+ equals (==) stream list
+
+sourceUnfoldrM1 :: IsStream t => Word8 -> t IO Word8
+sourceUnfoldrM1 n = A.unfoldrM step 0
+ where
+ -- argument must be integer to avoid overflow of word8 at 255
+ step :: Int -> IO (Maybe (Word8, Int))
+ step cnt = do
+ if cnt > fromIntegral n
+ then return Nothing
+ else return (Just (fromIntegral cnt, cnt + 1))
+
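+-- (|&.) applies a fold to a stream with the generation and fold stages
+-- running concurrently; the result should still match the serial list fold.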
+concurrentFoldlApplication :: Word8 -> Property
+concurrentFoldlApplication n =
+ monadicIO $ do
+ -- XXX we should test the empty list case as well
+ let list = [0..n]
+ stream <- run $ do
+ sourceUnfoldrM1 n |&. A.foldlM' (\xs x -> return (x : xs)) []
+ equals (==) (reverse stream) list
+
+concurrentFoldrApplication :: Word8 -> Property
+concurrentFoldrApplication n =
+ monadicIO $ do
+ -- XXX we should test the empty list case as well
+ let list = [0..n]
+ stream <- run $ do
+ sourceUnfoldrM1 n |&. A.foldrM (\x xs -> return (x : xs)) []
+ equals (==) stream list
+
transformCombineFromList
:: Semigroup (t IO Int)
=> ([Int] -> t IO Int)
@@ -152,6 +324,9 @@ transformOps constr desc t eq = do
prop (desc ++ " takeWhile > 0") $
transform (takeWhile (> 0)) $ t . (A.takeWhile (> 0))
+ let f x = if odd x then Just (x + 100) else Nothing
+ prop (desc ++ " mapMaybe") $ transform (mapMaybe f) $ t . (A.mapMaybe f)
+
prop (desc ++ " drop maxBound") $
transform (drop maxBound) $ t . (A.drop maxBound)
prop (desc ++ " drop 0") $ transform (drop 0) $ t . (A.drop 0)
@@ -165,7 +340,29 @@ transformOps constr desc t eq = do
prop (desc ++ " dropWhile > 0") $
transform (dropWhile (> 0)) $ t . (A.dropWhile (> 0))
prop (desc ++ " scan") $ transform (scanl' (+) 0) $ t . (A.scanl' (+) 0)
- prop (desc ++ "reverse") $ transform reverse $ t . A.reverse
+ prop (desc ++ " reverse") $ transform reverse $ t . A.reverse
+
+concurrentOps
+ :: IsStream t
+ => ([Word8] -> t IO Word8)
+ -> String
+ -> (t IO Word8 -> SerialT IO Word8)
+ -> ([Word8] -> [Word8] -> Bool)
+ -> Spec
+concurrentOps constr desc t eq = do
+ prop (desc ++ " fromFoldableM") $ withMaxSuccess maxTestCount $
+ concurrentFromFoldable eq t
+ prop (desc ++ " unfoldrM") $ withMaxSuccess maxTestCount $
+ concurrentUnfoldrM eq t
+ -- We pass the length of the stream (n) and an MVar (mv) to the mapping
+ -- operation. The stream is [0..n]. The element actions communicate via
+ -- the MVar such that actions earlier in the stream depend on later ones,
+ -- so the test blocks forever unless the stream is processed concurrently.
+ -- Note that if the stream is longer than the thread limit it will block
+ -- even when processed concurrently.
+ prop (desc ++ " mapM") $ withMaxSuccess maxTestCount $
+ concurrentMapM constr eq $ \n mv stream ->
+ t $ A.mapM (mvarSequenceOp mv n) stream
-- XXX add tests for MonadReader and MonadError etc. In case an SVar is
-- accidentally passed through them.
@@ -304,10 +501,11 @@ applicativeOps
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> ([Int], [Int])
-> Property
-applicativeOps constr t eq (a, b) = monadicIO $ do
- stream <- run ((A.toList . t) ((,) <$> (constr a) <*> (constr b)))
- let list = (,) <$> a <*> b
- equals eq stream list
+applicativeOps constr t eq (a, b) = withMaxSuccess maxTestCount $
+ monadicIO $ do
+ stream <- run ((A.toList . t) ((,) <$> (constr a) <*> (constr b)))
+ let list = (,) <$> a <*> b
+ equals eq stream list
zipApplicative
:: (IsStream t, Applicative (t IO))
@@ -316,14 +514,15 @@ zipApplicative
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> ([Int], [Int])
-> Property
-zipApplicative constr t eq (a, b) = monadicIO $ do
- stream1 <- run ((A.toList . t) ((,) <$> (constr a) <*> (constr b)))
- stream2 <- run ((A.toList . t) (pure (,) <*> (constr a) <*> (constr b)))
- stream3 <- run ((A.toList . t) (A.zipWith (,) (constr a) (constr b)))
- let list = getZipList $ (,) <$> ZipList a <*> ZipList b
- equals eq stream1 list
- equals eq stream2 list
- equals eq stream3 list
+zipApplicative constr t eq (a, b) = withMaxSuccess maxTestCount $
+ monadicIO $ do
+ stream1 <- run ((A.toList . t) ((,) <$> (constr a) <*> (constr b)))
+ stream2 <- run ((A.toList . t) (pure (,) <*> (constr a) <*> (constr b)))
+ stream3 <- run ((A.toList . t) (A.zipWith (,) (constr a) (constr b)))
+ let list = getZipList $ (,) <$> ZipList a <*> ZipList b
+ equals eq stream1 list
+ equals eq stream2 list
+ equals eq stream3 list
zipMonadic
:: (IsStream t, Monad (t IO))
@@ -332,7 +531,7 @@ zipMonadic
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> ([Int], [Int])
-> Property
-zipMonadic constr t eq (a, b) =
+zipMonadic constr t eq (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
stream1 <-
run
@@ -353,7 +552,7 @@ monadThen
-> ([Int] -> [Int] -> Bool)
-> ([Int], [Int])
-> Property
-monadThen constr t eq (a, b) = monadicIO $ do
+monadThen constr t eq (a, b) = withMaxSuccess maxTestCount $ monadicIO $ do
stream <- run ((A.toList . t) ((constr a) >> (constr b)))
let list = a >> b
equals eq stream list
@@ -365,7 +564,7 @@ monadBind
-> ([Int] -> [Int] -> Bool)
-> ([Int], [Int])
-> Property
-monadBind constr t eq (a, b) =
+monadBind constr t eq (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
stream <-
run
@@ -376,6 +575,12 @@ monadBind constr t eq (a, b) =
main :: IO ()
main = hspec $ do
+ let folded :: IsStream t => [a] -> t IO a
+ folded = serially . (\xs ->
+ case xs of
+ [x] -> return x -- singleton stream case
+ _ -> foldMapWith (<>) return xs
+ )
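+ -- folded builds the test stream by turning each element into a singleton
+ -- stream and appending the singletons with (<>) via foldMapWith, instead
+ -- of using fromFoldable, exercising construction by stream composition.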
describe "Construction" $ do
-- XXX test for all types of streams
prop "serially replicateM" $ constructWithReplicateM serially
@@ -388,17 +593,13 @@ main = hspec $ do
A.toList . serially . (A.take 100) $ A.iterateM addM (0 :: Int)
`shouldReturn` (take 100 $ iterate (+ 1) 0)
- let folded :: IsStream t => [a] -> t IO a
- folded = serially . (\xs ->
- case xs of
- [x] -> return x -- singleton stream case
- _ -> foldMapWith (<>) return xs
- )
describe "Functor operations" $ do
functorOps A.fromFoldable "serially" serially (==)
functorOps folded "serially folded" serially (==)
functorOps A.fromFoldable "wSerially" wSerially (==)
functorOps folded "wSerially folded" wSerially (==)
+ functorOps A.fromFoldable "aheadly" aheadly (==)
+ functorOps folded "aheadly folded" aheadly (==)
functorOps A.fromFoldable "asyncly" asyncly sortEq
functorOps folded "asyncly folded" asyncly sortEq
functorOps A.fromFoldable "wAsyncly" wAsyncly sortEq
@@ -413,6 +614,7 @@ main = hspec $ do
describe "Semigroup operations" $ do
semigroupOps "serially" serially (==)
semigroupOps "wSerially" wSerially (==)
+ semigroupOps "aheadly" aheadly (==)
semigroupOps "asyncly" asyncly sortEq
semigroupOps "wAsyncly" wAsyncly sortEq
semigroupOps "parallely" parallely sortEq
@@ -425,6 +627,8 @@ main = hspec $ do
-- XXX applicative with three arguments
prop "serially applicative" $ applicativeOps A.fromFoldable serially (==)
prop "serially applicative folded" $ applicativeOps folded serially (==)
+ prop "aheadly applicative" $ applicativeOps A.fromFoldable aheadly (==)
+ prop "aheadly applicative folded" $ applicativeOps folded aheadly (==)
prop "wSerially applicative" $ applicativeOps A.fromFoldable wSerially sortEq
prop "wSerially applicative folded" $ applicativeOps folded wSerially sortEq
prop "asyncly applicative" $ applicativeOps A.fromFoldable asyncly sortEq
@@ -434,10 +638,14 @@ main = hspec $ do
describe "Zip operations" $ do
prop "zipSerially applicative" $ zipApplicative A.fromFoldable zipSerially (==)
- -- XXX this hangs
- -- prop "zipAsyncly applicative" $ zipApplicative zipAsyncly (==)
+ prop "zipSerially applicative folded" $ zipApplicative folded zipSerially (==)
+ prop "zipAsyncly applicative" $ zipApplicative A.fromFoldable zipAsyncly (==)
+ prop "zipAsyncly applicative folded" $ zipApplicative folded zipAsyncly (==)
+
prop "zip monadic serially" $ zipMonadic A.fromFoldable serially (==)
prop "zip monadic serially folded" $ zipMonadic folded serially (==)
+ prop "zip monadic aheadly" $ zipMonadic A.fromFoldable aheadly (==)
+ prop "zip monadic aheadly folded" $ zipMonadic folded aheadly (==)
prop "zip monadic wSerially" $ zipMonadic A.fromFoldable wSerially (==)
prop "zip monadic wSerially folded" $ zipMonadic folded wSerially (==)
prop "zip monadic asyncly" $ zipMonadic A.fromFoldable asyncly (==)
@@ -449,18 +657,21 @@ main = hspec $ do
describe "Monad operations" $ do
prop "serially monad then" $ monadThen A.fromFoldable serially (==)
+ prop "aheadly monad then" $ monadThen A.fromFoldable aheadly (==)
prop "wSerially monad then" $ monadThen A.fromFoldable wSerially sortEq
prop "asyncly monad then" $ monadThen A.fromFoldable asyncly sortEq
prop "wAsyncly monad then" $ monadThen A.fromFoldable wAsyncly sortEq
prop "parallely monad then" $ monadThen A.fromFoldable parallely sortEq
prop "serially monad then folded" $ monadThen folded serially (==)
+ prop "aheadly monad then folded" $ monadThen folded aheadly (==)
prop "wSerially monad then folded" $ monadThen folded wSerially sortEq
prop "asyncly monad then folded" $ monadThen folded asyncly sortEq
prop "wAsyncly monad then folded" $ monadThen folded wAsyncly sortEq
prop "parallely monad then folded" $ monadThen folded parallely sortEq
prop "serially monad bind" $ monadBind A.fromFoldable serially (==)
+ prop "aheadly monad bind" $ monadBind A.fromFoldable aheadly (==)
prop "wSerially monad bind" $ monadBind A.fromFoldable wSerially sortEq
prop "asyncly monad bind" $ monadBind A.fromFoldable asyncly sortEq
prop "wAsyncly monad bind" $ monadBind A.fromFoldable wAsyncly sortEq
@@ -468,6 +679,7 @@ main = hspec $ do
describe "Stream transform operations" $ do
transformOps A.fromFoldable "serially" serially (==)
+ transformOps A.fromFoldable "aheadly" aheadly (==)
transformOps A.fromFoldable "wSerially" wSerially (==)
transformOps A.fromFoldable "zipSerially" zipSerially (==)
transformOps A.fromFoldable "zipAsyncly" zipAsyncly (==)
@@ -476,6 +688,7 @@ main = hspec $ do
transformOps A.fromFoldable "parallely" parallely sortEq
transformOps folded "serially folded" serially (==)
+ transformOps folded "aheadly folded" aheadly (==)
transformOps folded "wSerially folded" wSerially (==)
transformOps folded "zipSerially folded" zipSerially (==)
transformOps folded "zipAsyncly folded" zipAsyncly (==)
@@ -484,6 +697,7 @@ main = hspec $ do
transformOps folded "parallely folded" parallely sortEq
transformOpsWord8 A.fromFoldable "serially" serially
+ transformOpsWord8 A.fromFoldable "aheadly" aheadly
transformOpsWord8 A.fromFoldable "wSerially" wSerially
transformOpsWord8 A.fromFoldable "zipSerially" zipSerially
transformOpsWord8 A.fromFoldable "zipAsyncly" zipAsyncly
@@ -492,6 +706,7 @@ main = hspec $ do
transformOpsWord8 A.fromFoldable "parallely" parallely
transformOpsWord8 folded "serially folded" serially
+ transformOpsWord8 folded "aheadly folded" aheadly
transformOpsWord8 folded "wSerially folded" wSerially
transformOpsWord8 folded "zipSerially folded" zipSerially
transformOpsWord8 folded "zipAsyncly folded" zipAsyncly
@@ -499,8 +714,30 @@ main = hspec $ do
transformOpsWord8 folded "wAsyncly folded" wAsyncly
transformOpsWord8 folded "parallely folded" parallely
+ -- XXX add tests with outputQueue size set to 1
+ describe "Stream concurrent operations" $ do
+ concurrentOps A.fromFoldable "aheadly" aheadly (==)
+ concurrentOps A.fromFoldable "asyncly" asyncly sortEq
+ concurrentOps A.fromFoldable "wAsyncly" wAsyncly sortEq
+ concurrentOps A.fromFoldable "parallely" parallely sortEq
+
+ concurrentOps folded "aheadly folded" aheadly (==)
+ concurrentOps folded "asyncly folded" asyncly sortEq
+ concurrentOps folded "wAsyncly folded" wAsyncly sortEq
+ concurrentOps folded "parallely folded" parallely sortEq
+
+ prop "concurrent application" $ withMaxSuccess maxTestCount $
+ concurrentApplication
+ prop "concurrent foldr application" $ withMaxSuccess maxTestCount $
+ concurrentFoldrApplication
+ prop "concurrent foldl application" $ withMaxSuccess maxTestCount $
+ concurrentFoldlApplication
+
+ -- These tests specifically target the detection of illegal sharing of
+ -- SVar across concurrent streams.
describe "Stream transform and combine operations" $ do
transformCombineOpsCommon A.fromFoldable "serially" serially (==)
+ transformCombineOpsCommon A.fromFoldable "aheadly" aheadly (==)
transformCombineOpsCommon A.fromFoldable "wSerially" wSerially sortEq
transformCombineOpsCommon A.fromFoldable "zipSerially" zipSerially (==)
transformCombineOpsCommon A.fromFoldable "zipAsyncly" zipAsyncly (==)
@@ -508,12 +745,23 @@ main = hspec $ do
transformCombineOpsCommon A.fromFoldable "wAsyncly" wAsyncly sortEq
transformCombineOpsCommon A.fromFoldable "parallely" parallely sortEq
+ transformCombineOpsCommon folded "serially" serially (==)
+ transformCombineOpsCommon folded "aheadly" aheadly (==)
+ transformCombineOpsCommon folded "wSerially" wSerially sortEq
+ transformCombineOpsCommon folded "zipSerially" zipSerially (==)
+ transformCombineOpsCommon folded "zipAsyncly" zipAsyncly (==)
+ transformCombineOpsCommon folded "asyncly" asyncly sortEq
+ transformCombineOpsCommon folded "wAsyncly" wAsyncly sortEq
+ transformCombineOpsCommon folded "parallely" parallely sortEq
+
transformCombineOpsOrdered A.fromFoldable "serially" serially (==)
+ transformCombineOpsOrdered A.fromFoldable "aheadly" aheadly (==)
transformCombineOpsOrdered A.fromFoldable "zipSerially" zipSerially (==)
transformCombineOpsOrdered A.fromFoldable "zipAsyncly" zipAsyncly (==)
describe "Stream elimination operations" $ do
eliminationOps A.fromFoldable "serially" serially
+ eliminationOps A.fromFoldable "aheadly" aheadly
eliminationOps A.fromFoldable "wSerially" wSerially
eliminationOps A.fromFoldable "zipSerially" zipSerially
eliminationOps A.fromFoldable "zipAsyncly" zipAsyncly
@@ -522,6 +770,7 @@ main = hspec $ do
eliminationOps A.fromFoldable "parallely" parallely
eliminationOps folded "serially folded" serially
+ eliminationOps folded "aheadly folded" aheadly
eliminationOps folded "wSerially folded" wSerially
eliminationOps folded "zipSerially folded" zipSerially
eliminationOps folded "zipAsyncly folded" zipAsyncly
@@ -529,13 +778,15 @@ main = hspec $ do
eliminationOps folded "wAsyncly folded" wAsyncly
eliminationOps folded "parallely folded" parallely
- describe "Stream elimination operations" $ do
+ describe "Stream serial elimination operations" $ do
serialEliminationOps A.fromFoldable "serially" serially
+ serialEliminationOps A.fromFoldable "aheadly" aheadly
serialEliminationOps A.fromFoldable "wSerially" wSerially
serialEliminationOps A.fromFoldable "zipSerially" zipSerially
serialEliminationOps A.fromFoldable "zipAsyncly" zipAsyncly
serialEliminationOps folded "serially folded" serially
+ serialEliminationOps folded "aheadly folded" aheadly
serialEliminationOps folded "wSerially folded" wSerially
serialEliminationOps folded "zipSerially folded" zipSerially
serialEliminationOps folded "zipAsyncly folded" zipAsyncly