author     harendra <>                        2018-10-26 20:11:00 (GMT)
committer  hdiff <hdiff@hdiff.luite.com>      2018-10-26 20:11:00 (GMT)
commit     31acd2a2ea16885665c1b2bec291c62c88a94824 (patch)
tree       592600db3f991f299d9c23f6d5f1a0f6393adbe3
parent     253a299b8e37c3f5b2ac3c67e7e460a2a37e951e (diff)
version    0.5.2
-rw-r--r--  Changelog.md                      15
-rw-r--r--  README.md                        277
-rwxr-xr-x  bench.sh                         313
-rw-r--r--  benchmark/BaseStreams.hs           2
-rw-r--r--  benchmark/Chart.hs               200
-rw-r--r--  benchmark/ChartLinear.hs          54
-rw-r--r--  benchmark/ChartNested.hs          46
-rw-r--r--  benchmark/Linear.hs               89
-rw-r--r--  benchmark/LinearAsync.hs          92
-rw-r--r--  benchmark/LinearOps.hs            43
-rw-r--r--  benchmark/LinearRate.hs           60
-rw-r--r--  benchmark/Nested.hs               15
-rw-r--r--  benchmark/NestedOps.hs            12
-rw-r--r--  benchmark/StreamDOps.hs           18
-rw-r--r--  benchmark/StreamKOps.hs           34
-rw-r--r--  docs/streamly-vs-async.md        230
-rw-r--r--  docs/transformers.md              32
-rw-r--r--  examples/AcidRain.hs               9
-rw-r--r--  examples/CirclingSquare.hs         5
-rw-r--r--  examples/ControlFlow.hs          309
-rw-r--r--  examples/ListDir.hs                4
-rw-r--r--  examples/MergeSort.hs              8
-rw-r--r--  examples/SearchQuery.hs            2
-rw-r--r--  src/Streamly/Internal.hs          19
-rw-r--r--  src/Streamly/Prelude.hs          162
-rw-r--r--  src/Streamly/SVar.hs             590
-rw-r--r--  src/Streamly/Streams/Ahead.hs    171
-rw-r--r--  src/Streamly/Streams/Async.hs    139
-rw-r--r--  src/Streamly/Streams/Parallel.hs  11
-rw-r--r--  src/Streamly/Streams/Prelude.hs    2
-rw-r--r--  src/Streamly/Streams/SVar.hs      76
-rw-r--r--  src/Streamly/Streams/Serial.hs    13
-rw-r--r--  src/Streamly/Streams/StreamD.hs   19
-rw-r--r--  src/Streamly/Streams/StreamK.hs  125
-rw-r--r--  src/Streamly/Streams/Zip.hs        5
-rw-r--r--  stack-7.10.yaml                    2
-rw-r--r--  stack-8.0.yaml                     2
-rw-r--r--  stack.yaml                         6
-rw-r--r--  streamly.cabal                   137
-rw-r--r--  test/Main.hs                     306
-rw-r--r--  test/MaxRate.hs                   34
-rw-r--r--  test/Prop.hs                     426
-rw-r--r--  test/loops.hs                     13
-rw-r--r--  test/nested-loops.hs               7
-rw-r--r--  test/parallel-loops.hs             1
45 files changed, 2696 insertions, 1439 deletions
diff --git a/Changelog.md b/Changelog.md
index f588053..d246c09 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,3 +1,18 @@
+## 0.5.2
+
+### Bug Fixes
+
+* Clean up any pending threads when an exception occurs.
+* Fixed a livelock in ahead style streams. The problem sometimes manifests
+  when multiple streams are merged together in ahead style and one of them
+  is a nil stream.
+* As per the expected concurrency semantics, each forked concurrent task must
+  run with the monadic state captured at the fork point. This release fixes a
+  bug which, in some cases, caused an incorrect monadic state to be used for
+  a concurrent action, leading to unexpected behavior when concurrent streams
+  are used in a stateful monad, e.g. `StateT`. Notably, this bug cannot
+  affect `ReaderT`.
+
## 0.5.1
* Performance improvements, especially space consumption, for concurrent
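The monadic-state fix in 0.5.2 above concerns an expected semantics: a forked concurrent task must observe the state as it was at the fork point, not the parent's later modifications. A minimal sketch of that semantics using plain `transformers` and `forkIO` (illustrative only; `forkSeesForkPointState` is a made-up name, and this is not streamly code):

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.State (evalStateT, get, put)

-- The forked task captures the state at the fork point (10); the parent's
-- later 'put' does not affect it. This mirrors the semantics the fix
-- restores for concurrent streams running over StateT.
forkSeesForkPointState :: IO Int
forkSeesForkPointState = do
    box <- newEmptyMVar
    flip evalStateT (10 :: Int) $ do
        s <- get
        liftIO $ void $ forkIO $ putMVar box s  -- fork with state s
        put (s + 1)                             -- later change, unseen by fork
    takeMVar box

main :: IO ()
main = forkSeesForkPointState >>= print  -- prints 10
```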
diff --git a/README.md b/README.md
index 477a241..e6ea1dd 100644
--- a/README.md
+++ b/README.md
@@ -1,69 +1,122 @@
# Streamly
-## Stream`ing` `Concurrent`ly
+## Streaming Concurrently
+
+Haskell lists express pure computations using composable stream operations
+like `:`, `unfold`, `map`, `filter`, `zip` and `fold`. Streamly is just like
+lists, except that it can express sequences of pure as well as monadic
+computations, i.e. streams. More importantly, it can express monadic
+sequences with concurrent execution semantics without introducing any
+additional APIs.
+
+Streamly expresses concurrency using standard, well-known abstractions.
+Concurrency semantics are defined for list operations, semigroup, applicative
+and monadic compositions. The programmer does not need to know any low-level
+notions of concurrency like threads, locking or synchronization. Concurrent
+and non-concurrent programs are fundamentally the same. A chosen segment of
+the program can be made concurrent by annotating it with an appropriate
+combinator. We can choose a combinator for lookahead-style or asynchronous
+concurrency. Concurrency is automatically scaled up or down based on the
+demand from the consumer application, so we can finally say goodbye to
+managing thread pools and the associated sizing issues. The result is truly
+fearless and declarative monadic concurrency.
+
+## Where to use streamly?
+
+Streamly is a general purpose programming framework. It can be used equally
+efficiently from a simple `Hello World!` program to a massively concurrent
+application. The answer to the question "where to use streamly?" is much the
+same as the answer to "where to use Haskell lists or the IO monad?".
+Streamly generalizes lists to monadic streams, and the `IO` monad to
+non-deterministic and concurrent stream composition. The `IO` monad is a
+special case of streamly; if we use single element streams the behavior of
+streamly becomes identical to the IO monad. IO monad code can be replaced
+with streamly by just prefixing the IO actions with `liftIO`, without any
+other changes, and without any loss of performance. Pure lists too are a
+special case of streamly; if we use `Identity` as the underlying monad,
+streamly streams turn into pure lists. Non-concurrent programs are just a
+special case of concurrent ones; simply adding a combinator turns a
+non-concurrent program into a concurrent one.
+
+In other words, streamly combines the functionality of lists and IO, with
+built-in concurrency. If you want to write a program that involves IO,
+concurrent or not, you can just use streamly as the base monad. In fact, you
+could even use streamly for pure computations, as streamly performs on par
+with pure lists or `vector`.
+
+## Why data flow programming?
+
+If you need some convincing for using the streaming or data flow programming
+paradigm itself, then try to answer this question: why do we use lists in
+Haskell? It boils down to why we use functional programming in the first
+place. Haskell is successful in enforcing the functional data flow paradigm
+for pure computations using lists, but not for monadic computations. In the
+absence of a standard and easy to use data flow programming paradigm for
+monadic computations, and with the IO monad providing an escape hatch to an
+imperative model, we just love to fall into the imperative trap, and start
+asking the same fundamental question again: why do we have to use the
+streaming data model?
+
+## Show me an example
+
+Here is IO monad code that lists a directory tree recursively:
+
+```haskell
+import Control.Monad.IO.Class (liftIO)
+import Path.IO (listDir, getCurrentDir) -- from path-io package
+
+listDirRecursive = getCurrentDir >>= readdir
+ where
+ readdir dir = do
+ (dirs, files) <- listDir dir
+ liftIO $ mapM_ putStrLn
+ $ map show dirs ++ map show files
+ foldMap readdir dirs
+```
-Streamly, short for streaming concurrently, provides monadic streams, with a
-simple API, almost identical to standard lists and vector, and an in-built
-support for concurrency. By using stream-style combinators on stream
-composition, streams can be generated, merged, chained, mapped, zipped, and
-consumed concurrently – providing a generalized high level programming
-framework unifying streaming and concurrency. Controlled concurrency allows
-even infinite streams to be evaluated concurrently. Concurrency is auto scaled
-based on feedback from the stream consumer. The programmer does not have to be
-aware of threads, locking or synchronization to write scalable concurrent
-programs.
+This is your usual IO monad code, with nothing streamly-specific in it.
+This is how you can run it:
-The basic streaming functionality of streamly is equivalent to that provided by
-streaming libraries like
-[vector](https://hackage.haskell.org/package/vector),
-[streaming](https://hackage.haskell.org/package/streaming),
-[pipes](https://hackage.haskell.org/package/pipes), and
-[conduit](https://hackage.haskell.org/package/conduit).
-In addition to providing streaming functionality, streamly subsumes
-the functionality of list transformer libraries like `pipes` or
-[list-t](https://hackage.haskell.org/package/list-t), and also the logic
-programming library [logict](https://hackage.haskell.org/package/logict). On
-the concurrency side, it subsumes the functionality of the
-[async](https://hackage.haskell.org/package/async) package, and provides even
-higher level concurrent composition. Because it supports
-streaming with concurrency we can write FRP applications similar in concept to
-[Yampa](https://hackage.haskell.org/package/Yampa) or
-[reflex](https://hackage.haskell.org/package/reflex).
+``` haskell
+main :: IO ()
+main = listDirRecursive
+```
-Why use streamly?
+And this is how you can run exactly the same code using streamly with
+lookahead style concurrency; the only difference is that this time multiple
+directories are read concurrently:
- * _Simplicity_: Simple list like streaming API, if you know how to use lists
- then you know how to use streamly. This library is built with simplicity
- and ease of use as a design goal.
- * _Concurrency_: Simple, powerful, and scalable concurrency. Concurrency is
- built-in, and not intrusive, concurrent programs are written exactly the
- same way as non-concurrent ones.
- * _Generality_: Unifies functionality provided by several disparate packages
- (streaming, concurrency, list transformer, logic programming, reactive
- programming) in a concise API.
- * _Performance_: Streamly is designed for high performance. It employs stream
- fusion optimizations for best possible performance. Serial peformance is
- equivalent to the venerable `vector` library in most cases and even better
- in some cases. Concurrent performance is unbeatable. See
- [streaming-benchmarks](https://github.com/composewell/streaming-benchmarks)
- for a comparison of popular streaming libraries on micro-benchmarks.
+``` haskell
+import Streamly (runStream, aheadly)
+
+main :: IO ()
+main = runStream $ aheadly $ listDirRecursive
+```
+
+Isn't that magical? What's going on here? Streamly does not introduce any new
+abstractions; it just uses standard abstractions like `Semigroup` or
+`Monoid` to combine monadic streams concurrently, the way lists combine a
+sequence of pure values non-concurrently. The `foldMap` in the code
+above turns into a concurrent monoidal composition of a stream of `readdir`
+computations.
+
+## How does it perform?
+
+Providing monadic streaming and high level declarative concurrency does not
+mean that `streamly` compromises on performance in any way. The
+non-concurrent performance of `streamly` competes with lists and the `vector`
+library. The concurrent performance is as good as it gets; see the
+[concurrency benchmarks](https://github.com/composewell/concurrency-benchmarks)
+for detailed performance results and a comparison with the `async` package.
The following chart shows a summary of the cost of key streaming operations
-processing a million elements. The timings for streamly and vector are in the
-600-700 microseconds range and therefore can barely be seen in the graph.
+processing a million elements. The timings for `streamly` and `vector` are in
+the 600-700 microseconds range and therefore can barely be seen in the graph.
+For more details, see [streaming
+benchmarks](https://github.com/composewell/streaming-benchmarks).
![Streaming Operations at a Glance](charts-0/KeyOperations-time.svg)
## Streaming Pipelines
-Unlike `pipes` or `conduit` and like `vector` and `streaming`, `streamly`
-composes stream data instead of stream processors (functions). A stream is
-just like a list and is explicitly passed around to functions that process the
-stream. Therefore, no special operator is needed to join stages in a streaming
-pipeline, just the standard function application (`$`) or reverse function
-application (`&`) operator is enough. Combinators are provided in
-`Streamly.Prelude` to transform or fold streams.
-
The following snippet provides a simple stream composition example that reads
numbers from stdin, prints the squares of even numbers and exits if an even
number more than 9 is entered.
@@ -82,6 +135,14 @@ main = runStream $
& S.mapM print
```
+Unlike `pipes` or `conduit` and like `vector` and `streaming`, `streamly`
+composes stream data instead of stream processors (functions). A stream is
+just like a list and is explicitly passed around to functions that process the
+stream. Therefore, no special operator is needed to join stages in a streaming
+pipeline, just the standard function application (`$`) or reverse function
+application (`&`) operator is enough. Combinators are provided in
+`Streamly.Prelude` to transform or fold streams.
+
## Concurrent Stream Generation
Monadic construction and generation functions e.g. `consM`, `unfoldrM`,
@@ -263,28 +324,6 @@ main = do
print s
```
-Of course, the actions running in parallel could be arbitrary IO actions. For
-example, to concurrently list the contents of a directory tree recursively:
-
-``` haskell
-import Path.IO (listDir, getCurrentDir)
-import Streamly
-import qualified Streamly.Prelude as S
-
-main = runStream $ aheadly $ getCurrentDir >>= readdir
- where readdir d = do
- (dirs, files) <- S.yieldM $ listDir d
- S.yieldM $ mapM_ putStrLn $ map show files
- -- read the subdirs concurrently, (<>) is concurrent
- foldMap readdir dirs
-```
-
-In the above examples we do not think in terms of threads, locking or
-synchronization, rather we think in terms of what can run in parallel, the rest
-is taken care of automatically. When using `aheadly` the programmer does
-not have to worry about how many threads are to be created, they are
-automatically adjusted based on the demand of the consumer.
-
The concurrency facilities provided by streamly can be compared with
[OpenMP](https://en.wikipedia.org/wiki/OpenMP) and
[Cilk](https://en.wikipedia.org/wiki/Cilk) but with a more declarative
@@ -312,6 +351,22 @@ rate. Rate control works precisely even at throughputs as high as millions of
yields per second. For more sophisticated rate control see the haddock
documentation.
+## Exceptions
+
+From a library user's point of view, there is not much to learn or say about
+exceptions. Synchronous exceptions work just the way they are supposed to
+work in any standard non-concurrent code. When concurrent streams are
+combined together, exceptions from the constituent streams are propagated to
+the consumer stream. When an exception occurs in any of the constituent
+streams, the other concurrent streams are promptly terminated. Exceptions can
+be thrown using the `MonadThrow` instance.
+
+There is no notion of explicit threads in streamly, and therefore no
+asynchronous exceptions to deal with. You can just ignore the zillions of
+blogs, talks and caveats about async exceptions. Async exceptions just don't
+exist here. Please don't use things like `myThreadId` and `throwTo` just for
+fun!
+
+
## Reactive Programming (FRP)
Streamly is a foundation for first class reactive programming as well by virtue
@@ -321,17 +376,75 @@ for a console based FRP game example and
[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs)
for an SDL based animation example.
+## Conclusion
+
+Streamly, short for streaming concurrently, provides monadic streams with a
+simple API, almost identical to standard lists, and built-in
+support for concurrency. By using stream-style combinators on stream
+composition, streams can be generated, merged, chained, mapped, zipped, and
+consumed concurrently – providing a generalized high level programming
+framework unifying streaming and concurrency. Controlled concurrency allows
+even infinite streams to be evaluated concurrently. Concurrency is auto scaled
+based on feedback from the stream consumer. The programmer does not have to be
+aware of threads, locking or synchronization to write scalable concurrent
+programs.
+
+Streamly is a programmer first library, designed to be useful and friendly to
+programmers for solving practical problems in a simple and concise manner. Some
+key points in favor of streamly are:
+
+ * _Simplicity_: Simple list-like streaming API; if you know how to use lists
+   then you know how to use streamly. This library is built with simplicity
+   and ease of use as a design goal.
+ * _Concurrency_: Simple, powerful, and scalable concurrency. Concurrency is
+   built-in and not intrusive; concurrent programs are written exactly the
+   same way as non-concurrent ones.
+ * _Generality_: Unifies functionality provided by several disparate packages
+ (streaming, concurrency, list transformer, logic programming, reactive
+ programming) in a concise API.
+ * _Performance_: Streamly is designed for high performance. It employs stream
+   fusion optimizations for the best possible performance. Serial performance
+   is equivalent to the venerable `vector` library in most cases and even
+   better in some cases. Concurrent performance is unbeatable. See
+ [streaming-benchmarks](https://github.com/composewell/streaming-benchmarks)
+ for a comparison of popular streaming libraries on micro-benchmarks.
+
+The basic streaming functionality of streamly is equivalent to that provided by
+streaming libraries like
+[vector](https://hackage.haskell.org/package/vector),
+[streaming](https://hackage.haskell.org/package/streaming),
+[pipes](https://hackage.haskell.org/package/pipes), and
+[conduit](https://hackage.haskell.org/package/conduit).
+In addition to providing streaming functionality, streamly subsumes
+the functionality of list transformer libraries like `pipes` or
+[list-t](https://hackage.haskell.org/package/list-t), and also the logic
+programming library [logict](https://hackage.haskell.org/package/logict). On
+the concurrency side, it subsumes the functionality of the
+[async](https://hackage.haskell.org/package/async) package, and provides even
+higher level concurrent composition. Because it supports
+streaming with concurrency we can write FRP applications similar in concept to
+[Yampa](https://hackage.haskell.org/package/Yampa) or
+[reflex](https://hackage.haskell.org/package/reflex).
+
+See the `Comparison with existing packages` section at the end of the
+[tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html).
+
## Further Reading
For more information, see:
- * [A comprehensive tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
- * [Some practical examples](https://github.com/composewell/streamly/tree/master/examples)
- * See the `Comparison with existing packages` section at the end of the
- [tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
- * [Streaming benchmarks comparing streamly with other streaming libraries](https://github.com/composewell/streaming-benchmarks)
- * [Quick tutorial comparing streamly with the async package](https://github.com/composewell/streamly/blob/master/docs/Async.md)
- * [Concurrency benchmarks comparing streamly with async](https://github.com/composewell/concurrency-benchmarks)
+ * [Detailed tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
+ * [Reference documentation](https://hackage.haskell.org/package/streamly)
+ * [Examples](https://github.com/composewell/streamly/tree/master/examples)
+ * [Guides](https://github.com/composewell/streamly/blob/master/docs)
+ * [Streaming benchmarks](https://github.com/composewell/streaming-benchmarks)
+ * [Concurrency benchmarks](https://github.com/composewell/concurrency-benchmarks)
+
+## Support
+
+If you require professional support, consulting, training or timely
+enhancements to the library, please contact
+[support@composewell.com](mailto:support@composewell.com).
## Contributing
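The list analogy drawn in the new introduction (streams compose with `map`, `filter` and folds just like lists) can be sketched with nothing but the standard Prelude; `doubledEvens` is a made-up name for illustration, and with streamly the same pipeline shape would apply to monadic streams:

```haskell
-- A pure list pipeline: filter, map and fold compose point-free, exactly
-- the style streamly's serial streams support for monadic elements.
doubledEvens :: [Int] -> Int
doubledEvens = sum . map (* 2) . filter even

main :: IO ()
main = print (doubledEvens [1..10])  -- (2+4+6+8+10) * 2 = 60
```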
diff --git a/bench.sh b/bench.sh
index 6ed2a18..ee63912 100755
--- a/bench.sh
+++ b/bench.sh
@@ -2,12 +2,17 @@
print_help () {
echo "Usage: $0 "
- echo " [--quick] [--append] "
- echo " [--no-graphs] [--no-measure]"
- echo " [--benchmark <linear|nested>]"
echo " [--compare] [--base commit] [--candidate commit]"
+ echo " [--benchmarks <all|linear|linear-async|linear-rate|nested|base>]"
+ echo " [--graphs]"
+ echo " [--slow]"
+ echo " [--no-measure]"
+ echo " [--append] "
echo " -- <gauge options>"
echo
+  echo "Multiple benchmarks can be specified as a space separated list"
+ echo " e.g. --benchmarks \"linear nested\""
+ echo
echo "When using --compare, by default a comparative chart of HEAD^ vs HEAD"
echo "commits is generated in the 'charts' directory."
echo "Use --base and --candidate to select the commits to compare."
@@ -23,61 +28,58 @@ die () {
exit 1
}
-DEFAULT_BENCHMARK=linear
-COMPARE=0
-
-while test -n "$1"
-do
- case $1 in
- -h|--help|help) print_help ;;
- --quick) QUICK=1; shift ;;
- --append) APPEND=1; shift ;;
- --benchmark) shift; BENCHMARK=$1; shift ;;
- --base) shift; BASE=$1; shift ;;
- --candidate) shift; CANDIDATE=$1; shift ;;
- --compare) COMPARE=1; shift ;;
- --no-graphs) GRAPH=0; shift ;;
- --no-measure) MEASURE=0; shift ;;
- --) shift; break ;;
- -*|--*) print_help ;;
- *) break ;;
- esac
-done
+set_benchmarks() {
+ if test -z "$BENCHMARKS"
+ then
+ BENCHMARKS=$DEFAULT_BENCHMARKS
+ elif test "$BENCHMARKS" = "all"
+ then
+ BENCHMARKS=$ALL_BENCHMARKS
+ fi
+ echo "Using benchmark suites [$BENCHMARKS]"
+}
-GAUGE_ARGS=$*
+# $1: benchmark name (linear, nested, base)
+find_report_prog() {
+ local prog_name="chart"
+ hash -r
+ local prog_path=$($STACK exec which $prog_name)
+ if test -x "$prog_path"
+ then
+ echo $prog_path
+ else
+ return 1
+ fi
+}
-if test -z "$BENCHMARK"
-then
- BENCHMARK=$DEFAULT_BENCHMARK
- echo "Using default benchmark suite [$BENCHMARK], use --benchmark to specify another"
-else
- echo "Using benchmark suite [$BENCHMARK]"
-fi
+# $1: benchmark name (linear, nested, base)
+build_report_prog() {
+ local prog_name="chart"
+ local prog_path=$($STACK exec which $prog_name)
-STACK=stack
-echo "Using stack command [$STACK]"
-
-# We build it first at the current commit before checking out any other commit
-# for benchmarking.
-if test "$GRAPH" != "0"
-then
- CHART_PROG="chart-$BENCHMARK"
- prog=$($STACK exec which $CHART_PROG)
- hash -r
- if test ! -x "$prog"
- then
- echo "Building charting executable"
- $STACK build --flag "streamly:dev" || die "build failed"
- fi
+ hash -r
+ if test ! -x "$prog_path" -a "$BUILD_ONCE" = "0"
+ then
+ echo "Building bench-graph executables"
+ BUILD_ONCE=1
+ $STACK build --flag "streamly:dev" || die "build failed"
+ elif test ! -x "$prog_path"
+ then
+ return 1
+ fi
+ return 0
+}
- prog=$($STACK exec which $CHART_PROG)
- if test ! -x "$prog"
+build_report_progs() {
+ if test "$RAW" = "0"
then
- die "Could not find [$CHART_PROG] executable"
+ build_report_prog || exit 1
+ local prog
+ prog=$(find_report_prog) || \
+ die "Cannot find bench-graph executable"
+ echo "Using bench-graph executable [$prog]"
fi
- CHART_PROG=$prog
- echo "Using chart executable [$CHART_PROG]"
-fi
+}
# We run the benchmarks in isolation in a separate process so that different
# benchmarks do not interfere with each other. To enable that we need to pass the
@@ -89,16 +91,21 @@ fi
# find .stack-work/ -type f -name "benchmarks"
find_bench_prog () {
- BENCH_PROG=`$STACK path --dist-dir`/build/$BENCHMARK/$BENCHMARK
- if test ! -x "$BENCH_PROG"
+ local bench_name=$1
+ local bench_prog=`$STACK path --dist-dir`/build/$bench_name/$bench_name
+ if test -x "$bench_prog"
then
- echo
- echo "WARNING! benchmark binary [$BENCH_PROG] not found or not executable"
- echo "WARNING! not using isolated measurement."
- echo
+ echo $bench_prog
+ else
+ return 1
fi
}
+bench_output_file() {
+ local bench_name=$1
+ echo "charts/$bench_name/results.csv"
+}
+
# --min-duration 0 means exactly one iteration per sample. We use a million
# iterations in the benchmarking code explicitly and do not use the iterations
# done by the benchmarking tool.
@@ -112,44 +119,33 @@ find_bench_prog () {
# We can pass --min-samples value from the command line as second argument
# after the benchmark name in case we want to use more than one sample.
-if test "$QUICK" = "1"
-then
- ENABLE_QUICK="--quick"
-fi
+run_bench () {
+ local bench_name=$1
+ local output_file=$(bench_output_file $bench_name)
+ local bench_prog
+ bench_prog=$(find_bench_prog $bench_name) || \
+ die "Cannot find benchmark executable for benchmark $bench_name"
-OUTPUT_FILE="charts/results.csv"
+ mkdir -p `dirname $output_file`
-run_bench () {
- $STACK build --bench --no-run-benchmarks || die "build failed"
- find_bench_prog
- mkdir -p charts
-
- # We set min-samples to 3 if we use less than three samples, statistical
- # analysis crashes. Note that the benchmark runs for a minimum of 5 seconds.
- # We use min-duration=0 to run just one iteration for each sample. Anyway the
- # default is to run iterations worth minimum 30 ms and most of our benchmarks
- # are close to that or more.
- $BENCH_PROG $ENABLE_QUICK \
- --include-first-iter \
- --min-samples 3 \
- --min-duration 0 \
- --match exact \
- --csvraw=$OUTPUT_FILE \
+ echo "Running benchmark $bench_name ..."
+
+ $bench_prog $SPEED_OPTIONS \
+ --csvraw=$output_file \
-v 2 \
- --measure-with $BENCH_PROG $GAUGE_ARGS || die "Benchmarking failed"
+ --measure-with $bench_prog $GAUGE_ARGS || die "Benchmarking failed"
}
-if test "$MEASURE" != "0"
- then
- if test -e $OUTPUT_FILE -a "$APPEND" != 1
- then
- mv -f -v $OUTPUT_FILE ${OUTPUT_FILE}.prev
- fi
+run_benches() {
+ for i in $1
+ do
+ run_bench $i
+ done
+}
+
+run_benches_comparing() {
+ local bench_list=$1
- if test "$COMPARE" = "0"
- then
- run_bench
- else
if test -z "$CANDIDATE"
then
CANDIDATE=$(git rev-parse HEAD)
@@ -159,20 +155,133 @@ if test "$MEASURE" != "0"
# XXX Should be where the current branch is forked from master
BASE="$CANDIDATE^"
fi
- echo "Checking out base commit for benchmarking"
- git checkout "$BASE" || die "Checkout of base commit failed"
- run_bench
- echo "Checking out candidate commit for benchmarking"
- git checkout "$CANDIDATE" || die "Checkout of candidate commit failed"
- run_bench
+ echo "Comparing baseline commit [$BASE] with candidate [$CANDIDATE]"
+ echo "Checking out base commit [$BASE] for benchmarking"
+ git checkout "$BASE" || die "Checkout of base commit [$BASE] failed"
+
+ $STACK build --bench --no-run-benchmarks || die "build failed"
+ run_benches "$bench_list"
+
+ echo "Checking out candidate commit [$CANDIDATE] for benchmarking"
+ git checkout "$CANDIDATE" || \
+ die "Checkout of candidate [$CANDIDATE] commit failed"
+
+ $STACK build --bench --no-run-benchmarks || die "build failed"
+ run_benches "$bench_list"
+ # XXX reset back to the original commit
+}
+
+backup_output_file() {
+ local bench_name=$1
+ local output_file=$(bench_output_file $bench_name)
+
+ if test -e $output_file -a "$APPEND" != 1
+ then
+ mv -f -v $output_file ${output_file}.prev
fi
-fi
+}
+
+run_measurements() {
+ local bench_list=$1
+
+ for i in $bench_list
+ do
+ backup_output_file $i
+ done
+
+ if test "$COMPARE" = "0"
+ then
+ run_benches "$bench_list"
+ else
+ run_benches_comparing "$bench_list"
+ fi
+}
+
+run_reports() {
+ local prog
+ prog=$(find_report_prog) || \
+ die "Cannot find bench-graph executable"
+ echo
+
+ for i in $1
+ do
+ echo "Generating reports for ${i}..."
+ $prog --benchmark $i
+ done
+}
+
+#-----------------------------------------------------------------------------
+# Execution starts here
+#-----------------------------------------------------------------------------
+
+DEFAULT_BENCHMARKS="linear"
+ALL_BENCHMARKS="linear linear-async linear-rate nested base"
+
+COMPARE=0
+BASE=
+CANDIDATE=
+
+APPEND=0
+RAW=0
+GRAPH=0
+MEASURE=1
+SPEED_OPTIONS="--quick --min-samples 10 --time-limit 1 --min-duration 0"
+
+STACK=stack
+GAUGE_ARGS=
-if test "$GRAPH" != "0"
+BUILD_ONCE=0
+
+#-----------------------------------------------------------------------------
+# Read command line
+#-----------------------------------------------------------------------------
+
+while test -n "$1"
+do
+ case $1 in
+ -h|--help|help) print_help ;;
+ --slow) SPEED_OPTIONS="--min-duration 0"; shift ;;
+ --append) APPEND=1; shift ;;
+ --benchmarks) shift; BENCHMARKS=$1; shift ;;
+ --base) shift; BASE=$1; shift ;;
+ --candidate) shift; CANDIDATE=$1; shift ;;
+ --compare) COMPARE=1; shift ;;
+ --raw) RAW=1; shift ;;
+ --graphs) GRAPH=1; shift ;;
+ --no-measure) MEASURE=0; shift ;;
+ --) shift; break ;;
+ -*|--*) print_help ;;
+ *) break ;;
+ esac
+done
+GAUGE_ARGS=$*
+
+echo "Using stack command [$STACK]"
+set_benchmarks
+
+#-----------------------------------------------------------------------------
+# Build stuff
+#-----------------------------------------------------------------------------
+
+# We need to build the report progs first at the current (latest) commit before
+# checking out any other commit for benchmarking.
+build_report_progs "$BENCHMARKS"
+
+#-----------------------------------------------------------------------------
+# Run benchmarks
+#-----------------------------------------------------------------------------
+
+if test "$MEASURE" = "1"
then
- echo
- echo "Generating charts from ${OUTPUT_FILE}..."
- $CHART_PROG
+ $STACK build --bench --no-run-benchmarks || die "build failed"
+ run_measurements "$BENCHMARKS"
fi
-# XXX reset back to the original commit
+#-----------------------------------------------------------------------------
+# Run reports
+#-----------------------------------------------------------------------------
+
+if test "$RAW" = "0"
+then
+ run_reports "$BENCHMARKS"
+fi
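The rewritten script's `run_benches` and `run_measurements` loops rely on shell word splitting to turn the space-separated `--benchmarks` value into loop items. A standalone sketch of that behavior (hypothetical, outside the script; the variable names only mirror bench.sh):

```shell
#!/bin/sh
# Unquoted expansion of $BENCHMARKS is field-split on whitespace, so each
# benchmark name becomes one loop iteration, as in run_benches.
BENCHMARKS="linear nested"
for i in $BENCHMARKS
do
    echo "Running benchmark $i ..."
done
```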
diff --git a/benchmark/BaseStreams.hs b/benchmark/BaseStreams.hs
index 3fce088..86a7e99 100644
--- a/benchmark/BaseStreams.hs
+++ b/benchmark/BaseStreams.hs
@@ -31,7 +31,7 @@ _benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)
-}
main :: IO ()
-main = do
+main =
defaultMain
[ bgroup "streamD"
[ bgroup "generation"
diff --git a/benchmark/Chart.hs b/benchmark/Chart.hs
new file mode 100644
index 0000000..cf0c37d
--- /dev/null
+++ b/benchmark/Chart.hs
@@ -0,0 +1,200 @@
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+module Main where
+
+import Control.Exception (handle, catch, SomeException, ErrorCall(..))
+import Control.Monad.Trans.State
+import Control.Monad.Trans.Maybe
+import Data.List
+import Data.List.Split
+import Data.Ord (comparing)
+import System.Environment (getArgs)
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad (mzero)
+
+import BenchShow
+
+------------------------------------------------------------------------------
+-- Command line parsing
+------------------------------------------------------------------------------
+
+data BenchType = Linear | LinearAsync | LinearRate | Nested | Base
+
+data Options = Options
+ { genGraphs :: Bool
+ , benchType :: BenchType
+ }
+
+defaultOptions = Options False Linear
+
+setGenGraphs val = do
+ (args, opts) <- get
+ put (args, opts { genGraphs = val })
+
+setBenchType val = do
+ (args, opts) <- get
+ put (args, opts { benchType = val })
+
+-- Like the shell "shift" to shift the command line arguments
+shift :: StateT ([String], Options) (MaybeT IO) (Maybe String)
+shift = do
+ s <- get
+ case s of
+ ([], _) -> return Nothing
+ (x : xs, opts) -> put (xs, opts) >> return (Just x)
+
+parseBench :: StateT ([String], Options) (MaybeT IO) ()
+parseBench = do
+ x <- shift
+ case x of
+ Just "linear" -> setBenchType Linear
+ Just "linear-async" -> setBenchType LinearAsync
+ Just "linear-rate" -> setBenchType LinearRate
+ Just "nested" -> setBenchType Nested
+ Just "base" -> setBenchType Base
+ Just str -> do
+ liftIO $ putStrLn $ "unrecognized benchmark type " <> str
+ mzero
+ Nothing -> do
+ liftIO $ putStrLn "please provide a benchmark type "
+ mzero
+
+-- totally imperative style option parsing
+parseOptions :: IO (Maybe Options)
+parseOptions = do
+ args <- getArgs
+ runMaybeT $ flip evalStateT (args, defaultOptions) $ do
+ x <- shift
+ case x of
+ Just "--graphs" -> setGenGraphs True
+ Just "--benchmark" -> parseBench
+ Just str -> do
+ liftIO $ putStrLn $ "Unrecognized option " <> str
+ mzero
+ Nothing -> return ()
+ fmap snd get
+
+ignoringErr :: IO () -> IO ()
+ignoringErr a = catch a (\(ErrorCall err :: ErrorCall) ->
+    putStrLn $ "Failed with error:\n" <> err <> "\nSkipping.")
+
+------------------------------------------------------------------------------
+-- Linear composition charts
+------------------------------------------------------------------------------
+
+makeLinearGraphs :: Config -> String -> IO ()
+makeLinearGraphs cfg inputFile = do
+ ignoringErr $ graph inputFile "operations" $ cfg
+ { title = Just "Streamly operations"
+ , classifyBenchmark = \b ->
+ if not ("serially/" `isPrefixOf` b)
+ || "/generation" `isInfixOf` b
+ || "/compose" `isInfixOf` b
+ || "/concat" `isSuffixOf` b
+ then Nothing
+ else Just ("Streamly", last $ splitOn "/" b)
+ }
+
+ ignoringErr $ graph inputFile "generation" $ cfg
+ { title = Just "Stream generation"
+ , classifyBenchmark = \b ->
+ if "serially/generation" `isPrefixOf` b
+ then Just ("Streamly", last $ splitOn "/" b)
+ else Nothing
+ }
+
+ ignoringErr $ graph inputFile "composition" $ cfg
+ { title = Just "Streamly composition performance"
+ , classifyBenchmark = fmap ("Streamly",) . stripPrefix "serially/compose/"
+ }
+
+ ignoringErr $ graph inputFile "composition-scaling"
+ $ cfg
+ { title = Just "Streamly composition scaling"
+ , classifyBenchmark = fmap ("Streamly",) . stripPrefix "serially/compose-"
+ }
+
+------------------------------------------------------------------------------
+-- Nested composition charts
+------------------------------------------------------------------------------
+
+makeNestedGraphs :: Config -> String -> IO ()
+makeNestedGraphs cfg inputFile =
+ ignoringErr $ graph inputFile "nested-serial-diff" $ cfg
+ { title = Just "Nested serial"
+ , classifyBenchmark = \b ->
+ let ls = splitOn "/" b
+ in case head ls of
+ "serially" -> Just (head ls, last ls)
+ _ -> Nothing
+ }
+
+------------------------------------------------------------------------------
+-- Charts for parallel streams
+------------------------------------------------------------------------------
+
+makeLinearAsyncGraphs :: Config -> String -> IO ()
+makeLinearAsyncGraphs _cfg _inputFile = putStrLn "Not implemented"
+
+makeLinearRateGraphs :: Config -> String -> IO ()
+makeLinearRateGraphs _cfg _inputFile = putStrLn "Not implemented"
+
+------------------------------------------------------------------------------
+-- Charts for base streams
+------------------------------------------------------------------------------
+
+makeBaseGraphs :: Config -> String -> IO ()
+makeBaseGraphs _cfg _inputFile = putStrLn "Not implemented"
+
+------------------------------------------------------------------------------
+-- text reports
+------------------------------------------------------------------------------
+
+benchShow
+    :: Options
+    -> Config
+    -> (Config -> String -> IO ())
+    -> String
+    -> String
+    -> IO ()
+benchShow Options{..} cfg func inp out =
+ if genGraphs
+ then func cfg {outputDir = Just out} inp
+ else
+ ignoringErr $ report inp Nothing $ cfg
+ { selectBenchmarks =
+ \f ->
+ reverse
+ $ fmap fst
+ $ either
+ (const $ either error id $ f $ ColumnIndex 0)
+ (sortOn snd)
+ $ f $ ColumnIndex 1
+ }
+
+main :: IO ()
+main = do
+ let cfg = defaultConfig { presentation = Groups PercentDiff }
+ res <- parseOptions
+
+ case res of
+ Nothing -> do
+ putStrLn "cannot parse options"
+ return ()
+ Just opts@Options{..} ->
+ case benchType of
+ Linear -> benchShow opts cfg makeLinearGraphs
+ "charts/linear/results.csv"
+ "charts/linear"
+ LinearAsync -> benchShow opts cfg makeLinearAsyncGraphs
+ "charts/linear-async/results.csv"
+ "charts/linear-async"
+ LinearRate -> benchShow opts cfg makeLinearRateGraphs
+ "charts/linear-rate/results.csv"
+ "charts/linear-rate"
+ Nested -> benchShow opts cfg makeNestedGraphs
+ "charts/nested/results.csv"
+ "charts/nested"
+ Base -> benchShow opts cfg makeBaseGraphs
+ "charts/base/results.csv"
+ "charts/base"
diff --git a/benchmark/ChartLinear.hs b/benchmark/ChartLinear.hs
deleted file mode 100644
index 15676e2..0000000
--- a/benchmark/ChartLinear.hs
+++ /dev/null
@@ -1,54 +0,0 @@
-{-# LANGUAGE TupleSections #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-
-module Main where
-
-import Data.List
-import Data.List.Split
-import BenchGraph (bgraph, defaultConfig, Config(..), ComparisonStyle(..))
-import Control.Exception (handle, catch, SomeException, ErrorCall(..))
-
-main :: IO ()
-main = do
- let cfg = defaultConfig
- { outputDir = "charts"
- , comparisonStyle = CompareDelta
- }
-
- ignoringErr a = catch a (\(ErrorCall err :: ErrorCall) ->
- putStrLn $ "Failed with error:\n" ++ err ++ "\nSkipping.")
- -- bgraph <input> <output> <field in csv file to be plotted>
- -- other interesting fields to plot are:
- -- allocated
- -- bytesCopied
- -- mutatorCpuSeconds
- -- gcCpuSeconds
- ignoringErr $ bgraph "charts/results.csv" "operations" "time" $ cfg
- { chartTitle = Just "Streamly operations (time)"
- , classifyBenchmark = \b ->
- if (not $ "serially/" `isPrefixOf` b)
- || "/generation" `isInfixOf` b
- || "/compose" `isInfixOf` b
- || "/concat" `isSuffixOf` b
- then Nothing
- else Just ("Streamly", last $ splitOn "/" b)
- }
-
- ignoringErr $ bgraph "charts/results.csv" "generation" "time" $ cfg
- { chartTitle = Just "Stream generation (time)"
- , classifyBenchmark = \b ->
- if "serially/generation" `isPrefixOf` b
- then Just ("Streamly", last $ splitOn "/" b)
- else Nothing
- }
-
- ignoringErr $ bgraph "charts/results.csv" "composition" "time" $ cfg
- { chartTitle = Just "Streamly composition performance (time)"
- , classifyBenchmark = fmap ("Streamly",) . stripPrefix "serially/compose/"
- }
-
- ignoringErr $ bgraph "charts/results.csv" "composition-scaling" "time"
- $ cfg
- { chartTitle = Just "Streamly composition scaling (time)"
- , classifyBenchmark = fmap ("Streamly",) . stripPrefix "serially/compose-"
- }
diff --git a/benchmark/ChartNested.hs b/benchmark/ChartNested.hs
deleted file mode 100644
index e1554e8..0000000
--- a/benchmark/ChartNested.hs
+++ /dev/null
@@ -1,46 +0,0 @@
-{-# LANGUAGE TupleSections #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-
-module Main where
-
-import Data.List
-import Data.List.Split
-import BenchGraph (bgraph, defaultConfig, Config(..), ComparisonStyle(..))
-import Control.Exception (handle, catch, SomeException, ErrorCall)
-
-main :: IO ()
-main = do
- let cfg = defaultConfig
- { outputDir = "charts"
- , comparisonStyle = CompareFull
- }
-
- ignoringErr a = catch a (\(_ :: ErrorCall) ->
- putStrLn "Failed. Skipping.")
- -- bgraph <input> <output> <field in csv file to be plotted>
- -- other interesting fields to plot are:
- -- allocated
- -- bytesCopied
- -- mutatorCpuSeconds
- -- gcCpuSeconds
- ignoringErr $ bgraph "charts/results.csv" "nested-ops" "time" $ cfg
- { chartTitle = Just "Nested operations (time)"
- , classifyBenchmark = \b ->
- let ls = splitOn "/" b
- in case head ls of
- "linear" -> Nothing
- _ -> Just (head ls, last ls)
- , sortBenchmarks = nub
- , comparisonStyle = CompareFull
- }
-
- ignoringErr $ bgraph "charts/results.csv" "nested-serial-comparative" "time" $ cfg
- { chartTitle = Just "Nested serial diff (time)"
- , classifyBenchmark = \b ->
- let ls = splitOn "/" b
- in case head ls of
- "serially" -> Just (head ls, last ls)
- _ -> Nothing
- , sortBenchmarks = nub
- , comparisonStyle = CompareDelta
- }
diff --git a/benchmark/Linear.hs b/benchmark/Linear.hs
index ddcb0b0..c982327 100644
--- a/benchmark/Linear.hs
+++ b/benchmark/Linear.hs
@@ -19,7 +19,7 @@ import Gauge
-- | Takes a fold method, and uses it with a default source.
{-# INLINE benchIO #-}
benchIO :: (IsStream t, NFData b) => String -> (t IO Int -> IO b) -> Benchmark
-benchIO name f = bench name $ nfIO $ randomRIO (1,1000) >>= f . Ops.source
+benchIO name f = bench name $ nfIO $ randomRIO (1,1) >>= f . Ops.source
-- | Takes a source, and uses it with a default drain/fold method.
{-# INLINE benchSrcIO #-}
@@ -29,7 +29,7 @@ benchSrcIO
-> (Int -> t IO Int)
-> Benchmark
benchSrcIO t name f
- = bench name $ nfIO $ randomRIO (1,1000) >>= Ops.toNull t . f
+ = bench name $ nfIO $ randomRIO (1,1) >>= Ops.toNull t . f
{-
_benchId :: NFData b => String -> (Ops.Stream m Int -> Identity b) -> Benchmark
@@ -37,12 +37,12 @@ _benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)
-}
main :: IO ()
-main = do
+main =
defaultMain
[ bgroup "serially"
[ bgroup "generation"
[ -- Most basic, barely stream continuations running
- benchSrcIO serially "unfoldr" $ Ops.sourceUnfoldr
+ benchSrcIO serially "unfoldr" Ops.sourceUnfoldr
, benchSrcIO serially "unfoldrM" Ops.sourceUnfoldrM
, benchSrcIO serially "fromList" Ops.sourceFromList
, benchSrcIO serially "fromListM" Ops.sourceFromListM
@@ -91,10 +91,10 @@ main = do
, benchIO "mapMaybe" Ops.mapMaybe
, benchIO "mapMaybeM" Ops.mapMaybeM
, bench "sequence" $ nfIO $ randomRIO (1,1000) >>= \n ->
- (Ops.sequence serially) (Ops.sourceUnfoldrMAction n)
+ Ops.sequence serially (Ops.sourceUnfoldrMAction n)
, benchIO "findIndices" Ops.findIndices
, benchIO "elemIndices" Ops.elemIndices
- , benchIO "concat" Ops.concat
+ -- , benchIO "concat" Ops.concat
]
, bgroup "filtering"
[ benchIO "filter-even" Ops.filterEven
@@ -107,8 +107,8 @@ main = do
, benchIO "dropWhile-true" Ops.dropWhileTrue
, benchIO "dropWhileM-true" Ops.dropWhileMTrue
]
- , benchIO "zip" $ Ops.zip
- , benchIO "zipM" $ Ops.zipM
+ , benchIO "zip" Ops.zip
+ , benchIO "zipM" Ops.zipM
, bgroup "compose"
[ benchIO "mapM" Ops.composeMapM
, benchIO "map-with-all-in-filter" Ops.composeMapAllInFilter
@@ -123,77 +123,4 @@ main = do
, benchIO "4" $ Ops.composeScaling 4
]
]
- , bgroup "asyncly"
- [ -- benchIO "unfoldr" $ Ops.toNull asyncly
- benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
- -- , benchSrcIO asyncly "fromFoldable" Ops.sourceFromFoldable
- , benchSrcIO asyncly "fromFoldableM" Ops.sourceFromFoldableM
- -- , benchSrcIO asyncly "foldMapWith" Ops.sourceFoldMapWith
- , benchSrcIO asyncly "foldMapWithM" Ops.sourceFoldMapWithM
- , benchIO "mapM" $ Ops.mapM asyncly
- , benchSrcIO asyncly "unfoldrM maxThreads 1"
- (maxThreads 1 . Ops.sourceUnfoldrM)
- , benchSrcIO asyncly "unfoldrM maxBuffer 1 (1000 ops)"
- (maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
- ]
- , bgroup "asyncly/rate"
- [ -- benchIO "unfoldr" $ Ops.toNull asyncly
- benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
- , benchSrcIO asyncly "unfoldrM/Nothing"
- (rate Nothing . Ops.sourceUnfoldrM)
- , benchSrcIO asyncly "unfoldrM/AvgRate/1,000,000"
- (avgRate 1000000 . Ops.sourceUnfoldrM)
- , benchSrcIO asyncly "unfoldrM/AvgRate/3,000,000"
- (avgRate 3000000 . Ops.sourceUnfoldrM)
- , benchSrcIO asyncly "unfoldrM/AvgRate/10,000,000/maxThreads1"
- (maxThreads 1 . avgRate 10000000 . Ops.sourceUnfoldrM)
- -- XXX arbitrarily large rate should be the same as rate Nothing
- , benchSrcIO asyncly "unfoldrM/AvgRate/10,000,000"
- (avgRate 10000000 . Ops.sourceUnfoldrM)
- , benchSrcIO asyncly "unfoldrM/AvgRate/20,000,000"
- (avgRate 20000000 . Ops.sourceUnfoldrM)
- ]
- , bgroup "wAsyncly"
- [ -- benchIO "unfoldr" $ Ops.toNull wAsyncly
- benchSrcIO wAsyncly "unfoldrM" Ops.sourceUnfoldrM
- -- , benchSrcIO wAsyncly "fromFoldable" Ops.sourceFromFoldable
- , benchSrcIO wAsyncly "fromFoldableM" Ops.sourceFromFoldableM
- -- , benchSrcIO wAsyncly "foldMapWith" Ops.sourceFoldMapWith
- , benchSrcIO wAsyncly "foldMapWithM" Ops.sourceFoldMapWithM
- , benchIO "mapM" $ Ops.mapM wAsyncly
- ]
- -- unfoldr and fromFoldable are always serial and thereofore the same for
- -- all stream types.
- , bgroup "aheadly"
- [ -- benchIO "unfoldr" $ Ops.toNull aheadly
- benchSrcIO aheadly "unfoldrM" Ops.sourceUnfoldrM
- , benchSrcIO aheadly "fromFoldableM" Ops.sourceFromFoldableM
- -- , benchSrcIO aheadly "foldMapWith" Ops.sourceFoldMapWith
- , benchSrcIO aheadly "foldMapWithM" Ops.sourceFoldMapWithM
- , benchIO "mapM" $ Ops.mapM aheadly
- , benchSrcIO aheadly "unfoldrM maxThreads 1"
- (maxThreads 1 . Ops.sourceUnfoldrM)
- , benchSrcIO aheadly "unfoldrM maxBuffer 1 (1000 ops)"
- (maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
- -- , benchSrcIO aheadly "fromFoldable" Ops.sourceFromFoldable
- ]
- , bgroup "aheadly/rate"
- [
- -- XXX arbitrarily large maxRate should be the same as maxRate -1
- benchSrcIO aheadly "unfoldrM rate AvgRate 1000000"
- (avgRate 1000000 . Ops.sourceUnfoldrM)
- ]
- -- XXX need to use smaller streams to finish in reasonable time
- , bgroup "parallely"
- [ --benchIO "unfoldr" $ Ops.toNull parallely
- benchSrcIO parallely "unfoldrM" Ops.sourceUnfoldrM
- --, benchSrcIO parallely "fromFoldable" Ops.sourceFromFoldable
- , benchSrcIO parallely "fromFoldableM" Ops.sourceFromFoldableM
- -- , benchSrcIO parallely "foldMapWith" Ops.sourceFoldMapWith
- , benchSrcIO parallely "foldMapWithM" Ops.sourceFoldMapWithM
- , benchIO "mapM" $ Ops.mapM parallely
- -- Zip has only one parallel flavor
- , benchIO "zip" $ Ops.zipAsync
- , benchIO "zipM" $ Ops.zipAsyncM
- ]
]
diff --git a/benchmark/LinearAsync.hs b/benchmark/LinearAsync.hs
new file mode 100644
index 0000000..29a3f40
--- /dev/null
+++ b/benchmark/LinearAsync.hs
@@ -0,0 +1,92 @@
+-- |
+-- Module : Main
+-- Copyright : (c) 2018 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+
+import Control.DeepSeq (NFData)
+-- import Data.Functor.Identity (Identity, runIdentity)
+import System.Random (randomRIO)
+import qualified LinearOps as Ops
+
+import Streamly
+import Gauge
+
+-- We need a monadic bind here to make sure that the function f does not get
+-- completely optimized out by the compiler in some cases.
+--
+-- | Takes a fold method, and uses it with a default source.
+{-# INLINE benchIO #-}
+benchIO :: (IsStream t, NFData b) => String -> (t IO Int -> IO b) -> Benchmark
+benchIO name f = bench name $ nfIO $ randomRIO (1,1) >>= f . Ops.source
+
+-- | Takes a source, and uses it with a default drain/fold method.
+{-# INLINE benchSrcIO #-}
+benchSrcIO
+ :: (t IO Int -> SerialT IO Int)
+ -> String
+ -> (Int -> t IO Int)
+ -> Benchmark
+benchSrcIO t name f
+ = bench name $ nfIO $ randomRIO (1,1) >>= Ops.toNull t . f
+
+{-
+_benchId :: NFData b => String -> (Ops.Stream m Int -> Identity b) -> Benchmark
+_benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)
+-}
+
+main :: IO ()
+main =
+ defaultMain
+ [ bgroup "asyncly"
+ [ -- benchIO "unfoldr" $ Ops.toNull asyncly
+ benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
+ -- , benchSrcIO asyncly "fromFoldable" Ops.sourceFromFoldable
+ , benchSrcIO asyncly "fromFoldableM" Ops.sourceFromFoldableM
+ -- , benchSrcIO asyncly "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO asyncly "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM asyncly
+ , benchSrcIO asyncly "unfoldrM maxThreads 1"
+ (maxThreads 1 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM maxBuffer 1 (1000 ops)"
+ (maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
+ ]
+ , bgroup "wAsyncly"
+ [ -- benchIO "unfoldr" $ Ops.toNull wAsyncly
+ benchSrcIO wAsyncly "unfoldrM" Ops.sourceUnfoldrM
+ -- , benchSrcIO wAsyncly "fromFoldable" Ops.sourceFromFoldable
+ , benchSrcIO wAsyncly "fromFoldableM" Ops.sourceFromFoldableM
+ -- , benchSrcIO wAsyncly "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO wAsyncly "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM wAsyncly
+ ]
+    -- unfoldr and fromFoldable are always serial and therefore the same for
+ -- all stream types.
+ , bgroup "aheadly"
+ [ -- benchIO "unfoldr" $ Ops.toNull aheadly
+ benchSrcIO aheadly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO aheadly "fromFoldableM" Ops.sourceFromFoldableM
+ -- , benchSrcIO aheadly "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO aheadly "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM aheadly
+ , benchSrcIO aheadly "unfoldrM maxThreads 1"
+ (maxThreads 1 . Ops.sourceUnfoldrM)
+ , benchSrcIO aheadly "unfoldrM maxBuffer 1 (1000 ops)"
+ (maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
+ -- , benchSrcIO aheadly "fromFoldable" Ops.sourceFromFoldable
+ ]
+ -- XXX need to use smaller streams to finish in reasonable time
+ , bgroup "parallely"
+ [ --benchIO "unfoldr" $ Ops.toNull parallely
+ benchSrcIO parallely "unfoldrM" Ops.sourceUnfoldrM
+ --, benchSrcIO parallely "fromFoldable" Ops.sourceFromFoldable
+ , benchSrcIO parallely "fromFoldableM" Ops.sourceFromFoldableM
+ -- , benchSrcIO parallely "foldMapWith" Ops.sourceFoldMapWith
+ , benchSrcIO parallely "foldMapWithM" Ops.sourceFoldMapWithM
+ , benchIO "mapM" $ Ops.mapM parallely
+ -- Zip has only one parallel flavor
+ , benchIO "zip" Ops.zipAsync
+ , benchIO "zipM" Ops.zipAsyncM
+ ]
+ ]
diff --git a/benchmark/LinearOps.hs b/benchmark/LinearOps.hs
index 98bf624..a0f63b9 100644
--- a/benchmark/LinearOps.hs
+++ b/benchmark/LinearOps.hs
@@ -5,21 +5,27 @@
-- License : MIT
-- Maintainer : harendra.kumar@gmail.com
+{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
module LinearOps where
+import Control.Monad (when)
import Data.Maybe (fromJust)
import Prelude
(Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=), (==), (<=),
- subtract, undefined, Maybe(..), odd, Bool, not)
+ subtract, undefined, Maybe(..), odd, Bool, not, (>>=), mapM_, curry)
import qualified Streamly as S
import qualified Streamly.Prelude as S
value, maxValue :: Int
+#ifdef LINEAR_ASYNC
+value = 10000
+#else
value = 100000
-maxValue = value + 1000
+#endif
+maxValue = value + 1
-------------------------------------------------------------------------------
-- Benchmark ops
@@ -69,7 +75,7 @@ sourceUnfoldr n = S.unfoldr step n
step cnt =
if cnt > n + value
then Nothing
- else (Just (cnt, cnt + 1))
+ else Just (cnt, cnt + 1)
{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> t m Int
@@ -149,32 +155,19 @@ uncons s = do
{-# INLINE init #-}
init :: Monad m => Stream m a -> m ()
-init s = do
- r <- S.init s
- case r of
- Nothing -> return ()
- Just x -> S.runStream x
+init s = S.init s >>= Prelude.mapM_ S.runStream
{-# INLINE tail #-}
tail :: Monad m => Stream m a -> m ()
-tail s = do
- r <- S.tail s
- case r of
- Nothing -> return ()
- Just x -> tail x
+tail s = S.tail s >>= Prelude.mapM_ tail
{-# INLINE nullHeadTail #-}
nullHeadTail :: Monad m => Stream m Int -> m ()
nullHeadTail s = do
r <- S.null s
- if not r
- then do
+ when (not r) $ do
_ <- S.head s
- t <- S.tail s
- case t of
- Nothing -> return ()
- Just x -> nullHeadTail x
- else return ()
+ S.tail s >>= Prelude.mapM_ nullHeadTail
mapM_ = S.mapM_ (\_ -> return ())
toList = S.toList
@@ -249,7 +242,7 @@ mapM t = transform . t . S.mapM return
mapMaybe = transform . S.mapMaybe
(\x -> if Prelude.odd x then Nothing else Just ())
mapMaybeM = transform . S.mapMaybeM
- (\x -> if Prelude.odd x then (return Nothing) else return $ Just ())
+ (\x -> if Prelude.odd x then return Nothing else return $ Just ())
sequence t = transform . t . S.sequence
filterEven = transform . S.filter even
filterAllOut = transform . S.filter (> maxValue)
@@ -280,19 +273,19 @@ zipAsync, zipAsyncM :: S.MonadAsync m => Stream m Int -> m ()
zip src = do
r <- S.tail src
let src1 = fromJust r
- transform $ (S.zipWith (,) src src1)
+ transform (S.zipWith (,) src src1)
zipM src = do
r <- S.tail src
let src1 = fromJust r
- transform $ (S.zipWithM (\a b -> return (a,b)) src src1)
+ transform (S.zipWithM (curry return) src src1)
zipAsync src = do
r <- S.tail src
let src1 = fromJust r
- transform $ (S.zipAsyncWith (,) src src1)
+ transform (S.zipAsyncWith (,) src src1)
zipAsyncM src = do
r <- S.tail src
let src1 = fromJust r
- transform $ (S.zipAsyncWithM (\a b -> return (a,b)) src src1)
+ transform (S.zipAsyncWithM (curry return) src src1)
concat _n = return ()
-------------------------------------------------------------------------------
diff --git a/benchmark/LinearRate.hs b/benchmark/LinearRate.hs
new file mode 100644
index 0000000..6e64011
--- /dev/null
+++ b/benchmark/LinearRate.hs
@@ -0,0 +1,60 @@
+-- |
+-- Module : Main
+-- Copyright : (c) 2018 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+
+-- Rate benchmarks are kept separate because they need more running time to
+-- provide stable results.
+
+-- import Data.Functor.Identity (Identity, runIdentity)
+import System.Random (randomRIO)
+import qualified LinearOps as Ops
+
+import Streamly
+import Gauge
+
+-- | Takes a source, and uses it with a default drain/fold method.
+{-# INLINE benchSrcIO #-}
+benchSrcIO
+ :: (t IO Int -> SerialT IO Int)
+ -> String
+ -> (Int -> t IO Int)
+ -> Benchmark
+benchSrcIO t name f
+ = bench name $ nfIO $ randomRIO (1,1) >>= Ops.toNull t . f
+
+{-
+_benchId :: NFData b => String -> (Ops.Stream m Int -> Identity b) -> Benchmark
+_benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)
+-}
+
+main :: IO ()
+main =
+ defaultMain
+ -- XXX arbitrarily large rate should be the same as rate Nothing
+ [ bgroup "avgrate"
+ [ bgroup "asyncly"
+ [ -- benchIO "unfoldr" $ Ops.toNull asyncly
+ benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
+ , benchSrcIO asyncly "unfoldrM/Nothing"
+ (rate Nothing . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/1,000,000"
+ (avgRate 1000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/3,000,000"
+ (avgRate 3000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/10,000,000/maxThreads1"
+ (maxThreads 1 . avgRate 10000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/10,000,000"
+ (avgRate 10000000 . Ops.sourceUnfoldrM)
+ , benchSrcIO asyncly "unfoldrM/20,000,000"
+ (avgRate 20000000 . Ops.sourceUnfoldrM)
+ ]
+ , bgroup "aheadly"
+ [
+ benchSrcIO aheadly "unfoldrM/1,000,000"
+ (avgRate 1000000 . Ops.sourceUnfoldrM)
+ ]
+ ]
+ ]
diff --git a/benchmark/Nested.hs b/benchmark/Nested.hs
index f627af7..38b24ec 100644
--- a/benchmark/Nested.hs
+++ b/benchmark/Nested.hs
@@ -19,13 +19,13 @@ _benchId :: (NFData b) => String -> (Int -> Identity b) -> Benchmark
_benchId name f = bench name $ nf (\g -> runIdentity (g 1)) f
main :: IO ()
-main = do
+main =
-- TBD Study scaling with 10, 100, 1000 loop iterations
defaultMain
[ bgroup "serially"
[ benchIO "toNull" $ Ops.toNull serially
, benchIO "toList" $ Ops.toList serially
- , benchIO "toListSome" $ Ops.toListSome serially
+ -- , benchIO "toListSome" $ Ops.toListSome serially
, benchIO "filterAllOut" $ Ops.filterAllOut serially
, benchIO "filterAllIn" $ Ops.filterAllIn serially
, benchIO "filterSome" $ Ops.filterSome serially
@@ -35,7 +35,7 @@ main = do
, bgroup "wSerially"
[ benchIO "toNull" $ Ops.toNull wSerially
, benchIO "toList" $ Ops.toList wSerially
- , benchIO "toListSome" $ Ops.toListSome wSerially
+ -- , benchIO "toListSome" $ Ops.toListSome wSerially
, benchIO "filterAllOut" $ Ops.filterAllOut wSerially
, benchIO "filterAllIn" $ Ops.filterAllIn wSerially
, benchIO "filterSome" $ Ops.filterSome wSerially
@@ -45,10 +45,9 @@ main = do
, bgroup "aheadly"
[ benchIO "toNull" $ Ops.toNull aheadly
, benchIO "toList" $ Ops.toList aheadly
- , benchIO "toListSome" $ Ops.toListSome aheadly
+ -- , benchIO "toListSome" $ Ops.toListSome aheadly
, benchIO "filterAllOut" $ Ops.filterAllOut aheadly
, benchIO "filterAllIn" $ Ops.filterAllIn aheadly
- -- this hangs, need to investigate
, benchIO "filterSome" $ Ops.filterSome aheadly
, benchIO "breakAfterSome" $ Ops.breakAfterSome aheadly
]
@@ -56,7 +55,7 @@ main = do
, bgroup "asyncly"
[ benchIO "toNull" $ Ops.toNull asyncly
, benchIO "toList" $ Ops.toList asyncly
- , benchIO "toListSome" $ Ops.toListSome asyncly
+ -- , benchIO "toListSome" $ Ops.toListSome asyncly
, benchIO "filterAllOut" $ Ops.filterAllOut asyncly
, benchIO "filterAllIn" $ Ops.filterAllIn asyncly
, benchIO "filterSome" $ Ops.filterSome asyncly
@@ -66,7 +65,7 @@ main = do
, bgroup "wAsyncly"
[ benchIO "toNull" $ Ops.toNull wAsyncly
, benchIO "toList" $ Ops.toList wAsyncly
- , benchIO "toListSome" $ Ops.toListSome wAsyncly
+ -- , benchIO "toListSome" $ Ops.toListSome wAsyncly
, benchIO "filterAllOut" $ Ops.filterAllOut wAsyncly
, benchIO "filterAllIn" $ Ops.filterAllIn wAsyncly
, benchIO "filterSome" $ Ops.filterSome wAsyncly
@@ -76,7 +75,7 @@ main = do
, bgroup "parallely"
[ benchIO "toNull" $ Ops.toNull parallely
, benchIO "toList" $ Ops.toList parallely
- , benchIO "toListSome" $ Ops.toListSome parallely
+ --, benchIO "toListSome" $ Ops.toListSome parallely
, benchIO "filterAllOut" $ Ops.filterAllOut parallely
, benchIO "filterAllIn" $ Ops.filterAllIn parallely
, benchIO "filterSome" $ Ops.filterSome parallely
diff --git a/benchmark/NestedOps.hs b/benchmark/NestedOps.hs
index b2fb387..f41aa56 100644
--- a/benchmark/NestedOps.hs
+++ b/benchmark/NestedOps.hs
@@ -20,7 +20,7 @@ sumCount :: Int
sumCount = 1000000
prodCount :: Int
-prodCount = 1000
+prodCount = 100
-------------------------------------------------------------------------------
-- Stream generation and elimination
@@ -48,7 +48,7 @@ sourceUnfoldr start n = S.unfoldr step start
step cnt =
if cnt > start + n
then Nothing
- else (Just (cnt, cnt + 1))
+ else Just (cnt, cnt + 1)
{-# INLINE runStream #-}
runStream :: Monad m => Stream m a -> m ()
@@ -98,7 +98,7 @@ filterAllOut t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
let s = x + y
- if (s < 0)
+ if s < 0
then return s
else S.nil
@@ -110,7 +110,7 @@ filterAllIn t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
let s = x + y
- if (s > 0)
+ if s > 0
then return s
else S.nil
@@ -122,7 +122,7 @@ filterSome t start = runStream . t $ do
x <- source start prodCount
y <- source start prodCount
let s = x + y
- if (s > 1100000)
+ if s > 1100000
then return s
else S.nil
@@ -135,7 +135,7 @@ breakAfterSome t start = do
x <- source start prodCount
y <- source start prodCount
let s = x + y
- if (s > 1100000)
+ if s > 1100000
then error "break"
else return s
return ()
diff --git a/benchmark/StreamDOps.hs b/benchmark/StreamDOps.hs
index 0486931..4b028f3 100644
--- a/benchmark/StreamDOps.hs
+++ b/benchmark/StreamDOps.hs
@@ -9,9 +9,10 @@
module StreamDOps where
+import Control.Monad (when)
import Prelude
(Monad, Int, (+), ($), (.), return, (>), even, (<=),
- subtract, undefined, Maybe(..), not)
+ subtract, undefined, Maybe(..), not, mapM_, (>>=))
import qualified Streamly.Streams.StreamD as S
@@ -76,7 +77,7 @@ sourceUnfoldr n = S.unfoldr step n
step cnt =
if cnt > n + value
then Nothing
- else (Just (cnt, cnt + 1))
+ else Just (cnt, cnt + 1)
{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: Monad m => Int -> Stream m Int
@@ -97,7 +98,7 @@ sourceFromList n = S.fromList [n..n+value]
{-# INLINE source #-}
source :: Monad m => Int -> Stream m Int
-source n = sourceUnfoldrM n
+source = sourceUnfoldrM
-------------------------------------------------------------------------------
-- Elimination
@@ -115,14 +116,9 @@ uncons s = do
Just (_, t) -> uncons t
nullHeadTail s = do
r <- S.null s
- if not r
- then do
+ when (not r) $ do
_ <- S.head s
- t <- S.tail s
- case t of
- Nothing -> return ()
- Just x -> nullHeadTail x
- else return ()
+ S.tail s >>= mapM_ nullHeadTail
toList = S.toList
foldl = S.foldl' (+) 0
last = S.last
@@ -151,7 +147,7 @@ dropWhileTrue = transform . S.dropWhile (<= maxValue)
-- Zipping and concat
-------------------------------------------------------------------------------
-zip src = transform $ (S.zipWith (,) src src)
+zip src = transform $ S.zipWith (,) src src
-- concat _n = return ()
-------------------------------------------------------------------------------
diff --git a/benchmark/StreamKOps.hs b/benchmark/StreamKOps.hs
index ddb1b5a..534198e 100644
--- a/benchmark/StreamKOps.hs
+++ b/benchmark/StreamKOps.hs
@@ -9,9 +9,10 @@
module StreamKOps where
+import Control.Monad (when)
import Prelude
(Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=),
- subtract, undefined, Maybe(..), not)
+ subtract, undefined, Maybe(..), not, mapM_, (>>=))
import qualified Streamly.Streams.StreamK as S
import qualified Streamly.Streams.Prelude as S
@@ -76,7 +77,7 @@ sourceUnfoldr n = S.unfoldr step n
step cnt =
if cnt > n + value
then Nothing
- else (Just (cnt, cnt + 1))
+ else Just (cnt, cnt + 1)
{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: S.MonadAsync m => Int -> Stream m Int
@@ -105,15 +106,15 @@ sourceFromFoldableM n = S.fromFoldableM (Prelude.fmap return [n..n+value])
{-# INLINE sourceFoldMapWith #-}
sourceFoldMapWith :: Int -> Stream m Int
-sourceFoldMapWith n = S.foldMapWith (S.serial) S.yield [n..n+value]
+sourceFoldMapWith n = S.foldMapWith S.serial S.yield [n..n+value]
{-# INLINE sourceFoldMapWithM #-}
sourceFoldMapWithM :: Monad m => Int -> Stream m Int
-sourceFoldMapWithM n = S.foldMapWith (S.serial) (S.yieldM . return) [n..n+value]
+sourceFoldMapWithM n = S.foldMapWith S.serial (S.yieldM . return) [n..n+value]
{-# INLINE source #-}
source :: S.MonadAsync m => Int -> Stream m Int
-source n = sourceUnfoldrM n
+source = sourceUnfoldrM
-------------------------------------------------------------------------------
-- Elimination
@@ -133,31 +134,20 @@ uncons s = do
{-# INLINE init #-}
init :: (Monad m, S.IsStream t) => t m a -> m ()
init s = do
- r <- S.init s
- case r of
- Nothing -> return ()
- Just x -> S.runStream x
+ t <- S.init s
+ mapM_ S.runStream t
{-# INLINE tail #-}
tail :: (Monad m, S.IsStream t) => t m a -> m ()
-tail s = do
- r <- S.tail s
- case r of
- Nothing -> return ()
- Just x -> tail x
+tail s = S.tail s >>= mapM_ tail
-- | If the stream is not null get its head and tail and then do the same to
-- the tail.
nullHeadTail s = do
r <- S.null s
- if not r
- then do
+ when (not r) $ do
_ <- S.head s
- t <- S.tail s
- case t of
- Nothing -> return ()
- Just x -> nullHeadTail x
- else return ()
+ S.tail s >>= mapM_ nullHeadTail
toList = S.toList
foldl = S.foldl' (+) 0
@@ -187,7 +177,7 @@ dropWhileTrue = transform . S.dropWhile (<= maxValue)
-- Zipping and concat
-------------------------------------------------------------------------------
-zip src = transform $ (S.zipWith (,) src src)
+zip src = transform $ S.zipWith (,) src src
concat _n = return ()
-------------------------------------------------------------------------------
diff --git a/docs/streamly-vs-async.md b/docs/streamly-vs-async.md
new file mode 100644
index 0000000..1b35512
--- /dev/null
+++ b/docs/streamly-vs-async.md
@@ -0,0 +1,230 @@
+# Streamly
+
+Streamly is a library that makes concurrent programming a joy. The venerable
+`async` package is the go-to package for concurrent programming for most
+Haskellers. Streamly is a higher level library than `async`, providing more
+power and functionality through a simpler and more concise expression of
+concurrency. At a high level, you should be able to express everything with
+streamly that you can with `async`; if you can't, please raise an issue. If
+you are familiar with `async`, this document highlights how streamly can be
+used where you would use `async`.
+
+## `async/wait` vs Concurrent Streams
+
+Unlike `async`, streamly does not use a spawn-and-`wait` model. Streamly
+takes a higher level approach to concurrency and has no explicit notion of
+threads. In streamly, you compose multiple actions as a stream and then
+express whether the actions in the stream should run `serially` or
+`parallely`. There are many different ways to run streams concurrently; see
+the reference documentation for details.
+
+Since there is no explicit notion of threads in streamly, there are no
+equivalents of `async`, `wait`, `cancel`, `poll` or `link` combinators from the
+`async` package.
+
+Since streamly is a monad transformer, it works with any monad and not just
+IO; you won't need adaptations like `lifted-async` to use it with a generic
+monad.
+
+## Using Streamly for Concurrency
+
+You can write your whole program in a streamly monad and use the full power
+of the library. Streamly can be used as a direct replacement for the IO monad
+with no loss of performance, and with no change in code except using `liftIO`
+or `yieldM` to run IO actions. Streamly IO monads (e.g. `SerialT IO`) are
+just a generalization of the IO monad with non-deterministic composition of
+streams added on top.
+
+However, if you would like to use streamly for only the concurrent portions
+of your program, you can do that too. Use `runStream` to run a stream without
+collecting the outputs of the concurrent actions, or `toList` to collect the
+output stream into a list. Other stream folding operations can also be used;
+see the docs for more details.
+
+## Features as Compared with `async`
+
+Use the following imports to run the snippets shown below:
+
+```haskell
+import Streamly
+import Streamly.Prelude ((|:))
+import qualified Streamly.Prelude as S
+import qualified Data.Text as Text
+import Control.Concurrent (threadDelay)
+```
+
+Let us simulate a URL fetch with a delay of `n` seconds using the following
+functions:
+
+```haskell
+getURL :: Int -> IO String
+getURL n = threadDelay (n * 1000000) >> return (show n)
+
+getURLString :: Int -> IO String
+getURLString = getURL
+
+getURLText :: Int -> IO Text.Text
+getURLText n = fmap Text.pack (getURL n)
+```
+
+### concurrently
+
+You can run any number of actions concurrently. For example, to fetch two URLs
+concurrently:
+
+```haskell
+ urls <- S.toList $ parallely $ getURL 2 |: getURL 1 |: S.nil
+```
+
+This returns the results in their arrival order, i.e. first 1 and then 2.
+If you want to preserve the order of the actions in the stream, use the
+lookahead style stream `aheadly` instead. In the following example both URLs
+are fetched concurrently, and even though the result of URL 1 arrives before
+that of URL 2, the results are returned as 2 first and then 1.
+
+```haskell
+ urls <- S.toList $ aheadly $ getURL 2 |: getURL 1 |: S.nil
+```
+
+### concurrently_
+
+Use `runStream` instead of `toList` to run the actions but ignore the results:
+
+```haskell
+ runStream $ parallely $ getURL 1 |: getURL 2 |: S.nil
+```
+
+### Concurrent Applicative
+
+If the actions that you are executing result in different output types you can
+use applicative zip to collect the results or to directly apply them to a
+function:
+
+```haskell
+ tuples <- S.toList $ zipAsyncly $
+ (,) <$> S.yieldM (getURLString 1) <*> S.yieldM (getURLText 2)
+```
+
+### race
+
+There are two ways to achieve the race functionality, using `take` or using
+exceptions.
+
+#### `race` Using `take`
+
+We can run multiple actions concurrently and take the first result that
+arrives:
+
+```haskell
+ urls <- S.toList $ S.take 1 $ parallely $ getURL 1 |: getURL 2 |: S.nil
+```
+
+After the first result arrives, the rest of the actions are canceled
+automatically. In general, we can take first `n` results as they arrive:
+
+```haskell
+ urls <- S.toList $ S.take 2 $ parallely $ getURL 1 |: getURL 2 |: S.nil
+```
+
+#### `race` Using Exceptions
+
+When an exception occurs in a concurrent stream, all the concurrently running
+actions are canceled on arrival of the exception. This can be used to
+implement the race functionality. Each action in the stream can use an
+exception to communicate its result. As soon as the first result arrives, all
+other actions are canceled, for example:
+
+```haskell
+ {-# LANGUAGE ScopedTypeVariables #-}
+ import Control.Exception (Exception, SomeException, try)
+ import Control.Monad.Catch (throwM)
+
+ data Result = Result String deriving Show
+ instance Exception Result
+
+ main = do
+     url <- try $ runStream $ parallely $
+         (getURL 2 >>= throwM . Result)
+         |: (getURL 1 >>= throwM . Result)
+         |: S.nil
+     case url of
+         Left (e :: SomeException) -> print e
+         Right _ -> return () -- unreachable, every action throws
+```
+
+### mapConcurrently
+
+There are many ways to map concurrently on a container and collect the results:
+
+You can create a concurrent stream from a `Foldable` container of monadic
+actions:
+
+```haskell
+ urls <- S.toList $ aheadly $ S.fromFoldableM $ fmap getURL [1..3]
+```
+
+You can first convert a `Foldable` into a stream and then map an action on the
+stream concurrently:
+
+```haskell
+ urls <- S.toList $ aheadly $ S.mapM getURL $ foldMap return [1..3]
+```
+
+You can map a monadic action over a `Foldable` container, converting it into
+a stream and folding the stream at the same time:
+
+```haskell
+ urls <- S.toList $ aheadly $ foldMap (S.yieldM . getURL) [1..3]
+```
+
+### replicateConcurrently
+
+Streamly has not just `replicateM`, the equivalent of `replicateConcurrently`,
+but many more ways to generate concurrent streams, for example,
+`|:`, `unfoldrM`, `repeatM`, `iterateM`, `fromFoldableM` etc. See the
+[Streamly.Prelude](https://hackage.haskell.org/package/streamly/docs/Streamly-Prelude.html)
+module documentation for more details.
+
+```haskell
+ xs <- S.toList $ parallely $ S.replicateM 2 $ getURL 1
+```
+
+### Functor
+
+The stream resulting from concurrent actions can be mapped serially or
+concurrently.
+
+To map serially just use `fmap`:
+
+```haskell
+ xs <- S.toList $ parallely $ fmap (+1) $ return 1 |: return 2 |: S.nil
+```
+
+To map a monadic action concurrently on all elements of the stream use `mapM`:
+
+```haskell
+ xs <- S.toList $ parallely $ S.mapM (\x -> return (x + 1))
+ $ return 1 |: return 2 |: S.nil
+```
+
+### Semigroup
+
+The `Semigroup` instances of streamly merge multiple streams serially or
+concurrently.
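+
+For example, with `parallely` the two singleton streams below are merged
+concurrently; this sketch reuses the `getURL` helper defined earlier:
+
+```haskell
+    urls <- S.toList $ parallely $
+        S.yieldM (getURL 2) <> S.yieldM (getURL 1)
+```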
+
+### Monad
+
+The `Monad` instances of streamly nest loops concurrently (concurrent
+non-determinism).
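+
+For example, under `parallely` the iterations of the nested loop below may
+run concurrently (a sketch; the order of the results depends on scheduling):
+
+```haskell
+    pairs <- S.toList $ parallely $ do
+        x <- S.fromFoldable [1, 2]
+        y <- S.fromFoldable [3, 4]
+        return (x, y)
+```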
+
+### Performance
+
+Streamly has very little concurrency overhead (ranging from a few hundred
+nanoseconds to a few microseconds on a 2.2 GHz Intel Core i7), so you can run
+even very lightweight actions in parallel without worrying about the overhead
+of concurrency. See the performance benchmarks [comparing streamly with the `async`
+package in this repo](https://github.com/composewell/concurrency-benchmarks).
+
+## Further Reading
+
+There is much more that you can do with streamly. For example, you can use the
+`maxThreads` combinator to restrict the total number of concurrent threads,
+the `maxBuffer` combinator to restrict the total number of buffered
+results, or the `avgRate` combinator to control the rate at which the
+concurrent actions are executed.
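+
+For example, a sketch combining these combinators with the `getURL` helper
+defined earlier, limiting the fetches to at most 10 threads and an average
+rate of 2 results per second:
+
+```haskell
+    urls <- S.toList $ maxThreads 10 $ avgRate 2 $ aheadly $
+        S.mapM getURL $ S.fromList [1..10]
+```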
+
+See the [haddock documentation on
+hackage](https://hackage.haskell.org/package/streamly) and [a comprehensive tutorial
+here](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html).
diff --git a/docs/transformers.md b/docs/transformers.md
new file mode 100644
index 0000000..c24e8c3
--- /dev/null
+++ b/docs/transformers.md
@@ -0,0 +1,32 @@
+## Using Monad Transformers
+
+Common monad transformers can be used with streamly serial streams, without any
+issues. `ReaderT` can be used with concurrent streams as well without any
+issues.
+
+The semantics of monads other than `ReaderT` with concurrent streams are
+not yet finalized and will change in the future; therefore, as of now, their
+use is not recommended.
+
+## Ordering of Monad Transformers
+
+In most cases it is a good idea to keep streamly as the top level monad.
+
+## State Sharing
+### Serial Applications
+
+Read only global state can always be shared using the `Reader` monad.
+Read-write global state can be shared either using an `IORef` in the `Reader`
+monad or using the `State` monad.
+
+See the `AcidRain.hs` example for a usage of `StateT` in the serially
+executing portion of the program.
+
+### Concurrent Applications
+
+The current recommended method for sharing modifiable global state across
+concurrent tasks is to put the shared state inside an `IORef` in a `Reader`
+monad or just share the `IORef` by passing it to the required functions. The
+`IORef` can be updated atomically using `atomicModifyIORef`.
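+
+A minimal sketch of this pattern (the counter and the names are illustrative,
+assuming `import Streamly` and `import qualified Streamly.Prelude as S`):
+
+```haskell
+import Data.IORef (newIORef, readIORef, atomicModifyIORef')
+
+main :: IO ()
+main = do
+    ref <- newIORef (0 :: Int)
+    -- 10 concurrent actions atomically increment the shared counter
+    runStream $ parallely $ S.replicateM 10 $
+        atomicModifyIORef' ref $ \n -> (n + 1, ())
+    readIORef ref >>= print
+```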
+
+The `CirclingSquare.hs` example shares an `IORef` across parallel tasks.
diff --git a/examples/AcidRain.hs b/examples/AcidRain.hs
index 686bdd3..c2b59fa 100644
--- a/examples/AcidRain.hs
+++ b/examples/AcidRain.hs
@@ -7,9 +7,9 @@ import Streamly
import Streamly.Prelude as S
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(liftIO))
-import Control.Monad.State (MonadState, get, modify, runStateT)
+import Control.Monad.State (MonadState, get, modify, runStateT, put)
-data Event = Harm Int | Heal Int | Quit deriving (Show)
+data Event = Harm Int | Heal Int deriving (Show)
userAction :: MonadAsync m => SerialT m Event
userAction = S.repeatM $ liftIO askUser
@@ -18,7 +18,7 @@ userAction = S.repeatM $ liftIO askUser
command <- getLine
case command of
"potion" -> return (Heal 10)
- "quit" -> return Quit
+ "quit" -> fail "quit"
_ -> putStrLn "What?" >> askUser
acidRain :: MonadAsync m => SerialT m Event
@@ -30,11 +30,10 @@ game = do
case event of
Harm n -> modify $ \h -> h - n
Heal n -> modify $ \h -> h + n
- Quit -> fail "quit"
h <- get
when (h <= 0) $ fail "You die!"
- liftIO $ putStrLn $ "Health = " ++ show h
+ liftIO $ putStrLn $ "Health = " <> show h
main :: IO ()
main = do
diff --git a/examples/CirclingSquare.hs b/examples/CirclingSquare.hs
index eb9add9..6429f24 100644
--- a/examples/CirclingSquare.hs
+++ b/examples/CirclingSquare.hs
@@ -55,8 +55,7 @@ updateController :: IORef (Double, Double) -> IO ()
updateController ref = do
e <- pollEvent
case e of
- MouseMotion x y _ _ -> do
- writeIORef ref (fromIntegral x, fromIntegral y)
+ MouseMotion x y _ _ -> writeIORef ref (fromIntegral x, fromIntegral y)
_ -> return ()
------------------------------------------------------------------------------
@@ -67,7 +66,7 @@ updateDisplay :: IORef (Double, Double) -> IO ()
updateDisplay cref = do
time <- SDL.getTicks
(x, y) <- readIORef cref
- let t = (fromIntegral time) * speed / 1000
+ let t = fromIntegral time * speed / 1000
in display (x + cos t * radius, y + sin t * radius)
where
diff --git a/examples/ControlFlow.hs b/examples/ControlFlow.hs
new file mode 100644
index 0000000..5361aed
--- /dev/null
+++ b/examples/ControlFlow.hs
@@ -0,0 +1,309 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-------------------------------------------------------------------------------
+-- Combining control flow manipulating monad transformers (MaybeT, ExceptT,
+-- ContT) with Streamly
+-------------------------------------------------------------------------------
+--
+-- Streamly streams are non-determinism (nested looping) monads. We can use a
+-- control flow monad on top or streamly on top depending on whether we want to
+-- superimpose control flow manipulation on top of non-deterministic
+-- composition or vice-versa.
+--
+-- This file provides an example where the characters "x" and "y" are entered
+-- on separate lines on the command line. When any other sequence is entered,
+-- the control flow short circuits at the first non-matching character and
+-- exits.
+
+import Control.Concurrent (threadDelay)
+import Control.Exception (catch, SomeException)
+import Control.Monad
+import Control.Monad.Catch (MonadThrow, throwM, Exception)
+import Control.Monad.IO.Class
+import Control.Monad.Trans.Class
+import Control.Monad.Trans.Maybe
+import Control.Monad.Trans.Except
+import Control.Monad.Trans.Cont
+import Streamly
+import Streamly.Prelude ((|:))
+import qualified Streamly.Prelude as S
+
+-------------------------------------------------------------------------------
+-- Using MaybeT below streamly
+-------------------------------------------------------------------------------
+--
+-- When streamly is on top MaybeT would terminate all iterations of
+-- non-determinism.
+--
+getSequenceMaybeBelow
+ :: ( IsStream t
+ , Monad m
+ , MonadTrans t
+ , Applicative (t (MaybeT m))
+ , MonadIO (t (MaybeT m))
+ )
+ => t (MaybeT m) ()
+getSequenceMaybeBelow = do
+ liftIO $ putStrLn "MaybeT below streamly: Enter one char per line: "
+
+ i <- S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ r1 <- liftIO getLine
+ when (r1 /= "x") $ lift mzero
+
+ r2 <- liftIO getLine
+ when (r2 /= "y") $ lift mzero
+
+mainMaybeBelow :: IO ()
+mainMaybeBelow = do
+ r <- runMaybeT (runStream getSequenceMaybeBelow)
+ case r of
+ Just _ -> putStrLn "Bingo"
+ Nothing -> putStrLn "Wrong"
+
+-------------------------------------------------------------------------------
+-- Using MaybeT above streamly
+-------------------------------------------------------------------------------
+--
+-- When MaybeT is on top a Nothing would terminate only the current iteration
+-- of non-determinism below.
+--
+-- Note that this is a redundant configuration as the same behavior can be
+-- achieved with just streamly, using mzero.
+--
+getSequenceMaybeAbove :: (IsStream t, MonadIO (t m)) => MaybeT (t m) ()
+getSequenceMaybeAbove = do
+ liftIO $ putStrLn "MaybeT above streamly: Enter one char per line: "
+
+ i <- lift $ S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ r1 <- liftIO getLine
+ when (r1 /= "x") $ mzero
+
+ r2 <- liftIO getLine
+ when (r2 /= "y") $ mzero
+
+mainMaybeAbove :: (IsStream t, MonadIO (t m)) => MaybeT (t m) ()
+mainMaybeAbove = do
+ getSequenceMaybeAbove
+ liftIO $ putStrLn "Bingo"
+
+-------------------------------------------------------------------------------
+-- Using ExceptT below streamly
+-------------------------------------------------------------------------------
+--
+-- XXX need to have a specialized liftCatch to lift catchE
+--
+-- Note that throwE would terminate all iterations of non-determinism
+-- altogether.
+getSequenceEitherBelow
+ :: ( IsStream t
+ , MonadTrans t
+ , Monad m
+ , MonadIO (t m)
+ , MonadIO (t (ExceptT String m))
+ )
+ => t (ExceptT String m) ()
+getSequenceEitherBelow = do
+ liftIO $ putStrLn "ExceptT below streamly: Enter one char per line: "
+
+ i <- S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ r1 <- liftIO getLine
+ when (r1 /= "x") $ lift $ throwE $ "Expecting x got: " ++ r1
+
+ r2 <- liftIO getLine
+ when (r2 /= "y") $ lift $ throwE $ "Expecting y got: " ++ r2
+
+mainEitherBelow :: IO ()
+mainEitherBelow = do
+ -- XXX Cannot lift catchE
+ r <- runExceptT (runStream getSequenceEitherBelow)
+ case r of
+ Right _ -> liftIO $ putStrLn "Bingo"
+ Left s -> liftIO $ putStrLn s
+
+-------------------------------------------------------------------------------
+-- Using ExceptT below concurrent streamly
+-------------------------------------------------------------------------------
+--
+-- XXX does not work correctly yet
+--
+getSequenceEitherAsyncBelow
+ :: ( IsStream t
+ , MonadTrans t
+ , MonadIO m
+ , MonadAsync m
+ , MonadIO (t m)
+ , MonadIO (t (ExceptT String m))
+ , Semigroup (t (ExceptT [Char] m) Integer)
+ )
+ => t (ExceptT String m) ()
+getSequenceEitherAsyncBelow = do
+ liftIO $ putStrLn "ExceptT below concurrent streamly: "
+
+ i <- (liftIO (threadDelay 1000)
+ >> lift (throwE "First task")
+ >> return 1)
+ <> (lift (throwE "Second task") >> return 2)
+ <> S.yield (3 :: Integer)
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+mainEitherAsyncBelow :: IO ()
+mainEitherAsyncBelow = do
+ r <- runExceptT (runStream $ asyncly $ getSequenceEitherAsyncBelow)
+ case r of
+ Right _ -> liftIO $ putStrLn "Bingo"
+ Left s -> liftIO $ putStrLn s
+
+-------------------------------------------------------------------------------
+-- Using ExceptT above streamly
+-------------------------------------------------------------------------------
+--
+-- When ExceptT is on top, we can lift the non-determinism of stream from
+-- below.
+--
+-- Note that throwE would terminate/break only current iteration of
+-- non-determinism and not all of them altogether.
+--
+-- Here we can use catchE directly but will have to use monad-control to lift
+-- stream operations with stream arguments.
+getSequenceEitherAbove :: (IsStream t, Monad m, MonadIO (t m))
+ => ExceptT String (t m) ()
+getSequenceEitherAbove = do
+ liftIO $ putStrLn "ExceptT above streamly: Enter one char per line: "
+
+ i <- lift $ S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ r1 <- liftIO getLine
+ when (r1 /= "x") $ throwE $ "Expecting x got: " ++ r1
+
+ r2 <- liftIO getLine
+ when (r2 /= "y") $ throwE $ "Expecting y got: " ++ r2
+
+mainEitherAbove :: (IsStream t, Monad m, MonadIO (t m))
+ => ExceptT String (t m) ()
+mainEitherAbove = do
+ catchE (getSequenceEitherAbove >> liftIO (putStrLn "Bingo"))
+ (\e -> liftIO $ putStrLn e)
+
+-------------------------------------------------------------------------------
+-- Using MonadThrow to throw exceptions in streamly
+-------------------------------------------------------------------------------
+--
+data Unexpected = Unexpected String deriving Show
+
+instance Exception Unexpected
+
+-- Note that unlike when ExceptT is used on top, MonadThrow terminates all
+-- iterations of non-determinism rather than just the current iteration.
+--
+getSequenceMonadThrow :: (IsStream t, Monad m, MonadIO (t m), MonadThrow (t m))
+ => t m ()
+getSequenceMonadThrow = do
+ liftIO $ putStrLn "MonadThrow in streamly: Enter one char per line: "
+
+ i <- S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ r1 <- liftIO getLine
+ when (r1 /= "x") $ throwM $ Unexpected $ "Expecting x got: " ++ r1
+
+ r2 <- liftIO getLine
+ when (r2 /= "y") $ throwM $ Unexpected $ "Expecting y got: " ++ r2
+
+mainMonadThrow :: IO ()
+mainMonadThrow = do
+ catch (runStream getSequenceMonadThrow >> liftIO (putStrLn "Bingo"))
+ (\(e :: SomeException) -> liftIO $ putStrLn $ show e)
+
+-------------------------------------------------------------------------------
+-- Using ContT below streamly
+-------------------------------------------------------------------------------
+--
+-- CallCC is the goto/setjmp/longjmp equivalent
+-- Allows us to manipulate the control flow in arbitrary ways
+--
+-- XXX need to have a specialized liftCallCC to actually lift callCC
+--
+getSequenceContBelow
+ :: (IsStream t, MonadTrans t, MonadIO m, MonadIO (t (ContT r m)))
+ => t (ContT r m) (Either String ())
+getSequenceContBelow = do
+ liftIO $ putStrLn "ContT below streamly: Enter one char per line: "
+
+ i <- S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ r <- lift $ callCC $ \exit -> do
+ r1 <- liftIO getLine
+ _ <- if r1 /= "x"
+ then exit $ Left $ "Expecting x got: " ++ r1
+ else return $ Right ()
+
+ r2 <- liftIO getLine
+ if r2 /= "y"
+ then exit $ Left $ "Expecting y got: " ++ r2
+ else return $ Right ()
+ liftIO $ putStrLn $ "done iteration = " ++ show i
+ return r
+
+mainContBelow
+ :: (IsStream t, MonadIO m, MonadTrans t, MonadIO (t (ContT r m)))
+ => t (ContT r m) ()
+mainContBelow = do
+ r <- getSequenceContBelow
+ case r of
+ Right _ -> liftIO $ putStrLn "Bingo"
+ Left s -> liftIO $ putStrLn s
+
+-------------------------------------------------------------------------------
+-- Using ContT above streamly
+-------------------------------------------------------------------------------
+--
+getSequenceContAbove :: (IsStream t, Monad m, MonadIO (t m))
+ => ContT r (t m) (Either String ())
+getSequenceContAbove = do
+ liftIO $ putStrLn "ContT above streamly: Enter one char per line: "
+
+ i <- lift $ S.fromFoldable [1..2 :: Int]
+ liftIO $ putStrLn $ "iteration = " ++ show i
+
+ callCC $ \exit -> do
+ r1 <- liftIO getLine
+ _ <- if r1 /= "x"
+ then exit $ Left $ "Expecting x got: " ++ r1
+ else return $ Right ()
+
+ r2 <- liftIO getLine
+ if r2 /= "y"
+ then exit $ Left $ "Expecting y got: " ++ r2
+ else return $ Right ()
+
+mainContAbove :: (IsStream t, Monad m, MonadIO (t m)) => ContT r (t m) ()
+mainContAbove = do
+ r <- getSequenceContAbove
+ case r of
+ Right _ -> liftIO $ putStrLn "Bingo"
+ Left s -> liftIO $ putStrLn s
+
+-------------------------------------------------------------------------------
+-- Combining control flow manipulating monad transformers (MaybeT, ExceptT,
+-- ContT) with Streamly
+-------------------------------------------------------------------------------
+
+main :: IO ()
+main = do
+ mainMaybeBelow
+ runStream $ runMaybeT mainMaybeAbove
+ runContT (runStream mainContBelow) return
+ runStream (runContT mainContAbove return)
+ mainEitherBelow
+ runStream (runExceptT mainEitherAbove)
+ mainMonadThrow
+ mainEitherAsyncBelow
diff --git a/examples/ListDir.hs b/examples/ListDir.hs
index fb535ea..ae4ff4b 100644
--- a/examples/ListDir.hs
+++ b/examples/ListDir.hs
@@ -1,7 +1,7 @@
import Control.Monad.IO.Class (liftIO)
import Path.IO (listDir, getCurrentDir)
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
-import Streamly (runStream, aheadly)
+import Streamly (runStream, aheadly, (<>))
-- | List the current directory recursively using concurrent processing
--
@@ -14,5 +14,5 @@ main = do
runStream . aheadly $ getCurrentDir >>= readdir
where readdir d = do
(ds, fs) <- listDir d
- liftIO $ mapM_ putStrLn $ map show fs ++ map show ds
+ liftIO $ mapM_ putStrLn $ fmap show fs <> fmap show ds
foldMap readdir ds
diff --git a/examples/MergeSort.hs b/examples/MergeSort.hs
index ae33ede..f0f78c3 100644
--- a/examples/MergeSort.hs
+++ b/examples/MergeSort.hs
@@ -30,11 +30,11 @@ merge a b = do
case b1 of
Nothing -> return x <> ma
Just (y, mb) ->
- if (y < x)
- then (return y) <> merge (return x <> ma) mb
- else (return x) <> merge ma (return y <> mb)
+ if y < x
+ then return y <> merge (return x <> ma) mb
+ else return x <> merge ma (return y <> mb)
main :: IO ()
main = do
xs <- A.toList $ mergeAsync getSorted getSorted
- putStrLn $ show $ length xs
+ print $ length xs
diff --git a/examples/SearchQuery.hs b/examples/SearchQuery.hs
index 0405149..05ead2d 100644
--- a/examples/SearchQuery.hs
+++ b/examples/SearchQuery.hs
@@ -21,7 +21,7 @@ main = do
where
get :: String -> IO ()
- get s = httpNoBody (parseRequest_ s) >> putStrLn (show s)
+ get s = httpNoBody (parseRequest_ s) >> print s
google, bing, duckduckgo :: IO ()
google = get "https://www.google.com/search?q=haskell"
diff --git a/src/Streamly/Internal.hs b/src/Streamly/Internal.hs
new file mode 100644
index 0000000..6f05dfd
--- /dev/null
+++ b/src/Streamly/Internal.hs
@@ -0,0 +1,19 @@
+-- |
+-- Module : Streamly.Internal
+-- Copyright : (c) 2018 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+--
+-- This module is only for internal use. There is no warranty for the routines
+-- in this module to work correctly, please use at your own risk. These
+-- routines are subject to change or be removed without notice.
+--
+module Streamly.Internal
+ ( inspectMode
+ )
+where
+
+import Streamly.Streams.SVar
diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs
index eeddc1d..4ddc00d 100644
--- a/src/Streamly/Prelude.hs
+++ b/src/Streamly/Prelude.hs
@@ -1,4 +1,3 @@
-{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
@@ -46,41 +45,70 @@
module Streamly.Prelude
(
-- * Construction
- -- | Primitives to construct a stream.
+ -- | All other stream construction and generation combinators described
+ -- later, and even more custom combinators can be expressed in terms of
+ -- these primitives. However, the special versions provided in this module
+ -- can be more efficient in some situations.
+
+ -- ** From Elements
+ -- | Primitives to construct a stream from pure values or monadic actions.
K.nil
, K.cons
, (K..:)
, consM
, (|:)
+ , yield
+ , yieldM
- -- * Deconstruction
- , uncons
+ -- ** From Streams
+ -- | You can construct streams by appending or merging existing streams.
+ -- When constructing streams from streams, '<>' and 'mempty' are the
+ -- intuitive equivalents of 'K.cons' and 'K.nil', respectively. These
+ -- primitives can be very useful when constructing your own custom stream
+ -- combinators. Also see the variants of '<>' defined in the "Streamly"
+ -- module. Note that appending streams is inexpensive, it is much more
+ -- efficient than appending lists.
-- * Generation
- -- ** Unfolds
+ -- ** Unfold and Iterate
+ -- | Note that the generative steps of unfold and iterate are inherently
+ -- serial as the next step depends on the result of the previous step.
+ -- However, consumption of the result from the previous step can happen in
+ -- parallel with the generation by the next step.
, unfoldr
, unfoldrM
+ , iterate
+ , iterateM
- -- ** Specialized Generation
- -- | Generate a monadic stream from a seed.
+ -- ** Replicate and Repeat
+ -- | Generate a monadic stream from a seed value or function. Note that
+ -- these functions can generate a stream fully concurrently as, unlike
+ -- unfolds, there is no dependency between steps, therefore, an unbounded
+ -- number of steps can run concurrently. All of these can be expressed in
+ -- terms of 'K.cons' and 'K.nil' primitives.
, replicateM
, K.repeat
, repeatM
- , iterate
- , iterateM
- -- ** Conversions
- -- | Transform an input structure into a stream.
- , yield
- , yieldM
+ -- ** Generate From
+ -- | Convert an input structure, container or source into a stream. All of
+ -- these can be expressed in terms of primitives.
, fromList
, fromListM
, K.fromFoldable
, fromFoldableM
, fromHandle
+ -- * Deconstruction
+ , uncons
+
-- * Elimination
+
-- ** General Folds
+ -- | All the folds can be implemented in terms of 'uncons', however the
+ -- specific implementations provided here are generally more efficient.
+ -- Folds are inherently serial as each step needs to use the result of
+ -- the previous step.
, foldr
, foldr1
, foldrM
@@ -91,6 +119,8 @@ module Streamly.Prelude
, foldxM
-- ** Specialized Folds
+ -- | These folds can be expressed in terms of the general fold routines but
+ -- the special versions here can be more efficient in many cases.
-- Filtering folds: extract parts of the stream
, head
@@ -118,27 +148,40 @@ module Streamly.Prelude
, sum
, product
- -- ** Map and Fold
- , mapM_
-
- -- ** Conversions
- -- | Transform a stream into an output structure of another type.
+ -- ** Fold To
+ -- | Convert or divert a stream into an output structure, container or
+ -- sink.
, toList
, toHandle
-- * Transformation
- -- ** Mapping
- , Serial.map
- , mapM
- , sequence
+ -- | One to one transformations, each element in the input stream is
+ -- transformed into a corresponding element in the output stream.
+ -- Therefore, the length of the stream and the ordering of elements in the
+ -- stream remains unchanged after the transformation.
-- ** Scanning
-- | Scan is a transformation by continuously folding the result with the
- -- next element of the stream.
+ -- next element of the stream. This is the generalized way to transform a
+ -- stream carrying state from previous transformation steps, other forms of
+ -- transformation like map can be expressed in terms of this.
, scanl'
, scanlM'
, scanx
+ -- ** Mapping
+ -- | Map is a special form of scan where no state is carried from one step
+ -- to the next.
+ , Serial.map
+ , mapM
+
+ -- ** Flattening
+ , sequence
+
+ -- * Filtering and Insertion
+ -- | Adding or removing elements from the stream thus changing the length
+ -- of the stream.
+
-- ** Filtering
, filter
, filterM
@@ -152,17 +195,21 @@ module Streamly.Prelude
-- ** Inserting
, intersperseM
- -- ** Reordering
+ -- * Reordering
, reverse
- -- ** Indices
- , findIndices
- , elemIndices
+ -- * Hybrid Operations
+ -- ** Map and Fold
+ , mapM_
-- ** Map and Filter
, mapMaybe
, mapMaybeM
+ -- ** Scan and filter
+ , findIndices
+ , elemIndices
+
-- * Zipping
, zipWith
, zipWithM
@@ -182,7 +229,7 @@ import Control.Monad.IO.Class (MonadIO(..))
import Data.Maybe (isJust, fromJust)
import Prelude
hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
- foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem,
+ foldl, mapM, mapM_, sequence, all, any, sum, product, elem,
notElem, maximum, minimum, head, last, tail, length, null,
reverse, iterate, init, and, or, lookup, foldr1)
import qualified Prelude
@@ -259,6 +306,15 @@ uncons m = K.uncons (K.adapt m)
-- [0,1,2,3]
-- @
--
+-- unfoldr can be expressed in terms of 'yield' and '<>' as follows:
+--
+-- @
+-- unfoldr step s =
+-- case step s of
+-- Nothing -> mempty
+-- Just (a, b) -> 'yield' a '<>' (unfoldr step b)
+-- @
+--
-- @since 0.1.0
{-# INLINE_EARLY unfoldr #-}
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
@@ -310,18 +366,19 @@ unfoldrMSerial step seed = fromStreamS (S.unfoldrM step seed)
-- Specialized Generation
------------------------------------------------------------------------------
--- Faster than yieldM because there is no bind. Usually we can construct a
--- stream from a pure value using "pure" in an applicative, however in case of
--- Zip streams pure creates an infinite stream.
+-- Faster than yieldM because there is no bind.
--
--- | Create a singleton stream from a pure value. In monadic streams, 'pure' or
--- 'return' can be used in place of 'yield', however, in Zip applicative
--- streams 'pure' is equivalent to 'repeat'.
+-- | Create a singleton stream from a pure value. Same as @a `cons` nil@ but
+-- slightly more efficient. Note that in monadic streams, 'yield' is the same
+-- as 'pure' or 'return', however, in Zip applicative streams it is not the
+-- same as 'pure' because in that case 'pure' is equivalent to 'repeat'
+-- instead. In all other stream types, 'yield' is the same as @yieldM . pure@
+-- but more efficient.
--
-- @since 0.4.0
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
-yield a = K.yield a
+yield = K.yield
-- | Create a singleton stream from a monadic action. Same as @m \`consM` nil@
-- but more efficient.
@@ -335,9 +392,10 @@ yield a = K.yield a
-- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
-yieldM m = K.yieldM m
+yieldM = K.yieldM
--- | Generate a stream by performing a monadic action @n@ times.
+-- | Generate a stream by performing a monadic action @n@ times. Can be
+-- expressed as @stimes n (yieldM m)@.
--
--
-- @
@@ -353,7 +411,8 @@ replicateM n m = go n
where
go cnt = if cnt <= 0 then K.nil else m |: go (cnt - 1)
--- | Generate a stream by repeatedly executing a monadic action forever.
+-- | Generate a stream by repeatedly executing a monadic action forever. Can be
+-- expressed as @cycle1 . yieldM@.
--
-- @
-- runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
@@ -405,8 +464,9 @@ iterateM step = go
-- Conversions
------------------------------------------------------------------------------
--- | Construct a stream from a list containing pure values. This can be more
--- efficient than 'K.fromFoldable' for lists as it can fuse the list.
+-- | Construct a stream from a list containing pure values. More efficient list
+-- specific implementation of 'K.fromFoldable' as it works well with fusion
+-- optimization.
--
-- @since 0.4.0
{-# INLINE_EARLY fromList #-}
@@ -415,9 +475,9 @@ fromList = fromStreamS . S.fromList
{-# RULES "fromList fallback to StreamK" [1]
forall a. S.toStreamK (S.fromList a) = K.fromFoldable a #-}
--- | Construct a stream from a list containing monadic actions. This can be
--- more efficient than 'fromFoldableM' especially for serial streams as it can
--- fuse the list.
+-- | Construct a stream from a list containing monadic actions. More efficient
+-- list specific implementation of 'fromFoldableM' especially for serial
+-- streams as it works well with fusion optimization.
--
-- @since 0.4.0
{-# INLINE_EARLY fromListM #-}
@@ -426,7 +486,8 @@ fromListM = fromStreamD . D.fromListM
{-# RULES "fromListM fallback to StreamK" [1]
forall a. D.toStreamK (D.fromListM a) = fromFoldableM a #-}
--- | Construct a stream from a 'Foldable' containing monadic actions.
+-- | Construct a stream from a 'Foldable' containing monadic actions. Same as
+-- @'Prelude.foldr' 'consM' 'K.nil'@.
--
-- @
-- runStream $ serially $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)
@@ -565,14 +626,14 @@ foldlM' step begin m = S.foldlM' step begin $ toStreamS m
-- @since 0.1.1
{-# INLINE null #-}
null :: Monad m => SerialT m a -> m Bool
-null m = K.null m
+null = K.null
-- | Extract the first element of the stream, if any.
--
-- @since 0.1.0
{-# INLINE head #-}
head :: Monad m => SerialT m a -> m (Maybe a)
-head m = K.head m
+head = K.head
-- | Extract all but the first element of the stream, if any.
--
@@ -687,7 +748,8 @@ lookup = K.lookup
find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
find = K.find
--- | Finds all the indices of elements satisfying the given predicate.
+-- | Find all the indices where the element in the stream satisfies the given
+-- predicate.
--
-- @since 0.5.0
{-# INLINE findIndices #-}
@@ -702,8 +764,8 @@ findIndices = K.findIndices
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
findIndex p = head . findIndices p
--- | Finds the index of all elements in the stream which are equal to the
--- given.
+-- | Find the indices of all the elements in the stream that are equal to the
+-- given value.
--
-- @since 0.5.0
{-# INLINE elemIndices #-}
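The index-finding trio changed in these hunks can be sketched together; a hedged usage example against the 0.5.x API, with the usual `S` alias assumed:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- Indices of all elements satisfying the predicate, as a stream.
    S.toList (S.findIndices even (S.fromList [1 :: Int, 2, 4, 5])) >>= print
    -- First such index only; defined as head of findIndices.
    S.findIndex even (S.fromList [1 :: Int, 3, 4]) >>= print
    -- Indices of all elements equal to the given value.
    S.toList (S.elemIndices 3 (S.fromList [3 :: Int, 1, 3])) >>= print
```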
@@ -751,7 +813,7 @@ toHandle h m = go (toStream m)
let stop = return ()
single a = liftIO (IO.hPutStrLn h a)
yieldk a r = liftIO (IO.hPutStrLn h a) >> go r
- in (K.unStream m1) defState stop single yieldk
+ in K.unStream m1 defState stop single yieldk
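The `toHandle` function cleaned up above writes one line per stream element via `hPutStrLn`; a minimal usage sketch (the file name is illustrative):

```haskell
import System.IO (IOMode (WriteMode), withFile)
import qualified Streamly.Prelude as S

-- Drain a stream of Strings into a file handle, one line per element.
main :: IO ()
main = withFile "out.txt" WriteMode $ \h ->
    S.toHandle h (S.fromList ["hello", "world"])
```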
------------------------------------------------------------------------------
-- Transformation by Folding (Scans)
diff --git a/src/Streamly/SVar.hs b/src/Streamly/SVar.hs
index 5ee8e57..94183f1 100644
--- a/src/Streamly/SVar.hs
+++ b/src/Streamly/SVar.hs
@@ -8,6 +8,7 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnboxedTuples #-}
@@ -19,11 +20,6 @@
-- Maintainer : harendra.kumar@gmail.com
-- Stability : experimental
-- Portability : GHC
---
---
-#ifdef DIAGNOSTICS_VERBOSE
-#define DIAGNOSTICS
-#endif
module Streamly.SVar
(
@@ -45,6 +41,8 @@ module Streamly.SVar
, setStreamLatency
, getYieldLimit
, setYieldLimit
+ , getInspectMode
+ , setInspectMode
, cleanupSVar
, cleanupSVarFromWorker
@@ -52,6 +50,8 @@ module Streamly.SVar
-- SVar related
, newAheadVar
, newParallelVar
+ , captureMonadState
+ , RunInIO (..)
, atomicModifyIORefCAS
, WorkerInfo (..)
@@ -77,9 +77,11 @@ module Streamly.SVar
, requeueOnHeapTop
, updateHeapSeq
, withIORef
+ , heapIsSane
, Rate (..)
, getYieldRateInfo
+ , newSVarStats
, collectLatency
, workerUpdateLatency
, isBeyondMaxRate
@@ -99,21 +101,22 @@ module Streamly.SVar
, toStreamVar
, SVarStats (..)
, NanoSecs (..)
-#ifdef DIAGNOSTICS
, dumpSVar
-#endif
)
where
import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar
- (MVar, newEmptyMVar, tryPutMVar, takeMVar, newMVar)
-import Control.Exception (SomeException(..), catch, mask, assert, Exception)
+ (MVar, newEmptyMVar, tryPutMVar, takeMVar, newMVar, tryReadMVar)
+import Control.Exception
+ (SomeException(..), catch, mask, assert, Exception, catches,
+ throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
+ BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
-import Control.Monad.Trans.Control (MonadBaseControl, control)
+import Control.Monad.Trans.Control (MonadBaseControl, control, StM)
import Data.Atomics
(casIORef, readForCAS, peekTicket, atomicModifyIORefCAS_,
writeBarrier, storeLoadBarrier)
@@ -125,29 +128,18 @@ import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.List ((\\))
import Data.Maybe (fromJust)
+import Data.Semigroup ((<>))
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.Clock (TimeSpec, Clock(Monotonic), getTime, toNanoSecs)
+import System.IO (hPutStrLn, stderr)
+import Text.Printf (printf)
import qualified Data.Heap as H
import qualified Data.Set as S
--- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we
--- can keep it on in production to debug problems quickly if and when they
--- happen, but it may result in unexpected output when threads are left hanging
--- until they are GCed because the consumer went away.
-
-#ifdef DIAGNOSTICS
-import Control.Concurrent.MVar (tryTakeMVar)
-import Control.Exception
- (catches, throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
- BlockedIndefinitelyOnSTM(..))
-import System.IO (hPutStrLn, stderr)
-import Text.Printf (printf)
-#endif
-
-- Always use signed arithmetic to avoid inadvertent overflows of signed values
-- on conversion when comparing unsigned quantities with signed.
newtype NanoSecs = NanoSecs Int64
@@ -189,7 +181,8 @@ data ChildEvent a =
-- | Sorting out-of-turn outputs in a heap for Ahead style streams
data AheadHeapEntry (t :: (* -> *) -> * -> *) m a =
- AheadEntryPure a
+ AheadEntryNull
+ | AheadEntryPure a
| AheadEntryStream (t m a)
------------------------------------------------------------------------------
@@ -335,7 +328,8 @@ data Limit = Unlimited | Limited Word deriving Show
data SVar t m a = SVar
{
-- Read only state
- svarStyle :: SVarStyle
+ svarStyle :: SVarStyle
+ , svarMrun :: RunInIO m
-- Shared output queue (events, length)
-- XXX For better efficiency we can try a preallocated array type (perhaps
@@ -370,13 +364,14 @@ data SVar t m a = SVar
, svarStats :: SVarStats
-- to track garbage collection of SVar
, svarRef :: Maybe (IORef ())
-#ifdef DIAGNOSTICS
+
+ -- Only for diagnostics
+ , svarInspectMode :: Bool
, svarCreator :: ThreadId
, outputHeap :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
, Maybe Int)
-- Shared work queue (stream, seqNo)
, aheadWorkQueue :: IORef ([t m a], Int)
-#endif
}
-------------------------------------------------------------------------------
@@ -400,6 +395,7 @@ data State t m a = State
-- XXX these two can be collapsed into a single type
, _streamLatency :: Maybe NanoSecs -- bootstrap latency
, _maxStreamRate :: Maybe Rate
+ , _inspectMode :: Bool
}
-------------------------------------------------------------------------------
@@ -427,6 +423,7 @@ defState = State
, _bufferHigh = defaultMaxBuffer
, _maxStreamRate = Nothing
, _streamLatency = Nothing
+ , _inspectMode = False
}
-- XXX if perf gets affected we can have all the Nothing params in a single
@@ -496,16 +493,20 @@ getStreamRate = _maxStreamRate
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st =
st { _streamLatency =
- if n < 0
+ if n <= 0
then Nothing
- else if n == 0
- then Nothing
- else Just (fromIntegral n)
+ else Just (fromIntegral n)
}
getStreamLatency :: State t m a -> Maybe NanoSecs
getStreamLatency = _streamLatency
+setInspectMode :: State t m a -> State t m a
+setInspectMode st = st { _inspectMode = True }
+
+getInspectMode :: State t m a -> Bool
+getInspectMode = _inspectMode
+
-------------------------------------------------------------------------------
-- Cleanup
-------------------------------------------------------------------------------
@@ -513,21 +514,20 @@ getStreamLatency = _streamLatency
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv = do
workers <- readIORef (workerThreads sv)
- Prelude.mapM_ (\tid -> throwTo tid ThreadAbort)
+ Prelude.mapM_ (`throwTo` ThreadAbort)
(S.toList workers)
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
workers <- readIORef (workerThreads sv)
self <- myThreadId
- mapM_ (\tid -> throwTo tid ThreadAbort)
+ mapM_ (`throwTo` ThreadAbort)
(S.toList workers \\ [self])
-------------------------------------------------------------------------------
-- Dumping the SVar for debug/diag
-------------------------------------------------------------------------------
-#ifdef DIAGNOSTICS
-- | Convert a number of seconds to a string. The string will consist
-- of four decimal places, followed by a short description of the time
-- units.
@@ -554,8 +554,8 @@ secs k
| otherwise = printf "%.3f %s" t u
-- XXX Code duplicated from collectLatency
-drainLatency :: SVarStats -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
-drainLatency _ss yinfo = do
+drainLatency :: SVar t m a -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
+drainLatency sv yinfo = do
let cur = workerPendingLatency yinfo
col = workerCollectedLatency yinfo
longTerm = svarAllTimeLatency yinfo
@@ -575,21 +575,19 @@ drainLatency _ss yinfo = do
if (pendingCount > 0)
then do
let new = pendingTime `div` (fromIntegral pendingCount)
-#ifdef DIAGNOSTICS
- minLat <- readIORef (minWorkerLatency _ss)
- when (new < minLat || minLat == 0) $
- writeIORef (minWorkerLatency _ss) new
-
- maxLat <- readIORef (maxWorkerLatency _ss)
- when (new > maxLat) $ writeIORef (maxWorkerLatency _ss) new
-#endif
+ when (svarInspectMode sv) $ do
+ let ss = svarStats sv
+ minLat <- readIORef (minWorkerLatency ss)
+ when (new < minLat || minLat == 0) $
+ writeIORef (minWorkerLatency ss) new
+
+ maxLat <- readIORef (maxWorkerLatency ss)
+ when (new > maxLat) $ writeIORef (maxWorkerLatency ss) new
+ modifyIORef (avgWorkerLatency ss) $
+ \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
-- To avoid minor fluctuations update in batches
writeIORef col (0, 0)
writeIORef measured new
-#ifdef DIAGNOSTICS
- modifyIORef (avgWorkerLatency _ss) $
- \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
-#endif
modifyIORef longTerm $ \(_, t) -> (lcount', t)
return (lcount', ltime, new)
else return notUpdated
@@ -599,7 +597,7 @@ dumpSVarStats sv ss style = do
case yieldRateInfo sv of
Nothing -> return ()
Just yinfo -> do
- _ <- liftIO $ drainLatency (svarStats sv) yinfo
+ _ <- liftIO $ drainLatency sv yinfo
return ()
dispatches <- readIORef $ totalDispatches ss
@@ -621,41 +619,41 @@ dumpSVarStats sv ss style = do
Nothing -> do
now <- getTime Monotonic
let interval = toNanoSecs (now - startTime)
- return $ (cnt, gl, interval `div` fromIntegral cnt)
+ return (cnt, gl, interval `div` fromIntegral cnt)
Just stopTime -> do
let interval = toNanoSecs (stopTime - startTime)
- return $ (cnt, gl, interval `div` fromIntegral cnt)
+ return (cnt, gl, interval `div` fromIntegral cnt)
else return (0, 0, 0)
return $ unlines
- [ "total dispatches = " ++ show dispatches
- , "max workers = " ++ show maxWrk
- , "max outQSize = " ++ show maxOq
- ++ (if style == AheadVar
- then "\nheap max size = " ++ show maxHp
+ [ "total dispatches = " <> show dispatches
+ , "max workers = " <> show maxWrk
+ , "max outQSize = " <> show maxOq
+ <> (if style == AheadVar
+ then "\nheap max size = " <> show maxHp
else "")
- ++ (if minLat > 0
+ <> (if minLat > 0
then "\nmin worker latency = "
- ++ secs (fromIntegral minLat * 1e-9)
+ <> secs (fromIntegral minLat * 1e-9)
else "")
- ++ (if maxLat > 0
+ <> (if maxLat > 0
then "\nmax worker latency = "
- ++ secs (fromIntegral maxLat * 1e-9)
+ <> secs (fromIntegral maxLat * 1e-9)
else "")
- ++ (if avgCnt > 0
+ <> (if avgCnt > 0
then let lat = avgTime `div` fromIntegral avgCnt
in "\navg worker latency = "
- ++ secs (fromIntegral lat * 1e-9)
+ <> secs (fromIntegral lat * 1e-9)
else "")
- ++ (if svarLat > 0
+ <> (if svarLat > 0
then "\nSVar latency = "
- ++ secs (fromIntegral svarLat * 1e-9)
+ <> secs (fromIntegral svarLat * 1e-9)
else "")
- ++ (if svarCnt > 0
- then "\nSVar yield count = " ++ show svarCnt
+ <> (if svarCnt > 0
+ then "\nSVar yield count = " <> show svarCnt
else "")
- ++ (if svarGainLossCnt > 0
- then "\nSVar gain/loss yield count = " ++ show svarGainLossCnt
+ <> (if svarGainLossCnt > 0
+ then "\nSVar gain/loss yield count = " <> show svarGainLossCnt
else "")
]
@@ -663,17 +661,17 @@ dumpSVarStats sv ss style = do
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
(oqList, oqLen) <- readIORef $ outputQueue sv
- db <- tryTakeMVar $ outputDoorBell sv
+ db <- tryReadMVar $ outputDoorBell sv
aheadDump <-
if svarStyle sv == AheadVar
then do
(oheap, oheapSeq) <- readIORef $ outputHeap sv
(wq, wqSeq) <- readIORef $ aheadWorkQueue sv
return $ unlines
- [ "heap length = " ++ show (H.size oheap)
- , "heap seqeunce = " ++ show oheapSeq
- , "work queue length = " ++ show (length wq)
- , "work queue sequence = " ++ show wqSeq
+ [ "heap length = " <> show (H.size oheap)
+ , "heap sequence = " <> show oheapSeq
+ , "work queue length = " <> show (length wq)
+ , "work queue sequence = " <> show wqSeq
]
else return []
@@ -687,46 +685,52 @@ dumpSVar sv = do
stats <- dumpSVarStats sv (svarStats sv) (svarStyle sv)
return $ unlines
- [ "Creator tid = " ++ show (svarCreator sv)
- , "style = " ++ show (svarStyle sv)
+ [ "Creator tid = " <> show (svarCreator sv)
+ , "style = " <> show (svarStyle sv)
, "---------CURRENT STATE-----------"
- , "outputQueue length computed = " ++ show (length oqList)
- , "outputQueue length maintained = " ++ show oqLen
+ , "outputQueue length computed = " <> show (length oqList)
+ , "outputQueue length maintained = " <> show oqLen
-- XXX print the types of events in the outputQueue, first 5
- , "outputDoorBell = " ++ show db
+ , "outputDoorBell = " <> show db
]
- ++ aheadDump ++ unlines
- [ "needDoorBell = " ++ show waiting
- , "running threads = " ++ show rthread
+ <> aheadDump
+ <> unlines
+ [ "needDoorBell = " <> show waiting
+ , "running threads = " <> show rthread
-- XXX print the status of first 5 threads
- , "running thread count = " ++ show workers
+ , "running thread count = " <> show workers
]
- ++ "---------STATS-----------\n"
- ++ stats
+ <> "---------STATS-----------\n"
+ <> stats
+
+-- MVar diagnostics have some overhead - around 5% on the asyncly null
+-- benchmark. We can keep them on in production to debug problems quickly if
+-- and when they happen, but they may result in unexpected output when threads
+-- are left hanging until they are GCed because the consumer went away.
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar = do
svInfo <- dumpSVar sv
- hPutStrLn stderr $ label ++ " " ++ "BlockedIndefinitelyOnMVar\n" ++ svInfo
+ hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo
throwIO e
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM = do
svInfo <- dumpSVar sv
- hPutStrLn stderr $ label ++ " " ++ "BlockedIndefinitelyOnSTM\n" ++ svInfo
+ hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo
throwIO e
-withDBGMVar :: SVar t m a -> String -> IO () -> IO ()
-withDBGMVar sv label action =
- action `catches` [ Handler (mvarExcHandler sv label)
- , Handler (stmExcHandler sv label)
- ]
-#else
-withDBGMVar :: SVar t m a -> String -> IO () -> IO ()
-withDBGMVar _ _ action = action
-#endif
+withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
+withDiagMVar sv label action =
+ if svarInspectMode sv
+ then
+ action `catches` [ Handler (mvarExcHandler sv label)
+ , Handler (stmExcHandler sv label)
+ ]
+ else action
-------------------------------------------------------------------------------
-- CAS
@@ -751,6 +755,21 @@ atomicModifyIORefCAS ref fn = do
then return result
else loop tkt (tries - 1)
+{-# INLINE ringDoorBell #-}
+ringDoorBell :: SVar t m a -> IO ()
+ringDoorBell sv = do
+ storeLoadBarrier
+ w <- readIORef $ needDoorBell sv
+ when w $ do
+ -- Note: the sequence of operations is important for correctness here.
+ -- We need to clear the flag strictly before ringing the outputDoorBell.
+ -- Otherwise the outputDoorBell may get processed too early, and our
+ -- subsequent reset of the flag to False would make the consumer lose
+ -- the flag without ever receiving an outputDoorBell.
+ atomicModifyIORefCAS_ (needDoorBell sv) (const False)
+ void $ tryPutMVar (outputDoorBell sv) ()
+
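The new `ringDoorBell` centralizes a pattern previously repeated in the enqueue functions below; a standalone sketch of that flag-then-signal protocol, simplified to bare `IORef`/`MVar` fields outside the SVar record:

```haskell
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Monad (void, when)
import Data.IORef (IORef, atomicModifyIORef', readIORef)

-- Doorbell protocol sketch: the consumer sets the flag when it wants a
-- wakeup; a producer clears the flag strictly before signalling so a
-- racing flag reset cannot make the consumer lose a wakeup. (The real
-- code also issues a storeLoadBarrier from atomic-primops first.)
ringBell :: IORef Bool -> MVar () -> IO ()
ringBell needBell bell = do
    w <- readIORef needBell
    when w $ do
        atomicModifyIORef' needBell (\_ -> (False, ()))
        void $ tryPutMVar bell ()
```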
------------------------------------------------------------------------------
-- Spawning threads and collecting result in streamed fashion
------------------------------------------------------------------------------
@@ -762,6 +781,18 @@ atomicModifyIORefCAS ref fn = do
-- @since 0.1.0
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
+-- When we run computations concurrently, we completely isolate the state of
+-- the concurrent computations from the parent computation. The invariant is
+-- that we should never be running two concurrent computations in the same
+-- thread without using the runInIO function. Also, we should never be running
+-- a concurrent computation in the parent thread, otherwise it may affect the
+-- state of the parent which is against the defined semantics of concurrent
+-- execution.
+newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }
+
+captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
+captureMonadState = control $ \run -> run (return $ RunInIO run)
+
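The `RunInIO` capture above can be sketched in isolation; a hedged restatement (renamed `Runner`/`captureRunner` here to mark it as illustrative), assuming monad-control's `MonadBaseControl` and `StM`:

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad.Trans.Control (MonadBaseControl, StM, control)

-- Snapshot a "run this monad in IO" function in the parent computation,
-- so forked workers execute against the monadic state captured here
-- rather than mutating the parent's live state.
newtype Runner m = Runner { runM :: forall b. m b -> IO (StM m b) }

captureRunner :: MonadBaseControl IO m => m (Runner m)
captureRunner = control $ \run -> run (return (Runner run))
```

For `m ~ ReaderT r IO`, `StM m b` is just `b`, so the captured runner is effectively `` (`runReaderT` env) `` for the environment at capture time.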
-- Stolen from the async package. The perf improvement is modest, 2% on a
-- thread heavy benchmark (parallel composition using noop computations).
-- A version of forkIO that does not include the outer exception
@@ -770,19 +801,20 @@ type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
- case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
+ case fork# action s of (# s1, tid #) -> (# s1, ThreadId tid #)
{-# INLINE doFork #-}
doFork :: MonadBaseControl IO m
=> m ()
+ -> RunInIO m
-> (SomeException -> IO ())
-> m ThreadId
-doFork action exHandler =
- control $ \runInIO ->
+doFork action (RunInIO mrun) exHandler =
+ control $ \run ->
mask $ \restore -> do
- tid <- rawForkIO $ catch (restore $ void $ runInIO action)
+ tid <- rawForkIO $ catch (restore $ void $ mrun action)
exHandler
- runInIO (return tid)
+ run (return tid)
-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv
-- faster, along with the fields in sv required by send?
@@ -857,7 +889,7 @@ send sv msg = do
Unlimited -> return True
Limited lim -> do
active <- readIORef (workerCount sv)
- return $ len < ((fromIntegral lim) - active)
+ return $ len < (fromIntegral lim - active)
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecs))
workerCollectLatency winfo = do
@@ -865,7 +897,7 @@ workerCollectLatency winfo = do
cnt1 <- readIORef (workerYieldCount winfo)
let cnt = cnt1 - cnt0
- if (cnt > 0)
+ if cnt > 0
then do
t1 <- getTime Monotonic
let period = fromInteger $ toNanoSecs (t1 - t0)
@@ -919,7 +951,7 @@ checkRatePeriodic :: SVar t m a
checkRatePeriodic sv yinfo winfo ycnt = do
i <- readIORef (workerPollingInterval yinfo)
-- XXX use generation count to check if the interval has been updated
- if (i /= 0 && (ycnt `mod` i) == 0)
+ if i /= 0 && (ycnt `mod` i) == 0
then do
workerUpdateLatency yinfo winfo
-- XXX not required for parallel streams
@@ -961,12 +993,11 @@ workerStopUpdate winfo info = do
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
- case mwinfo of
- Just winfo ->
- case yieldRateInfo sv of
- Nothing -> return ()
- Just info -> workerStopUpdate winfo info
- Nothing -> return ()
+ case (mwinfo, yieldRateInfo sv) of
+ (Just winfo, Just info) ->
+ workerStopUpdate winfo info
+ _ ->
+ return ()
myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
-------------------------------------------------------------------------------
@@ -981,16 +1012,7 @@ sendStop sv mwinfo = do
enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO sv q m = do
atomicModifyIORefCAS_ q $ \ms -> m : ms
- storeLoadBarrier
- w <- readIORef $ needDoorBell sv
- when w $ do
- -- Note: the sequence of operations is important for correctness here.
- -- We need to set the flag to false strictly before sending the
- -- outputDoorBell, otherwise the outputDoorBell may get processed too early and
- -- then we may set the flag to False to later making the consumer lose
- -- the flag, even without receiving a outputDoorBell.
- atomicModifyIORefCAS_ (needDoorBell sv) (const False)
- void $ tryPutMVar (outputDoorBell sv) ()
+ ringDoorBell sv
-------------------------------------------------------------------------------
-- WAsync
@@ -1004,16 +1026,7 @@ enqueueLIFO sv q m = do
enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO sv q m = do
pushL q m
- storeLoadBarrier
- w <- readIORef $ needDoorBell sv
- when w $ do
- -- Note: the sequence of operations is important for correctness here.
- -- We need to set the flag to false strictly before sending the
- -- outputDoorBell, otherwise the outputDoorBell may get processed too early and
- -- then we may set the flag to False to later making the consumer lose
- -- the flag, even without receiving a outputDoorBell.
- atomicModifyIORefCAS_ (needDoorBell sv) (const False)
- void $ tryPutMVar (outputDoorBell sv) ()
+ ringDoorBell sv
-------------------------------------------------------------------------------
-- Ahead
@@ -1085,16 +1098,7 @@ enqueueAhead sv q m = do
atomicModifyIORefCAS_ q $ \ case
([], n) -> ([m], n + 1) -- increment sequence
_ -> error "not empty"
- storeLoadBarrier
- w <- readIORef $ needDoorBell sv
- when w $ do
- -- Note: the sequence of operations is important for correctness here.
- -- We need to set the flag to false strictly before sending the
- -- outputDoorBell, otherwise the outputDoorBell may get processed too early and
- -- then we may set the flag to False to later making the consumer lose
- -- the flag, even without receiving a outputDoorBell.
- atomicModifyIORefCAS_ (needDoorBell sv) (const False)
- void $ tryPutMVar (outputDoorBell sv) ()
+ ringDoorBell sv
-- enqueue without incrementing the sequence number
{-# INLINE reEnqueueAhead #-}
@@ -1103,11 +1107,7 @@ reEnqueueAhead sv q m = do
atomicModifyIORefCAS_ q $ \ case
([], n) -> ([m], n) -- DO NOT increment sequence
_ -> error "not empty"
- storeLoadBarrier
- w <- readIORef $ needDoorBell sv
- when w $ do
- atomicModifyIORefCAS_ (needDoorBell sv) (const False)
- void $ tryPutMVar (outputDoorBell sv) ()
+ ringDoorBell sv
-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
@@ -1138,7 +1138,7 @@ queueEmptyAhead q = liftIO $ do
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
=> IORef ([t m a], Int) -> m (Maybe (t m a, Int))
-dequeueAhead q = liftIO $ do
+dequeueAhead q = liftIO $
atomicModifyIORefCAS q $ \case
([], n) -> (([], n), Nothing)
(x : [], n) -> (([], n), Just (x, n))
@@ -1171,9 +1171,11 @@ dequeueFromHeap hpVar =
Just n -> do
let r = H.uncons hp
case r of
- Just (ent@(Entry seqNo _ev), hp') | seqNo == n ->
- ((hp', Nothing), Ready ent)
- _ -> (pair, Waiting n)
+ Just (ent@(Entry seqNo _ev), hp') ->
+ if seqNo == n
+ then ((hp', Nothing), Ready ent)
+ else assert (seqNo >= n) (pair, Waiting n)
+ Nothing -> (pair, Waiting n)
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
@@ -1186,11 +1188,19 @@ dequeueFromHeapSeq hpVar i =
Nothing -> do
let r = H.uncons hp
case r of
- Just (ent@(Entry seqNo _ev), hp') | seqNo == i ->
- ((hp', Nothing), Ready ent)
- _ -> ((hp, Just i), Waiting i)
+ Just (ent@(Entry seqNo _ev), hp') ->
+ if seqNo == i
+ then ((hp', Nothing), Ready ent)
+ else assert (seqNo >= i) ((hp, Just i), Waiting i)
+ Nothing -> ((hp, Just i), Waiting i)
Just _ -> error "dequeueFromHeapSeq: unreachable"
+heapIsSane :: Maybe Int -> Int -> Bool
+heapIsSane snum seqNo =
+ case snum of
+ Nothing -> True
+ Just n -> seqNo >= n
+
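The dequeue changes above make the sequence check explicit and assert the `heapIsSane` invariant (`seqNo >= expected`). A small sketch of the same uncons-and-check step, assuming the heaps package that streamly uses (`popInOrder` is a hypothetical helper, not part of the module):

```haskell
import Data.Heap (Entry (..))
import qualified Data.Heap as H

-- Pop the heap top only when it carries exactly the next expected
-- sequence number; otherwise leave the heap intact and keep waiting.
popInOrder :: H.Heap (Entry Int a) -> Int -> (H.Heap (Entry Int a), Maybe a)
popInOrder hp next =
    case H.uncons hp of
        Just (Entry seqNo v, hp') | seqNo == next -> (hp', Just v)
        _ -> (hp, Nothing)
```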
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
@@ -1198,7 +1208,8 @@ requeueOnHeapTop
-> Int
-> IO ()
requeueOnHeapTop hpVar ent seqNo =
- atomicModifyIORef_ hpVar $ \(hp, _) -> (H.insert ent hp, Just seqNo)
+ atomicModifyIORef_ hpVar $ \(hp, snum) ->
+ assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
{-# INLINE updateHeapSeq #-}
updateHeapSeq
@@ -1206,7 +1217,8 @@ updateHeapSeq
-> Int
-> IO ()
updateHeapSeq hpVar seqNo =
- atomicModifyIORef_ hpVar $ \(hp, _) -> (hp, Just seqNo)
+ atomicModifyIORef_ hpVar $ \(hp, snum) ->
+ assert (heapIsSane snum seqNo) (hp, Just seqNo)
-------------------------------------------------------------------------------
-- WAhead
@@ -1235,7 +1247,7 @@ addThread sv tid =
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid =
- liftIO $ modifyIORef (workerThreads sv) $ (\s -> S.delete tid s)
+ liftIO $ modifyIORef (workerThreads sv) (S.delete tid)
-- If present then delete else add. This takes care of out of order add and
-- delete i.e. a delete arriving before we even added a thread.
@@ -1245,14 +1257,13 @@ delThread sv tid =
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
changed <- liftIO $ atomicModifyIORefCAS (workerThreads sv) $ \old ->
- if (S.member tid old)
- then let new = (S.delete tid old) in (new, new)
- else let new = (S.insert tid old) in (new, old)
- if null changed
- then liftIO $ do
- writeBarrier
- void $ tryPutMVar (outputDoorBell sv) ()
- else return ()
+ if S.member tid old
+ then let new = S.delete tid old in (new, new)
+ else let new = S.insert tid old in (new, old)
+ when (null changed) $
+ liftIO $ do
+ writeBarrier
+ void $ tryPutMVar (outputDoorBell sv) ()
-- | This is safe even if we are adding more threads concurrently because if
-- a child thread is adding another thread then anyway 'workerThreads' will
@@ -1267,22 +1278,19 @@ handleChildException sv e = do
tid <- myThreadId
void $ send sv (ChildStop tid (Just e))
-#ifdef DIAGNOSTICS
+{-# NOINLINE recordMaxWorkers #-}
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
active <- readIORef (workerCount sv)
maxWrk <- readIORef (maxWorkers $ svarStats sv)
when (active > maxWrk) $ writeIORef (maxWorkers $ svarStats sv) active
modifyIORef (totalDispatches $ svarStats sv) (+1)
-#endif
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
-#ifdef DIAGNOSTICS
- recordMaxWorkers sv
-#endif
+ when (svarInspectMode sv) $ recordMaxWorkers sv
-- This allocation matters when significant number of workers are being
-- sent. We allocate it only when needed.
winfo <-
@@ -1292,12 +1300,13 @@ pushWorker yieldMax sv = do
cntRef <- newIORef 0
t <- getTime Monotonic
lat <- newIORef (0, t)
- return $ Just $ WorkerInfo
+ return $ Just WorkerInfo
{ workerYieldMax = yieldMax
, workerYieldCount = cntRef
, workerLatencyStart = lat
}
- doFork (workLoop sv winfo) (handleChildException sv) >>= addThread sv
+ doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
+ >>= addThread sv
-- XXX we can push the workerCount modification in accountThread and use the
-- same pushWorker for Parallel case as well.
@@ -1307,33 +1316,41 @@ pushWorker yieldMax sv = do
-- producer side. So we need to use a thread safe modification of
-- workerThreads. Alternatively, we can use a CreateThread event to avoid
-- using a CAS based modification.
-{-# NOINLINE pushWorkerPar #-}
-pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
-pushWorkerPar sv wloop = do
- -- We do not use workerCount in case of ParallelVar but still there is no
- -- harm in maintaining it correctly.
-#ifdef DIAGNOSTICS
- liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
- recordMaxWorkers sv
- -- This allocation matters when significant number of workers are being
- -- sent. We allocate it only when needed. The overhead increases by 4x.
- winfo <-
- case yieldRateInfo sv of
- Nothing -> return Nothing
- Just _ -> liftIO $ do
- cntRef <- newIORef 0
- t <- getTime Monotonic
- lat <- newIORef (0, t)
- return $ Just $ WorkerInfo
- { workerYieldMax = 0
- , workerYieldCount = cntRef
- , workerLatencyStart = lat
- }
+{-# INLINE pushWorkerPar #-}
+pushWorkerPar
+ :: MonadAsync m
+ => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
+pushWorkerPar sv wloop =
+ if svarInspectMode sv
+ then forkWithDiag
+ else doFork (wloop Nothing) (svarMrun sv) (handleChildException sv)
+ >>= modifyThread sv
- doFork (wloop winfo) (handleChildException sv) >>= modifyThread sv
-#else
- doFork (wloop Nothing) (handleChildException sv) >>= modifyThread sv
-#endif
+ where
+
+ {-# NOINLINE forkWithDiag #-}
+ forkWithDiag = do
+ -- We do not use workerCount in case of ParallelVar but still there is
+ -- no harm in maintaining it correctly.
+ liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
+ recordMaxWorkers sv
+ -- This allocation matters when significant number of workers are being
+ -- sent. We allocate it only when needed. The overhead increases by 4x.
+ winfo <-
+ case yieldRateInfo sv of
+ Nothing -> return Nothing
+ Just _ -> liftIO $ do
+ cntRef <- newIORef 0
+ t <- getTime Monotonic
+ lat <- newIORef (0, t)
+ return $ Just WorkerInfo
+ { workerYieldMax = 0
+ , workerYieldCount = cntRef
+ , workerLatencyStart = lat
+ }
+
+ doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
+ >>= modifyThread sv
-- Returns:
-- True: can dispatch more
@@ -1347,14 +1364,14 @@ dispatchWorker yieldCount sv = do
-- Note, "done" may not mean that the work is actually finished if there
-- are workers active, because there may be a worker which has not yet
-- queued the leftover work.
- if (not done)
+ if not done
then do
qDone <- liftIO $ isQueueDone sv
-- Note that the worker count is only decremented during event
-- processing in fromStreamVar and therefore it is safe to read and
-- use it without a lock.
active <- liftIO $ readIORef $ workerCount sv
- if (not qDone)
+ if not qDone
then do
-- Note that we may deadlock if the previous workers (tasks in the
-- stream) wait/depend on the future workers (tasks in the stream)
@@ -1408,7 +1425,7 @@ rateRecoveryTime :: NanoSecs
rateRecoveryTime = 1000000
nanoToMicroSecs :: NanoSecs -> Int
-nanoToMicroSecs s = (fromIntegral s) `div` 1000
+nanoToMicroSecs s = fromIntegral s `div` 1000
-- We either block, or send one worker with limited yield count or one or more
-- workers with unlimited yield count.
@@ -1493,7 +1510,7 @@ estimateWorkers workerLimit svarYields gainLossYields
in assert (sleepTime >= 0) $
-- if s is less than 0 it means our maxSleepTime is less
-- than the worker latency.
- if (s > 0) then BlockWait s else ManyWorkers 1 (Count 0)
+ if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)
where
withLimit n =
case workerLimit of
@@ -1519,7 +1536,7 @@ getWorkerLatency yinfo = do
pendingTime = colTime + time
new =
if pendingCount > 0
- then let lat = pendingTime `div` (fromIntegral pendingCount)
+ then let lat = pendingTime `div` fromIntegral pendingCount
-- XXX Give more weight to new?
in (lat + prev) `div` 2
else prev
@@ -1565,8 +1582,8 @@ updateWorkerPollingInterval yinfo latency = do
-- whereas the average is used for future estimates e.g. how many workers
-- should be maintained to maintain the rate.
-- CAUTION! keep it in sync with getWorkerLatency
-collectLatency :: SVarStats -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
-collectLatency _ss yinfo = do
+collectLatency :: SVar t m a -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
+collectLatency sv yinfo = do
let cur = workerPendingLatency yinfo
col = workerCollectedLatency yinfo
longTerm = svarAllTimeLatency yinfo
@@ -1583,33 +1600,33 @@ collectLatency _ss yinfo = do
lcount' = lcount + pendingCount
tripleWith lat = (lcount', ltime, lat)
- if (pendingCount > 0)
+ if pendingCount > 0
then do
let new = pendingTime `div` (fromIntegral pendingCount)
-#ifdef DIAGNOSTICS
- minLat <- readIORef (minWorkerLatency _ss)
- when (new < minLat || minLat == 0) $
- writeIORef (minWorkerLatency _ss) new
-
- maxLat <- readIORef (maxWorkerLatency _ss)
- when (new > maxLat) $ writeIORef (maxWorkerLatency _ss) new
-#endif
+ when (svarInspectMode sv) $ do
+ let ss = svarStats sv
+ minLat <- readIORef (minWorkerLatency ss)
+ when (new < minLat || minLat == 0) $
+ writeIORef (minWorkerLatency ss) new
+
+ maxLat <- readIORef (maxWorkerLatency ss)
+ when (new > maxLat) $ writeIORef (maxWorkerLatency ss) new
-- When we have collected a significant sized batch we compute the new
-- latency using that batch and return the new latency, otherwise we
-- return the previous latency derived from the previous batch.
if (pendingCount > fromIntegral magicMaxBuffer)
|| (pendingTime > minThreadDelay)
- || (let r = (fromIntegral new) / (fromIntegral prev) :: Double
+ || (let r = fromIntegral new / fromIntegral prev :: Double
in prev > 0 && (r > 2 || r < 0.5))
|| (prev == 0)
then do
+ when (svarInspectMode sv) $ do
+ let ss = svarStats sv
+ modifyIORef (avgWorkerLatency ss) $
+ \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
updateWorkerPollingInterval yinfo (max new prev)
writeIORef col (0, 0)
writeIORef measured ((prev + new) `div` 2)
-#ifdef DIAGNOSTICS
- modifyIORef (avgWorkerLatency _ss) $
- \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
-#endif
modifyIORef longTerm $ \(_, t) -> (lcount', t)
return $ tripleWith new
else do
@@ -1631,7 +1648,7 @@ dispatchWorkerPaced sv = do
(svarYields, svarElapsed, wLatency) <- do
now <- liftIO $ getTime Monotonic
(yieldCount, baseTime, lat) <-
- liftIO $ collectLatency (svarStats sv) yinfo
+ liftIO $ collectLatency sv yinfo
let elapsed = fromInteger $ toNanoSecs $ now - baseTime
let latency =
if lat == 0
@@ -1698,18 +1715,17 @@ dispatchWorkerPaced sv = do
liftIO $ writeIORef periodRef period
cnt <- liftIO $ readIORef $ workerCount sv
- if (cnt < netWorkers)
+ if cnt < netWorkers
then do
let total = netWorkers - cnt
batch = max 1 $ fromIntegral $
minThreadDelay `div` targetLat
- r <- dispatchN (min total batch)
-- XXX stagger the workers over a period?
-- XXX cannot sleep, as that would mean we cannot process the
-- outputs. need to try a different mechanism to stagger.
-- when (total > batch) $
-- liftIO $ threadDelay $ nanoToMicroSecs minThreadDelay
- return r
+ dispatchN (min total batch)
else return False
where
@@ -1723,7 +1739,7 @@ dispatchWorkerPaced sv = do
else yields + buf
liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)
- dispatchN n = do
+ dispatchN n =
if n == 0
then return True
else do
@@ -1736,12 +1752,13 @@ sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced _ = return ()
sendWorkerDelay :: SVar t m a -> IO ()
-sendWorkerDelay _sv = do
+sendWorkerDelay _sv =
-- XXX we need a better way to handle this than hardcoded delays. The
-- delays may be different for different systems.
-- If there is a usecase where this is required we can create a combinator
-- to set it as a config in the state.
{-
+ do
ncpu <- getNumCapabilities
if ncpu <= 1
then
@@ -1816,7 +1833,7 @@ sendWorkerWait delay dispatch sv = do
-- doorbell on the next enqueue.
liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
- liftIO $ storeLoadBarrier
+ liftIO storeLoadBarrier
canDoMore <- dispatch sv
-- XXX test for the case when we miss sending a worker when the worker
@@ -1829,7 +1846,7 @@ sendWorkerWait delay dispatch sv = do
if canDoMore
then sendWorkerWait delay dispatch sv
else do
- liftIO $ withDBGMVar sv "sendWorkerWait: nothing to do"
+ liftIO $ withDiagMVar sv "sendWorkerWait: nothing to do"
$ takeMVar (outputDoorBell sv)
(_, len) <- liftIO $ readIORef (outputQueue sv)
when (len <= 0) $ sendWorkerWait delay dispatch sv
@@ -1838,10 +1855,10 @@ sendWorkerWait delay dispatch sv = do
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
(list, len) <- atomicModifyIORefCAS (outputQueue sv) $ \x -> (([],0), x)
-#ifdef DIAGNOSTICS
- oqLen <- readIORef (maxOutQSize $ svarStats sv)
- when (len > oqLen) $ writeIORef (maxOutQSize $ svarStats sv) len
-#endif
+ when (svarInspectMode sv) $ do
+ let ref = maxOutQSize $ svarStats sv
+ oqLen <- readIORef ref
+ when (len > oqLen) $ writeIORef ref len
return (list, len)
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
@@ -1865,12 +1882,12 @@ readOutputQBounded sv = do
cnt <- liftIO $ readIORef $ workerCount sv
when (cnt <= 0) $ do
done <- liftIO $ isWorkDone sv
- when (not done) $ pushWorker 0 sv
+ when (not done) (pushWorker 0 sv)
{-# INLINE blockingRead #-}
blockingRead = do
sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
- liftIO $ (readOutputQRaw sv >>= return . fst)
+ liftIO (fst `fmap` readOutputQRaw sv)
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
@@ -1888,7 +1905,7 @@ readOutputQPaced sv = do
{-# INLINE blockingRead #-}
blockingRead = do
sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
- liftIO $ (readOutputQRaw sv >>= return . fst)
+ liftIO (fst `fmap` readOutputQRaw sv)
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
@@ -1907,7 +1924,7 @@ postProcessBounded sv = do
r <- liftIO $ isWorkDone sv
-- Note that we need to guarantee a worker, therefore we cannot just
-- use dispatchWorker which may or may not send a worker.
- when (not r) $ pushWorker 0 sv
+ when (not r) (pushWorker 0 sv)
-- XXX do we need to dispatch many here?
-- void $ dispatchWorker sv
return r
@@ -1968,6 +1985,30 @@ getYieldRateInfo st = do
, svarAllTimeLatency = wlong
}
+newSVarStats :: IO SVarStats
+newSVarStats = do
+ disp <- newIORef 0
+ maxWrk <- newIORef 0
+ maxOq <- newIORef 0
+ maxHs <- newIORef 0
+ maxWq <- newIORef 0
+ avgLat <- newIORef (0, NanoSecs 0)
+ maxLat <- newIORef (NanoSecs 0)
+ minLat <- newIORef (NanoSecs 0)
+ stpTime <- newIORef Nothing
+
+ return SVarStats
+ { totalDispatches = disp
+ , maxWorkers = maxWrk
+ , maxOutQSize = maxOq
+ , maxHeapSize = maxHs
+ , maxWorkQSize = maxWq
+ , avgWorkerLatency = avgLat
+ , minWorkerLatency = minLat
+ , maxWorkerLatency = maxLat
+ , svarStopTime = stpTime
+ }
+
getAheadSVar :: MonadAsync m
=> State t m a
-> ( IORef ([t m a], Int)
@@ -1976,8 +2017,9 @@ getAheadSVar :: MonadAsync m
-> SVar t m a
-> Maybe WorkerInfo
-> m ())
+ -> RunInIO m
-> IO (SVar t m a)
-getAheadSVar st f = do
+getAheadSVar st f mrun = do
outQ <- newIORef ([], 0)
-- the second component of the tuple is "Nothing" when heap is being
-- cleared, "Just n" when we are expecting sequence number n to arrive
@@ -1987,6 +2029,8 @@ getAheadSVar st f = do
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
+ -- The sequence number is incremented whenever something is queued;
+ -- therefore the first sequence number will be 0
q <- newIORef ([], -1)
stopMVar <- newMVar ()
yl <- case getYieldLimit st of
@@ -1994,18 +2038,8 @@ getAheadSVar st f = do
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
- disp <- newIORef 0
- maxWrk <- newIORef 0
- maxOq <- newIORef 0
- maxHs <- newIORef 0
- maxWq <- newIORef 0
- avgLat <- newIORef (0, NanoSecs 0)
- maxLat <- newIORef (NanoSecs 0)
- minLat <- newIORef (NanoSecs 0)
- stpTime <- newIORef Nothing
-#ifdef DIAGNOSTICS
+ stats <- newSVarStats
tid <- myThreadId
-#endif
let getSVar sv readOutput postProc = SVar
{ outputQueue = outQ
@@ -2023,26 +2057,16 @@ getAheadSVar st f = do
, isQueueDone = isQueueDoneAhead sv q
, needDoorBell = wfw
, svarStyle = AheadVar
+ , svarMrun = mrun
, workerCount = active
, accountThread = delThread sv
, workerStopMVar = stopMVar
, svarRef = Nothing
-#ifdef DIAGNOSTICS
+ , svarInspectMode = getInspectMode st
, svarCreator = tid
, aheadWorkQueue = q
, outputHeap = outH
-#endif
- , svarStats = SVarStats
- { totalDispatches = disp
- , maxWorkers = maxWrk
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
- , avgWorkerLatency = avgLat
- , minWorkerLatency = minLat
- , maxWorkerLatency = maxLat
- , svarStopTime = stpTime
- }
+ , svarStats = stats
}
let sv =
@@ -2079,8 +2103,8 @@ getAheadSVar st f = do
(xs, _) <- readIORef q
return $ null xs
-getParallelSVar :: MonadIO m => State t m a -> IO (SVar t m a)
-getParallelSVar st = do
+getParallelSVar :: MonadIO m => State t m a -> RunInIO m -> IO (SVar t m a)
+getParallelSVar st mrun = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
@@ -2090,18 +2114,8 @@ getParallelSVar st = do
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
- disp <- newIORef 0
- maxWrk <- newIORef 0
- maxOq <- newIORef 0
- maxHs <- newIORef 0
- maxWq <- newIORef 0
- avgLat <- newIORef (0, NanoSecs 0)
- maxLat <- newIORef (NanoSecs 0)
- minLat <- newIORef (NanoSecs 0)
- stpTime <- newIORef Nothing
-#ifdef DIAGNOSTICS
+ stats <- newSVarStats
tid <- myThreadId
-#endif
let sv =
SVar { outputQueue = outQ
@@ -2120,37 +2134,28 @@ getParallelSVar st = do
, isQueueDone = undefined
, needDoorBell = undefined
, svarStyle = ParallelVar
+ , svarMrun = mrun
, workerCount = active
, accountThread = modifyThread sv
, workerStopMVar = undefined
, svarRef = Nothing
-#ifdef DIAGNOSTICS
+ , svarInspectMode = getInspectMode st
, svarCreator = tid
, aheadWorkQueue = undefined
, outputHeap = undefined
-#endif
- , svarStats = SVarStats
- { totalDispatches = disp
- , maxWorkers = maxWrk
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
- , avgWorkerLatency = avgLat
- , minWorkerLatency = minLat
- , maxWorkerLatency = maxLat
- , svarStopTime = stpTime
- }
+ , svarStats = stats
}
in return sv
where
readOutputQPar sv = liftIO $ do
- withDBGMVar sv "readOutputQPar: doorbell" $ takeMVar (outputDoorBell sv)
+ withDiagMVar sv "readOutputQPar: doorbell"
+ $ takeMVar (outputDoorBell sv)
case yieldRateInfo sv of
Nothing -> return ()
- Just yinfo -> void $ collectLatency (svarStats sv) yinfo
- readOutputQRaw sv >>= return . fst
+ Just yinfo -> void $ collectLatency sv yinfo
+ fst `fmap` readOutputQRaw sv
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
@@ -2160,7 +2165,7 @@ sendFirstWorker sv m = do
liftIO $ enqueue sv m
case yieldRateInfo sv of
Nothing -> pushWorker 0 sv
- Just yinfo -> do
+ Just yinfo ->
if svarLatencyTarget yinfo == maxBound
then liftIO $ threadDelay maxBound
else pushWorker 1 sv
@@ -2178,19 +2183,22 @@ newAheadVar :: MonadAsync m
-> m ())
-> m (SVar t m a)
newAheadVar st m wloop = do
- sv <- liftIO $ getAheadSVar st wloop
+ mrun <- captureMonadState
+ sv <- liftIO $ getAheadSVar st wloop mrun
sendFirstWorker sv m
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m => State t m a -> m (SVar t m a)
-newParallelVar st = liftIO $ getParallelSVar st
+newParallelVar st = do
+ mrun <- captureMonadState
+ liftIO $ getParallelSVar st mrun
-- XXX this errors out for Parallel/Ahead SVars
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
- liftIO $ (enqueue sv) m
+ liftIO $ enqueue sv m
done <- allThreadsDone sv
-- XXX This is safe only when called from the consumer thread or when no
-- consumer is present. There may be a race if we are not running in the
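
The `collectLatency` changes above keep the batching policy intact: a new latency estimate is computed only when the pending batch is significant (a large count, enough elapsed time, or a sharp divergence from the previous estimate), and the measured value is smoothed by averaging with the previous one. A minimal standalone sketch of that policy follows; the constants standing in for `magicMaxBuffer` and `minThreadDelay` are assumptions, not streamly's actual values, and the names are illustrative only.

```haskell
-- Illustrative sketch only; the real logic lives in Streamly.SVar
-- (collectLatency). The constants below are assumed stand-ins.

-- | Is a pending batch significant enough to recompute the latency
-- estimate? Mirrors the three conditions in collectLatency: a large
-- batch, enough elapsed time, or a sharp divergence from the previous
-- estimate (or no previous estimate at all).
significantBatch :: Integer -> Integer -> Integer -> Integer -> Bool
significantBatch pendingCount pendingTime new prev =
       pendingCount > magicMaxBuffer
    || pendingTime > minThreadDelay
    || (prev > 0 && (r > 2 || r < 0.5))
    || prev == 0
  where
    magicMaxBuffer = 1500     -- assumed stand-in value
    minThreadDelay = 1000000  -- assumed: 1 ms in nanoseconds
    r = fromIntegral new / fromIntegral prev :: Double

-- | The measured latency is smoothed by averaging the fresh batch
-- latency with the previous estimate, as in
-- @writeIORef measured ((prev + new) `div` 2)@ above.
smoothLatency :: Integer -> Integer -> Integer
smoothLatency prev new = (prev + new) `div` 2
```

For example, `smoothLatency 100 200` is 150, and a batch with no previous estimate (`prev == 0`) is always treated as significant so that the very first measurement is recorded.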
diff --git a/src/Streamly/Streams/Ahead.hs b/src/Streamly/Streams/Ahead.hs
index e87e1bd..e7d5fe4 100644
--- a/src/Streamly/Streams/Ahead.hs
+++ b/src/Streamly/Streams/Ahead.hs
@@ -5,13 +5,8 @@
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-#ifdef DIAGNOSTICS_VERBOSE
-#define DIAGNOSTICS
-#endif
-
-- |
-- Module : Streamly.Streams.Ahead
-- Copyright : (c) 2017 Harendra Kumar
@@ -32,7 +27,8 @@ module Streamly.Streams.Ahead
where
import Control.Concurrent.MVar (putMVar, takeMVar)
-import Control.Monad (ap, void)
+import Control.Exception (assert)
+import Control.Monad (ap, void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class (MonadError(..))
@@ -41,7 +37,7 @@ import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Heap (Heap, Entry(..))
-import Data.IORef (IORef, readIORef, atomicModifyIORef)
+import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
import GHC.Exts (inline)
@@ -51,13 +47,11 @@ import qualified Data.Heap as H
import Streamly.Streams.SVar (fromSVar)
import Streamly.Streams.Serial (map)
import Streamly.SVar
-import Streamly.Streams.StreamK (IsStream(..), Stream(..))
+import Streamly.Streams.StreamK
+ (IsStream(..), Stream(..), unstreamShared, unStreamIsolated,
+ runStreamSVar)
import qualified Streamly.Streams.StreamK as K
-#ifdef DIAGNOSTICS
-import Control.Monad (when)
-import Data.IORef (writeIORef)
-#endif
import Prelude hiding (map)
#include "Instances.hs"
@@ -148,15 +142,13 @@ underMaxHeap sv hp = do
-- XXX simplify this
let maxHeap = case maxBufferLimit sv of
Limited lim -> Limited $
- if (fromIntegral lim) >= len
- then lim - (fromIntegral len)
- else 0
+ max 0 (lim - fromIntegral len)
Unlimited -> Unlimited
case maxHeap of
Limited lim -> do
active <- readIORef (workerCount sv)
- return $ H.size hp + active <= (fromIntegral lim)
+ return $ H.size hp + active <= fromIntegral lim
Unlimited -> return True
-- Return value:
@@ -166,7 +158,7 @@ preStopCheck ::
SVar Stream m a
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
-> IO Bool
-preStopCheck sv heap = do
+preStopCheck sv heap =
-- check the stop condition under a lock before actually
-- stopping so that the whole herd does not stop at once.
withIORef heap $ \(hp, _) -> do
@@ -187,6 +179,17 @@ preStopCheck sv heap = do
if rateOk then continue else stop
else stop
+abortExecution ::
+ IORef ([Stream m a], Int)
+ -> SVar Stream m a
+ -> Maybe WorkerInfo
+ -> Stream m a
+ -> IO ()
+abortExecution q sv winfo m = do
+ reEnqueueAhead sv q m
+ incrementYieldLimit sv
+ sendStop sv winfo
+
-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
-- a 100,000 threads producing output and queuing it to the heap for
@@ -230,8 +233,9 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
sendStop sv winfo
else runStreamWithYieldLimit True seqNo r
- loopHeap seqNo ent = do
+ loopHeap seqNo ent =
case ent of
+ AheadEntryNull -> nextHeap seqNo
AheadEntryPure a -> do
-- Use 'send' directly so that we do not account this in worker
-- latency as this will not be the real latency.
@@ -239,7 +243,7 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
-- transferring available results from heap to outputQueue.
void $ liftIO $ send sv (ChildYield a)
nextHeap seqNo
- AheadEntryStream r -> do
+ AheadEntryStream r ->
if stopping
then stopIfNeeded ent seqNo r
else runStreamWithYieldLimit True seqNo r
@@ -249,14 +253,14 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
case res of
Ready (Entry seqNo hent) -> loopHeap seqNo hent
Clearing -> liftIO $ sendStop sv winfo
- _ -> do
+ Waiting _ ->
if stopping
then do
r <- liftIO $ preStopCheck sv heap
if r
then liftIO $ sendStop sv winfo
else processWorkQueue prevSeqNo
- else (inline processWorkQueue) prevSeqNo
+ else inline processWorkQueue prevSeqNo
processWorkQueue prevSeqNo = do
work <- dequeueAhead q
@@ -265,14 +269,11 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
Just (m, seqNo) -> do
yieldLimitOk <- liftIO $ decrementYieldLimit sv
if yieldLimitOk
- then do
+ then
if seqNo == prevSeqNo + 1
then processWithToken q heap st sv winfo m seqNo
else processWithoutToken q heap st sv winfo m seqNo
- else liftIO $ do
- liftIO $ reEnqueueAhead sv q m
- incrementYieldLimit sv
- sendStop sv winfo
+ else liftIO $ abortExecution q sv winfo m
-- We do not stop the worker on buffer full here as we want to proceed to
-- nextHeap anyway so that we can clear any subsequent entries. We stop
@@ -297,7 +298,7 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
let stop = do
liftIO (incrementYieldLimit sv)
nextHeap seqNo
- unStream r st stop
+ runStreamSVar sv r st stop
(singleStreamFromHeap seqNo)
(yieldStreamFromHeap seqNo)
else liftIO $ do
@@ -336,39 +337,46 @@ processWithoutToken :: MonadIO m
-> Stream m a
-> Int
-> m ()
-processWithoutToken q heap st sv winfo m sno = do
+processWithoutToken q heap st sv winfo m seqNo = do
-- we have already decremented the yield limit for m
let stop = do
liftIO (incrementYieldLimit sv)
- workLoopAhead q heap st sv winfo
+ -- If the stream stops without yielding anything and we put nothing
+ -- on the heap, but the heap was waiting for this seq number, then
+ -- it will keep waiting forever because we are never going to put
+ -- it on the heap. So we have to put a null entry on the heap even
+ -- when we stop.
+ toHeap AheadEntryNull
- unStream m st stop (singleToHeap sno) (yieldToHeap sno)
+ runStreamSVar sv m st stop
+ (toHeap . AheadEntryPure)
+ (\a r -> toHeap $ AheadEntryStream $ K.cons a r)
where
-- XXX to reduce contention each CPU can have its own heap
- toHeap seqNo ent = do
+ toHeap ent = do
-- Heap insertion is an expensive affair so we use a non CAS based
-- modification, otherwise contention and retries can make a thread
-- context switch and throw it behind other threads which come later in
-- sequence.
newHp <- liftIO $ atomicModifyIORef heap $ \(hp, snum) ->
let hp' = H.insert (Entry seqNo ent) hp
- in ((hp', snum), hp')
-
-#ifdef DIAGNOSTICS
- liftIO $ do
- maxHp <- readIORef (maxHeapSize $ svarStats sv)
- when (H.size newHp > maxHp) $
- writeIORef (maxHeapSize $ svarStats sv) (H.size newHp)
-#endif
+ in assert (heapIsSane snum seqNo) ((hp', snum), hp')
+
+ when (svarInspectMode sv) $
+ liftIO $ do
+ maxHp <- readIORef (maxHeapSize $ svarStats sv)
+ when (H.size newHp > maxHp) $
+ writeIORef (maxHeapSize $ svarStats sv) (H.size newHp)
+
heapOk <- liftIO $ underMaxHeap sv newHp
let drainAndStop = drainHeap q heap st sv winfo
mainLoop = workLoopAhead q heap st sv winfo
status <-
case yieldRateInfo sv of
Nothing -> return HContinue
- Just yinfo -> do
+ Just yinfo ->
case winfo of
Just info -> do
rateOk <- liftIO $ workerRateControl sv yinfo info
@@ -384,9 +392,6 @@ processWithoutToken q heap st sv winfo m sno = do
HStop -> drainAndStop
else drainAndStop
- singleToHeap seqNo a = toHeap seqNo (AheadEntryPure a)
- yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `K.cons` r))
-
processWithToken :: MonadIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
@@ -401,16 +406,16 @@ processWithToken q heap st sv winfo action sno = do
-- XXX deduplicate stop in all invocations
let stop = do
liftIO (incrementYieldLimit sv)
- loopWithToken sno
+ loopWithToken (sno + 1)
- unStream action st stop (singleOutput sno) (yieldOutput sno)
+ runStreamSVar sv action st stop (singleOutput sno) (yieldOutput sno)
where
singleOutput seqNo a = do
continue <- liftIO $ sendYield sv winfo (ChildYield a)
if continue
- then loopWithToken seqNo
+ then loopWithToken (seqNo + 1)
else do
liftIO $ updateHeapSeq heap (seqNo + 1)
drainHeap q heap st sv winfo
@@ -425,8 +430,8 @@ processWithToken q heap st sv winfo action sno = do
then do
let stop = do
liftIO (incrementYieldLimit sv)
- loopWithToken seqNo
- unStream r st stop
+ loopWithToken (seqNo + 1)
+ runStreamSVar sv r st stop
(singleOutput seqNo)
(yieldOutput seqNo)
else do
@@ -435,28 +440,30 @@ processWithToken q heap st sv winfo action sno = do
liftIO $ incrementYieldLimit sv
drainHeap q heap st sv winfo
- loopWithToken prevSeqNo = do
+ loopWithToken nextSeqNo = do
work <- dequeueAhead q
case work of
Nothing -> do
- liftIO $ updateHeapSeq heap (prevSeqNo + 1)
+ liftIO $ updateHeapSeq heap nextSeqNo
workLoopAhead q heap st sv winfo
Just (m, seqNo) -> do
yieldLimitOk <- liftIO $ decrementYieldLimit sv
+ let undo = liftIO $ do
+ updateHeapSeq heap nextSeqNo
+ reEnqueueAhead sv q m
+ incrementYieldLimit sv
if yieldLimitOk
- then do
- if seqNo == prevSeqNo + 1
+ then
+ if seqNo == nextSeqNo
then do
let stop = do
liftIO (incrementYieldLimit sv)
- loopWithToken seqNo
- unStream m st stop
+ loopWithToken (seqNo + 1)
+ runStreamSVar sv m st stop
(singleOutput seqNo)
(yieldOutput seqNo)
- else do
- liftIO $ updateHeapSeq heap (prevSeqNo + 1)
- liftIO (incrementYieldLimit sv)
+ else
-- To avoid a race when another thread puts something
-- on the heap and goes away, the consumer will not get
-- a doorBell and we will not clear the heap before
@@ -464,13 +471,8 @@ processWithToken q heap st sv winfo action sno = do
-- on the output that is stuck in the heap then this
-- will result in a deadlock. So we always clear the
-- heap before executing the next action.
- liftIO $ reEnqueueAhead sv q m
- workLoopAhead q heap st sv winfo
- else do
- liftIO $ updateHeapSeq heap (prevSeqNo + 1)
- liftIO $ reEnqueueAhead sv q m
- liftIO $ incrementYieldLimit sv
- drainHeap q heap st sv winfo
+ undo >> workLoopAhead q heap st sv winfo
+ else undo >> drainHeap q heap st sv winfo
-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without yield limit and even
@@ -482,6 +484,8 @@ processWithToken q heap st sv winfo action sno = do
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.
+-- XXX we can remove the sv parameter as it can be derived from st
+
workLoopAhead :: MonadIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
@@ -490,13 +494,6 @@ workLoopAhead :: MonadIO m
-> Maybe WorkerInfo
-> m ()
workLoopAhead q heap st sv winfo = do
-#ifdef DIAGNOSTICS
- liftIO $ do
- maxHp <- readIORef (maxHeapSize $ svarStats sv)
- (hp, _) <- readIORef heap
- when (H.size hp > maxHp) $ writeIORef (maxHeapSize $ svarStats sv)
- (H.size hp)
-#endif
r <- liftIO $ dequeueFromHeap heap
case r of
Ready (Entry seqNo hent) ->
@@ -525,19 +522,16 @@ workLoopAhead q heap st sv winfo = do
Just (m, seqNo) -> do
yieldLimitOk <- liftIO $ decrementYieldLimit sv
if yieldLimitOk
- then do
+ then
if seqNo == 0
then processWithToken q heap st sv winfo m seqNo
else processWithoutToken q heap st sv winfo m seqNo
- else liftIO $ do
- -- If some worker decremented the yield limit but
- -- then did not yield anything and therefore
- -- incremented it later, then if we did not requeue
- -- m here we may find the work queue empty and
- -- therefore miss executing the remaining action.
- liftIO $ reEnqueueAhead sv q m
- incrementYieldLimit sv
- sendStop sv winfo
+ -- If some worker decremented the yield limit but then
+ -- did not yield anything and therefore incremented it
+ -- later, then if we did not requeue m here we may find
+ -- the work queue empty and therefore miss executing
+ -- the remaining action.
+ else liftIO $ abortExecution q sv winfo m
-------------------------------------------------------------------------------
-- WAhead
@@ -560,7 +554,7 @@ forkSVarAhead m1 m2 = Stream $ \st stp sng yld -> do
{-# INLINE aheadS #-}
aheadS :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
-aheadS m1 m2 = Stream $ \st stp sng yld -> do
+aheadS m1 m2 = Stream $ \st stp sng yld ->
case streamVar st of
Just sv | svarStyle sv == AheadVar -> do
liftIO $ enqueue sv m2
@@ -631,7 +625,7 @@ newtype AheadT m a = AheadT {getAheadT :: Stream m a}
-- 'AheadT' documentation for more details.
--
-- @since 0.3.0
-type Ahead a = AheadT IO a
+type Ahead = AheadT IO
-- | Fix the type of a polymorphic stream as 'AheadT'.
--
@@ -689,10 +683,13 @@ aheadbind m f = go m
where
go (Stream g) =
Stream $ \st stp sng yld ->
- let run x = unStream x st stp sng yld
- single a = run $ f a
- yieldk a r = run $ f a `aheadS` go r
- in g (rstState st) stp single yieldk
+ let runShared x = unstreamShared x st stp sng yld
+ runIsolated x = unStreamIsolated x st stp sng yld
+
+ single a = runIsolated $ f a
+ yieldk a r = runShared $
+ K.isolateStream (f a) `aheadS` go r
+ in g (rstState st) stp single yieldk
instance MonadAsync m => Monad (AheadT m) where
return = pure
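
The `AheadEntryNull` case added above exists because a worker whose stream stops without yielding anything must still register its sequence number on the heap; otherwise a consumer waiting for that number would block forever. The reordering idea can be sketched independently of streamly — here using `Data.Map` as a stand-in for the `Data.Heap` min-heap and `Maybe` for the null entry; all names are illustrative, not streamly's API.

```haskell
import qualified Data.Map.Strict as Map

-- A stand-in for the output heap: results keyed by sequence number.
-- Nothing plays the role of AheadEntryNull, registering a sequence
-- number whose stream stopped without yielding anything.
type OutHeap a = Map.Map Int (Maybe a)

-- | Drain results in sequence order starting from the expected
-- sequence number, stopping at the first gap. Null entries are
-- consumed (so the sequence advances) but produce no output.
drainInOrder :: Int -> OutHeap a -> ([a], Int, OutHeap a)
drainInOrder next hp =
    case Map.lookup next hp of
        Nothing -> ([], next, hp)  -- expected seq number not yet arrived
        Just ment ->
            let (out, next', hp') = drainInOrder (next + 1) (Map.delete next hp)
            in (maybe out (: out) ment, next', hp')
```

With entries `0 ↦ Just 'a'`, `1 ↦ Nothing`, `2 ↦ Just 'c'`, draining from 0 yields `"ac"` and advances the expected sequence number to 3; without the null entry at 1, the drain would stall after `"a"` even though the result for 2 is already available.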
diff --git a/src/Streamly/Streams/Async.hs b/src/Streamly/Streams/Async.hs
index 0206898..8577db9 100644
--- a/src/Streamly/Streams/Async.hs
+++ b/src/Streamly/Streams/Async.hs
@@ -6,13 +6,9 @@
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE StandaloneDeriving #-}
+{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-#ifdef DIAGNOSTICS_VERBOSE
-#define DIAGNOSTICS
-#endif
-
-- |
-- Module : Streamly.Streams.Async
-- Copyright : (c) 2017 Harendra Kumar
@@ -40,6 +36,7 @@ module Streamly.Streams.Async
)
where
+import Control.Concurrent (myThreadId)
import Control.Monad (ap)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
@@ -60,13 +57,9 @@ import qualified Data.Set as S
import Streamly.Streams.SVar (fromSVar)
import Streamly.Streams.Serial (map)
import Streamly.SVar
-import Streamly.Streams.StreamK (IsStream(..), Stream(..), adapt)
+import Streamly.Streams.StreamK (IsStream(..), Stream(..), adapt, runStreamSVar)
import qualified Streamly.Streams.StreamK as K
-#ifdef DIAGNOSTICS
-import Control.Concurrent (myThreadId)
-#endif
-
#include "Instances.hs"
-------------------------------------------------------------------------------
@@ -89,7 +82,7 @@ workLoopLIFO q st sv winfo = run
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv winfo
- Just m -> unStream m st run single yieldk
+ Just m -> runStreamSVar sv m st run single yieldk
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
@@ -98,7 +91,7 @@ workLoopLIFO q st sv winfo = run
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
- then unStream r st run single yieldk
+ then runStreamSVar sv r st run single yieldk
else liftIO $ do
enqueueLIFO sv q r
sendStop sv winfo
@@ -139,7 +132,7 @@ workLoopLIFOLimited q st sv winfo = run
if yieldLimitOk
then do
let stop = liftIO (incrementYieldLimit sv) >> run
- unStream m st stop single yieldk
+ runStreamSVar sv m st stop single yieldk
-- Avoid any side effects, undo the yield limit decrement if we
-- never yielded anything.
else liftIO $ do
@@ -158,7 +151,7 @@ workLoopLIFOLimited q st sv winfo = run
yieldLimitOk <- liftIO $ decrementYieldLimit sv
let stop = liftIO (incrementYieldLimit sv) >> run
if res && yieldLimitOk
- then unStream r st stop single yieldk
+ then runStreamSVar sv r st stop single yieldk
else liftIO $ do
incrementYieldLimit sv
enqueueLIFO sv q r
@@ -172,6 +165,8 @@ workLoopLIFOLimited q st sv winfo = run
-- WAsync
-------------------------------------------------------------------------------
+-- XXX we can remove sv as it is derivable from st
+
{-# INLINE workLoopFIFO #-}
workLoopFIFO
:: MonadIO m
@@ -188,7 +183,7 @@ workLoopFIFO q st sv winfo = run
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ sendStop sv winfo
- Just m -> unStream m st run single yieldk
+ Just m -> runStreamSVar sv m st run single yieldk
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
@@ -197,7 +192,7 @@ workLoopFIFO q st sv winfo = run
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
- then unStream r st run single yieldk
+ then runStreamSVar sv r st run single yieldk
else liftIO $ do
enqueueFIFO sv q r
sendStop sv winfo
@@ -223,7 +218,7 @@ workLoopFIFOLimited q st sv winfo = run
if yieldLimitOk
then do
let stop = liftIO (incrementYieldLimit sv) >> run
- unStream m st stop single yieldk
+ runStreamSVar sv m st stop single yieldk
else liftIO $ do
enqueueFIFO sv q m
incrementYieldLimit sv
@@ -238,7 +233,7 @@ workLoopFIFOLimited q st sv winfo = run
yieldLimitOk <- liftIO $ decrementYieldLimit sv
let stop = liftIO (incrementYieldLimit sv) >> run
if res && yieldLimitOk
- then unStream r st stop single yieldk
+ then runStreamSVar sv r st stop single yieldk
else liftIO $ do
incrementYieldLimit sv
enqueueFIFO sv q r
@@ -253,8 +248,9 @@ workLoopFIFOLimited q st sv winfo = run
-- function argument to this function results in a perf degradation of more
-- than 10%. Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
-getLifoSVar :: MonadAsync m => State Stream m a -> IO (SVar Stream m a)
-getLifoSVar st = do
+getLifoSVar :: forall m a. MonadAsync m
+ => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
+getLifoSVar st mrun = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
@@ -266,18 +262,8 @@ getLifoSVar st = do
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
- disp <- newIORef 0
- maxWrk <- newIORef 0
- maxOq <- newIORef 0
- maxHs <- newIORef 0
- maxWq <- newIORef 0
- avgLat <- newIORef (0, NanoSecs 0)
- maxLat <- newIORef (NanoSecs 0)
- minLat <- newIORef (NanoSecs 0)
- stpTime <- newIORef Nothing
-#ifdef DIAGNOSTICS
+ stats <- newSVarStats
tid <- myThreadId
-#endif
let isWorkFinished _ = null <$> readIORef q
@@ -291,7 +277,17 @@ getLifoSVar st = do
qEmpty <- null <$> readIORef q
return $ qEmpty || yieldsDone
- let getSVar sv readOutput postProc workDone wloop = SVar
+ let getSVar :: SVar Stream m a
+ -> (SVar Stream m a -> m [ChildEvent a])
+ -> (SVar Stream m a -> m Bool)
+ -> (SVar Stream m a -> IO Bool)
+ -> (IORef [Stream m a]
+ -> State Stream m a
+ -> SVar Stream m a
+ -> Maybe WorkerInfo
+ -> m())
+ -> SVar Stream m a
+ getSVar sv readOutput postProc workDone wloop = SVar
{ outputQueue = outQ
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
@@ -307,26 +303,16 @@ getLifoSVar st = do
, isQueueDone = workDone sv
, needDoorBell = wfw
, svarStyle = AsyncVar
+ , svarMrun = mrun
, workerCount = active
, accountThread = delThread sv
, workerStopMVar = undefined
, svarRef = Nothing
-#ifdef DIAGNOSTICS
+ , svarInspectMode = getInspectMode st
, svarCreator = tid
, aheadWorkQueue = undefined
, outputHeap = undefined
-#endif
- , svarStats = SVarStats
- { totalDispatches = disp
- , maxWorkers = maxWrk
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
- , avgWorkerLatency = avgLat
- , minWorkerLatency = minLat
- , maxWorkerLatency = maxLat
- , svarStopTime = stpTime
- }
+ , svarStats = stats
}
let sv =
@@ -353,8 +339,9 @@ getLifoSVar st = do
workLoopLIFOLimited
in return sv
-getFifoSVar :: MonadAsync m => State Stream m a -> IO (SVar Stream m a)
-getFifoSVar st = do
+getFifoSVar :: forall m a. MonadAsync m
+ => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
+getFifoSVar st mrun = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
@@ -366,18 +353,8 @@ getFifoSVar st = do
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
- disp <- newIORef 0
- maxWrk <- newIORef 0
- maxOq <- newIORef 0
- maxHs <- newIORef 0
- maxWq <- newIORef 0
- avgLat <- newIORef (0, NanoSecs 0)
- maxLat <- newIORef (NanoSecs 0)
- minLat <- newIORef (NanoSecs 0)
- stpTime <- newIORef Nothing
-#ifdef DIAGNOSTICS
+ stats <- newSVarStats
tid <- myThreadId
-#endif
let isWorkFinished _ = nullQ q
let isWorkFinishedLimited sv = do
@@ -390,7 +367,17 @@ getFifoSVar st = do
qEmpty <- nullQ q
return $ qEmpty || yieldsDone
- let getSVar sv readOutput postProc workDone wloop = SVar
+ let getSVar :: SVar Stream m a
+ -> (SVar Stream m a -> m [ChildEvent a])
+ -> (SVar Stream m a -> m Bool)
+ -> (SVar Stream m a -> IO Bool)
+ -> (LinkedQueue (Stream m a)
+ -> State Stream m a
+ -> SVar Stream m a
+ -> Maybe WorkerInfo
+ -> m())
+ -> SVar Stream m a
+ getSVar sv readOutput postProc workDone wloop = SVar
{ outputQueue = outQ
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
@@ -406,27 +393,17 @@ getFifoSVar st = do
, isQueueDone = workDone sv
, needDoorBell = wfw
, svarStyle = WAsyncVar
+ , svarMrun = mrun
, workerCount = active
, accountThread = delThread sv
, workerStopMVar = undefined
, svarRef = Nothing
-#ifdef DIAGNOSTICS
+ , svarInspectMode = getInspectMode st
, svarCreator = tid
, aheadWorkQueue = undefined
, outputHeap = undefined
-#endif
- , svarStats = SVarStats
- { totalDispatches = disp
- , maxWorkers = maxWrk
- , maxOutQSize = maxOq
- , maxHeapSize = maxHs
- , maxWorkQSize = maxWq
- , avgWorkerLatency = avgLat
- , minWorkerLatency = minLat
- , maxWorkerLatency = maxLat
- , svarStopTime = stpTime
- }
- }
+ , svarStats = stats
+ }
let sv =
case getStreamRate st of
@@ -456,7 +433,8 @@ getFifoSVar st = do
newAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m = do
- sv <- liftIO $ getLifoSVar st
+ mrun <- captureMonadState
+ sv <- liftIO $ getLifoSVar st mrun
sendFirstWorker sv m
-- XXX Get rid of this?
@@ -469,18 +447,19 @@ newAsyncVar st m = do
-- @since 0.2.0
{-# INLINABLE mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
-mkAsync m = newAsyncVar defState (toStream m) >>= return . fromSVar
+mkAsync m = fmap fromSVar (newAsyncVar defState (toStream m))
{-# INLINABLE mkAsync' #-}
mkAsync' :: (IsStream t, MonadAsync m) => State Stream m a -> t m a -> m (t m a)
-mkAsync' st m = newAsyncVar st (toStream m) >>= return . fromSVar
+mkAsync' st m = fmap fromSVar (newAsyncVar st (toStream m))
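The `mkAsync`/`mkAsync'` hunks above replace `>>= return . fromSVar` with `fmap fromSVar`; the two forms are interchangeable by the monad laws (this is the usual hlint "use fmap" rewrite). A minimal base-only sketch, unrelated to streamly itself, checking the equivalence:

```haskell
-- Checks that (m >>= return . f) equals (fmap f m) for a lawful
-- monad, the rewrite applied to mkAsync/mkAsync' above.
main :: IO ()
main = do
  a <- (return 21 :: IO Int) >>= return . (* 2)
  b <- fmap (* 2) (return 21 :: IO Int)
  print (a, b)  -- prints (42,42)
```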
-- | Create a new SVar and enqueue one stream computation on it.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m = do
- sv <- liftIO $ getFifoSVar st
+ mrun <- captureMonadState
+ sv <- liftIO $ getFifoSVar st mrun
sendFirstWorker sv m
------------------------------------------------------------------------------
@@ -563,7 +542,7 @@ forkSVarAsync style m1 m2 = Stream $ \st stp sng yld -> do
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
-joinStreamVarAsync style m1 m2 = Stream $ \st stp sng yld -> do
+joinStreamVarAsync style m1 m2 = Stream $ \st stp sng yld ->
case streamVar st of
Just sv | svarStyle sv == style ->
liftIO (enqueue sv m2) >> unStream m1 st stp sng yld
@@ -662,7 +641,7 @@ newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
-- type @a@. See 'AsyncT' documentation for more details.
--
-- @since 0.2.0
-type Async a = AsyncT IO a
+type Async = AsyncT IO
-- | Fix the type of a polymorphic stream as 'AsyncT'.
--
@@ -789,7 +768,7 @@ newtype WAsyncT m a = WAsyncT {getWAsyncT :: Stream m a}
-- See 'WAsyncT' documentation for more details.
--
-- @since 0.2.0
-type WAsync a = WAsyncT IO a
+type WAsync = WAsyncT IO
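The IO synonyms changed in this patch (`Async`, `WAsync`, and later `Parallel`, `Serial`, `WSerial`, `ZipSerial`, `ZipAsync`) are all eta-reduced from `type T a = T' IO a` to `type T = T' IO`. Besides being shorter, an unsaturated synonym is a genuine type constructor and can be used wherever something of kind `* -> *` is expected. A sketch with a toy transformer (names hypothetical, only base assumed):

```haskell
{-# LANGUAGE KindSignatures #-}
import Data.Functor.Compose (Compose(..))

-- A toy stream transformer standing in for AsyncT and friends.
newtype StreamT (m :: * -> *) a = StreamT { runStreamT :: m [a] }

-- Eta-reduced synonym: usable as a (* -> *) type constructor.
type Stream = StreamT IO

-- Because Stream is unsaturated, it can be partially applied,
-- e.g. composed with another functor. A saturated synonym
-- (type Stream a = StreamT IO a) would be rejected here, since
-- type synonyms cannot be partially applied.
nested :: Compose Stream Maybe Int
nested = Compose (StreamT (return [Just 1, Nothing]))

main :: IO ()
main = do
  xs <- runStreamT (getCompose nested)
  print xs  -- prints [Just 1,Nothing]
```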
-- | Fix the type of a polymorphic stream as 'WAsyncT'.
--
diff --git a/src/Streamly/Streams/Parallel.hs b/src/Streamly/Streams/Parallel.hs
index d1d8729..e99c3a4 100644
--- a/src/Streamly/Streams/Parallel.hs
+++ b/src/Streamly/Streams/Parallel.hs
@@ -5,7 +5,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-- |
@@ -68,6 +67,7 @@ runOne st m winfo = unStream m st stop single yieldk
where
sv = fromJust $ streamVar st
+ mrun = runInIO $ svarMrun sv
withLimitCheck action = do
yieldLimitOk <- liftIO $ decrementYieldLimitPost sv
@@ -83,7 +83,8 @@ runOne st m winfo = unStream m st stop single yieldk
-- queue and queue it back on that and exit the thread when the outputQueue
-- overflows. Parallel is dangerous because it can accumulate unbounded
-- output in the buffer.
- yieldk a r = void (sendit a) >> withLimitCheck (runOne st r winfo)
+ yieldk a r = void (sendit a)
+ >> withLimitCheck (void $ liftIO $ mrun $ runOne st r winfo)
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
@@ -91,7 +92,7 @@ forkSVarPar m r = Stream $ \st stp sng yld -> do
sv <- newParallelVar st
pushWorkerPar sv (runOne st{streamVar = Just sv} m)
pushWorkerPar sv (runOne st{streamVar = Just sv} r)
- (unStream (fromSVar sv)) (rstState st) stp sng yld
+ unStream (fromSVar sv) (rstState st) stp sng yld
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
@@ -119,7 +120,7 @@ consMParallel m r = K.yieldM m `parallelStream` r
-- @since 0.2.0
{-# INLINE parallel #-}
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
-parallel m1 m2 = fromStream $ Stream $ \st stp sng yld -> do
+parallel m1 m2 = fromStream $ Stream $ \st stp sng yld ->
unStream (parallelStream (toStream m1) (toStream m2))
st stp sng yld
@@ -330,7 +331,7 @@ newtype ParallelT m a = ParallelT {getParallelT :: Stream m a}
-- See 'ParallelT' documentation for more details.
--
-- @since 0.2.0
-type Parallel a = ParallelT IO a
+type Parallel = ParallelT IO
-- | Fix the type of a polymorphic stream as 'ParallelT'.
--
diff --git a/src/Streamly/Streams/Prelude.hs b/src/Streamly/Streams/Prelude.hs
index 62b5229..b6a2695 100644
--- a/src/Streamly/Streams/Prelude.hs
+++ b/src/Streamly/Streams/Prelude.hs
@@ -2,11 +2,9 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-- |
diff --git a/src/Streamly/Streams/SVar.hs b/src/Streamly/Streams/SVar.hs
index ceb9fe2..1b244c9 100644
--- a/src/Streamly/Streams/SVar.hs
+++ b/src/Streamly/Streams/SVar.hs
@@ -2,19 +2,12 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
#include "inline.h"
-#ifdef DIAGNOSTICS_VERBOSE
-#define DIAGNOSTICS
-#endif
-
-- |
-- Module : Streamly.Streams.SVar
-- Copyright : (c) 2017 Harendra Kumar
@@ -37,37 +30,38 @@ module Streamly.Streams.SVar
, minRate
, maxRate
, constRate
+ , inspectMode
+ , printState
)
where
import Control.Exception (fromException)
+import Control.Monad (when)
import Control.Monad.Catch (throwM)
+import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)
-import Control.Monad.IO.Class (liftIO)
-import Data.IORef (newIORef, mkWeakIORef)
-#ifdef DIAGNOSTICS
-import Data.IORef (writeIORef)
+import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
+import Data.Maybe (isNothing)
+import Data.Semigroup ((<>))
import System.IO (hPutStrLn, stderr)
import System.Clock (Clock(Monotonic), getTime)
-#endif
+import System.Mem (performMajorGC)
import Streamly.SVar
import Streamly.Streams.StreamK
import Streamly.Streams.Serial (SerialT)
--- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we
--- can keep it on in production to debug problems quickly if and when they
--- happen, but it may result in unexpected output when threads are left hanging
--- until they are GCed because the consumer went away.
-
-#ifdef DIAGNOSTICS
-#ifdef DIAGNOSTICS_VERBOSE
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
svInfo <- dumpSVar sv
- hPutStrLn stderr $ "\n" ++ how ++ "\n" ++ svInfo
-#endif
-#endif
+ hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo
+
+printState :: MonadIO m => State Stream m a -> m ()
+printState st = liftIO $ do
+ let msv = streamVar st
+ case msv of
+ Just sv -> dumpSVar sv >>= putStrLn
+ Nothing -> putStrLn "No SVar"
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
@@ -82,14 +76,11 @@ fromStreamVar sv = Stream $ \st stp sng yld -> do
where
allDone stp = do
-#ifdef DIAGNOSTICS
+ when (svarInspectMode sv) $ do
t <- liftIO $ getTime Monotonic
liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
-#ifdef DIAGNOSTICS_VERBOSE
liftIO $ printSVar sv "SVar Done"
-#endif
-#endif
- stp
+ stp
{-# INLINE processEvents #-}
processEvents [] = Stream $ \st stp sng yld -> do
@@ -110,11 +101,11 @@ fromStreamVar sv = Stream $ \st stp sng yld -> do
case fromException ex of
Just ThreadAbort ->
unStream rest (rstState st) stp sng yld
- Nothing -> throwM ex
+ Nothing -> liftIO (cleanupSVar sv) >> throwM ex
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
-fromSVar sv = do
+fromSVar sv =
fromStream $ Stream $ \st stp sng yld -> do
ref <- liftIO $ newIORef ()
_ <- liftIO $ mkWeakIORef ref hook
@@ -125,10 +116,14 @@ fromSVar sv = do
where
hook = do
-#ifdef DIAGNOSTICS_VERBOSE
- printSVar sv "SVar Garbage Collected"
-#endif
+ when (svarInspectMode sv) $ do
+ r <- liftIO $ readIORef (svarStopTime (svarStats sv))
+ when (isNothing r) $
+ printSVar sv "SVar Garbage Collected"
cleanupSVar sv
+ -- If there are any SVars referenced by this SVar, a GC will prompt
+ -- them to be cleaned up quickly.
+ when (svarInspectMode sv) performMajorGC
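The `hook` above relies on a finalizer attached with `mkWeakIORef`: when the consumer of the stream is garbage-collected, the SVar is cleaned up and (in inspect mode) a major GC is forced to cascade cleanup to nested SVars. The underlying pattern, in a base-only sketch (message and timing are illustrative; finalizers run at the GC's discretion, so no output is guaranteed promptly):

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef (newIORef, readIORef, mkWeakIORef)
import System.Mem (performMajorGC)

main :: IO ()
main = do
  ref <- newIORef (0 :: Int)
  -- The finalizer fires some time after ref becomes unreachable.
  _ <- mkWeakIORef ref (putStrLn "cleanup: consumer went away")
  v <- readIORef ref
  print v          -- prints 0; this is the last use of ref
  performMajorGC   -- encourage the GC to notice, as the diff does
  threadDelay 100000  -- give the finalizer a chance to run
```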
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
@@ -154,7 +149,7 @@ toSVar sv m = toStreamVar sv (toStream m)
-- @since 0.4.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
-maxThreads n m = fromStream $ Stream $ \st stp sng yld -> do
+maxThreads n m = fromStream $ Stream $ \st stp sng yld ->
unStream (toStream m) (setMaxThreads n st) stp sng yld
{-
@@ -179,7 +174,7 @@ maxThreadsSerial _ = id
-- @since 0.4.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
-maxBuffer n m = fromStream $ Stream $ \st stp sng yld -> do
+maxBuffer n m = fromStream $ Stream $ \st stp sng yld ->
unStream (toStream m) (setMaxBuffer n st) stp sng yld
{-
@@ -203,7 +198,7 @@ maxBufferSerial _ = id
-- @since 0.5.0
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
-rate r m = fromStream $ Stream $ \st stp sng yld -> do
+rate r m = fromStream $ Stream $ \st stp sng yld ->
case r of
Just (Rate low goal _ _) | goal < low ->
error "rate: Target rate cannot be lower than minimum rate."
@@ -213,6 +208,8 @@ rate r m = fromStream $ Stream $ \st stp sng yld -> do
error "rate: Minimum rate cannot be greater than maximum rate."
_ -> unStream (toStream m) (setStreamRate r st) stp sng yld
+-- XXX implement for serial streams as well, as a simple delay
+
{-
{-# RULES "rate serial" rate = yieldRateSerial #-}
yieldRateSerial :: Double -> SerialT m a -> SerialT m a
@@ -281,7 +278,7 @@ constRate r = rate (Just $ Rate r r r 0)
--
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
-_serialLatency n m = fromStream $ Stream $ \st stp sng yld -> do
+_serialLatency n m = fromStream $ Stream $ \st stp sng yld ->
unStream (toStream m) (setStreamLatency n st) stp sng yld
{-
@@ -296,9 +293,14 @@ serialLatencySerial _ = id
-- inherited by everything in enclosed scope.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
-maxYields n m = fromStream $ Stream $ \st stp sng yld -> do
+maxYields n m = fromStream $ Stream $ \st stp sng yld ->
unStream (toStream m) (setYieldLimit n st) stp sng yld
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ = id
+
+-- | Print debug information about an SVar when the stream ends.
+inspectMode :: IsStream t => t m a -> t m a
+inspectMode m = fromStream $ Stream $ \st stp sng yld ->
+ unStream (toStream m) (setInspectMode st) stp sng yld
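A recurring theme in this patch is replacing the compile-time `DIAGNOSTICS` CPP flag with a runtime `svarInspectMode` field checked via `when`, enabled per-stream by the new `inspectMode` combinator. The diagnostic code is now always compiled and merely gated at run time. A base-only sketch of that pattern (the record and names here are hypothetical stand-ins for the SVar fields):

```haskell
import Control.Monad (when)
import System.IO (hPutStrLn, stderr)

-- Stand-in for an SVar carrying a runtime inspect flag.
data SVarLike = SVarLike { inspectEnabled :: Bool, label :: String }

-- Instead of wrapping the block in #ifdef DIAGNOSTICS, gate it
-- with a runtime check; the cost when disabled is one branch.
dumpIfInspecting :: SVarLike -> IO ()
dumpIfInspecting sv =
  when (inspectEnabled sv) $
    hPutStrLn stderr ("dump: " ++ label sv)

main :: IO ()
main = do
  dumpIfInspecting (SVarLike False "quiet")     -- no output
  dumpIfInspecting (SVarLike True "inspected")  -- writes to stderr
```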
diff --git a/src/Streamly/Streams/Serial.hs b/src/Streamly/Streams/Serial.hs
index a626ecf..ec7a5ba 100644
--- a/src/Streamly/Streams/Serial.hs
+++ b/src/Streamly/Streams/Serial.hs
@@ -5,7 +5,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-- |
@@ -128,7 +127,7 @@ newtype SerialT m a = SerialT {getSerialT :: Stream m a}
-- for more details.
--
-- @since 0.2.0
-type Serial a = SerialT IO a
+type Serial = SerialT IO
-- |
-- @since 0.1.0
@@ -177,7 +176,7 @@ serial m1 m2 = fromStream $ Stream $ \st stp sng yld ->
instance Monad m => Monad (SerialT m) where
return = pure
(SerialT (Stream m)) >>= f = SerialT $ Stream $ \st stp sng yld ->
- let run x = (unStream x) (rstState st) stp sng yld
+ let run x = unStream x (rstState st) stp sng yld
single a = run $ toStream (f a)
yieldk a r = run $ toStream $ f a <> (fromStream r >>= f)
in m (rstState st) stp single yieldk
@@ -248,7 +247,7 @@ newtype WSerialT m a = WSerialT {getWSerialT :: Stream m a}
-- documentation for more details.
--
-- @since 0.2.0
-type WSerial a = WSerialT IO a
+type WSerial = WSerialT IO
-- |
-- @since 0.1.0
@@ -289,10 +288,10 @@ instance IsStream WSerialT where
{-# INLINE interleave #-}
interleave :: Stream m a -> Stream m a -> Stream m a
interleave m1 m2 = Stream $ \st stp sng yld -> do
- let stop = (unStream m2) (rstState st) stp sng yld
+ let stop = unStream m2 (rstState st) stp sng yld
single a = yld a m2
yieldk a r = yld a (interleave m2 r)
- (unStream m1) (rstState st) stop single yieldk
+ unStream m1 (rstState st) stop single yieldk
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WSerialT'.
-- Interleaves two streams, yielding one element from each stream alternately.
@@ -332,7 +331,7 @@ instance Monoid (WSerialT m a) where
instance Monad m => Monad (WSerialT m) where
return = pure
(WSerialT (Stream m)) >>= f = WSerialT $ Stream $ \st stp sng yld ->
- let run x = (unStream x) (rstState st) stp sng yld
+ let run x = unStream x (rstState st) stp sng yld
single a = run $ toStream (f a)
yieldk a r = run $ toStream $ f a <> (fromStream r >>= f)
in m (rstState st) stp single yieldk
diff --git a/src/Streamly/Streams/StreamD.hs b/src/Streamly/Streams/StreamD.hs
index 80ab83b..ee94ab4 100644
--- a/src/Streamly/Streams/StreamD.hs
+++ b/src/Streamly/Streams/StreamD.hs
@@ -4,11 +4,8 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE UnboxedTuples #-}
#include "inline.h"
@@ -182,7 +179,7 @@ uncons (Stream step state) = go state
go st = do
r <- step defState st
return $ case r of
- Yield x s -> Just (x, (Stream step s))
+ Yield x s -> Just (x, Stream step s)
Stop -> Nothing
------------------------------------------------------------------------------
@@ -217,8 +214,8 @@ enumFromStepN from stride n =
from `seq` stride `seq` n `seq` Stream step (from, n)
where
{-# INLINE_LATE step #-}
- step _ (x, i) | i > 0 = return $ Yield x (x + stride, i - 1)
- | otherwise = return $ Stop
+ step _ (x, i) | i > 0 = return $ Yield x (x + stride, i - 1)
+ | otherwise = return Stop
-------------------------------------------------------------------------------
-- Generation by Conversion
@@ -246,7 +243,7 @@ yieldM m = Stream step True
-- | Convert a list of monadic actions to a 'Stream'
{-# INLINE_LATE fromListM #-}
fromListM :: MonadAsync m => [m a] -> Stream m a
-fromListM zs = Stream step zs
+fromListM = Stream step
where
{-# INLINE_LATE step #-}
step _ (m:ms) = m >>= \x -> return $ Yield x ms
@@ -255,7 +252,7 @@ fromListM zs = Stream step zs
-- | Convert a list of pure values to a 'Stream'
{-# INLINE_LATE fromList #-}
fromList :: Monad m => [a] -> Stream m a
-fromList zs = Stream step zs
+fromList = Stream step
where
{-# INLINE_LATE step #-}
step _ (x:xs) = return $ Yield x xs
@@ -264,7 +261,7 @@ fromList zs = Stream step zs
-- XXX pass the state to streamD
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Monad m => K.Stream m a -> Stream m a
-fromStreamK m = Stream step m
+fromStreamK = Stream step
where
step gst m1 =
let stop = return Stop
@@ -530,7 +527,7 @@ takeWhileM f (Stream step state) = Stream step' state
Yield x s -> do
b <- f x
return $ if b then Yield x s else Stop
- Stop -> return $ Stop
+ Stop -> return Stop
{-# INLINE takeWhile #-}
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
@@ -593,7 +590,7 @@ filterM f (Stream step state) = Stream step' state
if b
then return $ Yield x s
else step' gst s
- Stop -> return $ Stop
+ Stop -> return Stop
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
diff --git a/src/Streamly/Streams/StreamK.hs b/src/Streamly/Streams/StreamK.hs
index af07cc7..9c6a4d3 100644
--- a/src/Streamly/Streams/StreamK.hs
+++ b/src/Streamly/Streams/StreamK.hs
@@ -4,11 +4,8 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-- |
@@ -36,6 +33,10 @@ module Streamly.Streams.StreamK
-- * The stream type
, Stream (..)
+ , unStreamIsolated
+ , isolateStream
+ , unstreamShared
+ , runStreamSVar
-- * Construction
, mkStream
@@ -143,6 +144,7 @@ module Streamly.Streams.StreamK
where
import Control.Monad (void)
+import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Semigroup (Semigroup(..))
@@ -186,6 +188,51 @@ newtype Stream m a =
-> m r
}
+-- XXX make this the default "unStream"
+-- | Unwraps the 'Stream' type, producing the stream function that can be run
+-- with continuations.
+{-# INLINE unStreamIsolated #-}
+unStreamIsolated ::
+ Stream m a
+ -> State Stream m a -- state
+ -> m r -- stop
+ -> (a -> m r) -- singleton
+ -> (a -> Stream m a -> m r) -- yield
+ -> m r
+unStreamIsolated x st = unStream x (rstState st)
+
+{-# INLINE isolateStream #-}
+isolateStream :: Stream m a -> Stream m a
+isolateStream x = Stream $ \st stp sng yld ->
+ unStreamIsolated x st stp sng yld
+
+-- | Like 'unStream', but passes a shared SVar across continuations.
+{-# INLINE unstreamShared #-}
+unstreamShared ::
+ Stream m a
+ -> State Stream m a -- state
+ -> m r -- stop
+ -> (a -> m r) -- singleton
+ -> (a -> Stream m a -> m r) -- yield
+ -> m r
+unstreamShared = unStream
+
+-- Run the stream using the run function associated with the SVar, which runs
+-- the stream with a captured snapshot of the monadic state.
+{-# INLINE runStreamSVar #-}
+runStreamSVar
+ :: MonadIO m
+ => SVar Stream m a
+ -> Stream m a
+ -> State Stream m a -- state
+ -> m r -- stop
+ -> (a -> m r) -- singleton
+ -> (a -> Stream m a -> m r) -- yield
+ -> m ()
+runStreamSVar sv m st stp sng yld =
+ let mrun = runInIO $ svarMrun sv
+ in void $ liftIO $ mrun $ unStream m st stp sng yld
+
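`runStreamSVar` above resumes a stream from plain IO through `runInIO $ svarMrun sv`, the run function captured by `captureMonadState` when the SVar was created. For a reader-like monad, capturing the monadic state amounts to closing over the current environment. A sketch of that idea using only transformers (the names `RunInIO`/`captureMonadState` here mirror the diff but are local stand-ins, not streamly's definitions):

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad.Trans.Reader (ReaderT(..), ask, runReaderT)

-- A captured "run" function: everything needed to restart the
-- user's monad from plain IO, e.g. on a worker thread.
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO b }

-- For ReaderT the monadic state is just the environment, so
-- capturing it means closing over the current value of ask.
captureMonadState :: ReaderT env IO (RunInIO (ReaderT env IO))
captureMonadState = do
  env <- ask
  return (RunInIO (\action -> runReaderT action env))

main :: IO ()
main = do
  run <- runReaderT captureMonadState (42 :: Int)
  -- Later, from plain IO, resume the monad with the snapshot:
  v <- runInIO run ask
  print v  -- prints 42
```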
------------------------------------------------------------------------------
-- Types that can behave as a Stream
------------------------------------------------------------------------------
@@ -366,7 +413,7 @@ uncons m =
let stop = return Nothing
single a = return (Just (a, nil))
yieldk a r = return (Just (a, fromStream r))
- in (unStream (toStream m)) defState stop single yieldk
+ in unStream (toStream m) defState stop single yieldk
-------------------------------------------------------------------------------
-- Generation
@@ -412,6 +459,7 @@ once :: (Monad m, IsStream t) => m a -> t m a
once = yieldM
-- | Generate an infinite stream by repeating a pure value.
+-- Can be expressed as @cycle1 . yield@.
--
-- @since 0.4.0
repeat :: IsStream t => a -> t m a
@@ -421,7 +469,8 @@ repeat a = let x = cons a x in x
-- Conversions
-------------------------------------------------------------------------------
--- | Construct a stream from a 'Foldable' containing pure values.
+-- | Construct a stream from a 'Foldable' containing pure values. Same as
+-- @'Prelude.foldr' 'cons' 'nil'@.
--
-- @since 0.2.0
{-# INLINE fromFoldable #-}
@@ -452,7 +501,7 @@ foldStream
-> m r
foldStream st blank single step m =
let yieldk a x = step a (fromStream x)
- in (unStream (toStream m)) st blank single yieldk
+ in unStream (toStream m) st blank single yieldk
-- | Lazy right associative fold.
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
@@ -462,7 +511,7 @@ foldr step acc m = go (toStream m)
let stop = return acc
single a = return (step a acc)
yieldk a r = go r >>= \b -> return (step a b)
- in (unStream m1) defState stop single yieldk
+ in unStream m1 defState stop single yieldk
-- | Lazy right fold with a monadic step function.
{-# INLINE foldrM #-}
@@ -473,7 +522,7 @@ foldrM step acc m = go (toStream m)
let stop = return acc
single a = step a acc
yieldk a r = go r >>= step a
- in (unStream m1) defState stop single yieldk
+ in unStream m1 defState stop single yieldk
{-# INLINE foldr1 #-}
foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a)
@@ -481,12 +530,12 @@ foldr1 step m = do
r <- uncons m
case r of
Nothing -> return Nothing
- Just (h, t) -> go h (toStream t) >>= return . Just
+ Just (h, t) -> fmap Just (go h (toStream t))
where
go p m1 =
let stp = return p
single a = return $ step a p
- yieldk a r = go a r >>= return . (step p)
+ yieldk a r = fmap (step p) (go a r)
in unStream m1 defState stp single yieldk
-- | Strict left fold with an extraction function. Like the standard strict
@@ -501,7 +550,7 @@ foldx step begin done m = get $ go (toStream m) begin
{-# NOINLINE get #-}
get m1 =
let single = return . done
- in (unStream m1) undefined undefined single undefined
+ in unStream m1 undefined undefined single undefined
-- Note, this can be implemented by making a recursive call to "go",
-- however that is more expensive because of unnecessary recursion
@@ -512,13 +561,13 @@ foldx step begin done m = get $ go (toStream m) begin
single a = sng $ step acc a
yieldk a r =
let stream = go r (step acc a)
- in (unStream stream) defState undefined sng yld
- in (unStream m1) defState stop single yieldk
+ in unStream stream defState undefined sng yld
+ in unStream m1 defState stop single yieldk
-- | Strict left associative fold.
{-# INLINE foldl' #-}
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
-foldl' step begin m = foldx step begin id m
+foldl' step begin = foldx step begin id
-- XXX replace the recursive "go" with explicit continuations.
-- | Like 'foldx', but with a monadic step function.
@@ -530,11 +579,11 @@ foldxM step begin done m = go begin (toStream m)
let stop = acc >>= done
single a = acc >>= \b -> step b a >>= done
yieldk a r = acc >>= \b -> step b a >>= \x -> go (return x) r
- in (unStream m1) defState stop single yieldk
+ in unStream m1 defState stop single yieldk
-- | Like 'foldl'' but with a monadic step function.
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
-foldlM' step begin m = foldxM step (return begin) return m
+foldlM' step begin = foldxM step (return begin) return
------------------------------------------------------------------------------
-- Specialized folds
@@ -596,7 +645,7 @@ elem e m = go (toStream m)
let stop = return False
single a = return (a == e)
yieldk a r = if a == e then return True else go r
- in (unStream m1) defState stop single yieldk
+ in unStream m1 defState stop single yieldk
{-# INLINE notElem #-}
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@@ -606,7 +655,7 @@ notElem e m = go (toStream m)
let stop = return True
single a = return (a /= e)
yieldk a r = if a == e then return False else go r
- in (unStream m1) defState stop single yieldk
+ in unStream m1 defState stop single yieldk
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
all p m = go (toStream m)
@@ -723,7 +772,7 @@ mapM_ f m = go (toStream m)
let stop = return ()
single a = void (f a)
yieldk a r = f a >> go r
- in (unStream m1) defState stop single yieldk
+ in unStream m1 defState stop single yieldk
------------------------------------------------------------------------------
-- Converting folds
@@ -755,7 +804,7 @@ scanx step begin done m =
{-# INLINE scanl' #-}
scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b
-scanl' step begin m = scanx step begin id m
+scanl' step begin = scanx step begin id
-------------------------------------------------------------------------------
-- Filtering
@@ -769,7 +818,7 @@ filter p m = fromStream $ go (toStream m)
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
- | otherwise = (unStream r) (rstState st) stp single yieldk
+ | otherwise = unStream r (rstState st) stp single yieldk
in unStream m1 (rstState st) stp single yieldk
{-# INLINE take #-}
@@ -812,7 +861,7 @@ dropWhile p m = fromStream $ go (toStream m)
go m1 = Stream $ \st stp sng yld ->
let single a | p a = stp
| otherwise = sng a
- yieldk a r | p a = (unStream r) (rstState st) stp single yieldk
+ yieldk a r | p a = unStream r (rstState st) stp single yieldk
| otherwise = yld a r
in unStream m1 (rstState st) stp single yieldk
@@ -835,8 +884,8 @@ mapM f m = go (toStream m)
where
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single a = f a >>= sng
- yieldk a r = unStream (toStream (f a |: (go r))) st stp sng yld
- in (unStream m1) (rstState st) stp single yieldk
+ yieldk a r = unStream (toStream (f a |: go r)) st stp sng yld
+ in unStream m1 (rstState st) stp single yieldk
-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
@@ -847,7 +896,7 @@ sequence m = go (toStream m)
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single ma = ma >>= sng
yieldk ma r = unStream (toStream $ ma |: go r) st stp sng yld
- in (unStream m1) (rstState st) stp single yieldk
+ in unStream m1 (rstState st) stp single yieldk
-------------------------------------------------------------------------------
-- Inserting
@@ -879,7 +928,7 @@ mapMaybe f m = go (toStream m)
Nothing -> stp
yieldk a r = case f a of
Just b -> yld b (toStream $ go r)
- Nothing -> (unStream r) (rstState st) stp single yieldk
+ Nothing -> unStream r (rstState st) stp single yieldk
in unStream m1 (rstState st) stp single yieldk
------------------------------------------------------------------------------
@@ -888,15 +937,15 @@ mapMaybe f m = go (toStream m)
{-# INLINE zipWithS #-}
zipWithS :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
-zipWithS f m1 m2 = go m1 m2
+zipWithS f = go
where
go mx my = Stream $ \st stp sng yld -> do
let merge a ra =
let single2 b = sng (f a b)
yield2 b rb = yld (f a b) (go ra rb)
in unStream my (rstState st) stp single2 yield2
- let single1 a = merge a nil
- yield1 a ra = merge a ra
+ let single1 a = merge a nil
+ yield1 = merge
unStream mx (rstState st) stp single1 yield1
-- | Zip two streams serially using a pure zipping function.
@@ -918,8 +967,8 @@ zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
single2 b = f a b >>= sng
yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb)
in unStream my (rstState st) stp single2 yield2
- let single1 a = merge a nil
- yield1 a ra = merge a ra
+ let single1 a = merge a nil
+ yield1 = merge
unStream mx (rstState st) stp single1 yield1
------------------------------------------------------------------------------
@@ -933,7 +982,7 @@ serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
where
go (Stream m) = Stream $ \st stp sng yld ->
- let stop = (unStream m2) (rstState st) stp sng yld
+ let stop = unStream m2 (rstState st) stp sng yld
single a = yld a m2
yieldk a r = yld a (go r)
in m (rstState st) stop single yieldk
@@ -970,10 +1019,12 @@ bindWith par m f = go m
where
go (Stream g) =
Stream $ \st stp sng yld ->
- let run x = (unStream x) st stp sng yld
- single a = run $ f a
- yieldk a r = run $ f a `par` go r
- in g (rstState st) stp single yieldk
+ let runShared x = unstreamShared x st stp sng yld
+ runIsolated x = unStreamIsolated x st stp sng yld
+
+ single a = runIsolated $ f a
+ yieldk a r = runShared $ isolateStream (f a) `par` go r
+ in g (rstState st) stp single yieldk
------------------------------------------------------------------------------
-- Alternative & MonadPlus
@@ -993,7 +1044,7 @@ withLocal f m =
Stream $ \st stp sng yld ->
let single = local f . sng
yieldk a r = local f $ yld a (withLocal f r)
- in (unStream m) (rstState st) (local f stp) single yieldk
+ in unStream m (rstState st) (local f stp) single yieldk
------------------------------------------------------------------------------
-- MonadError
diff --git a/src/Streamly/Streams/Zip.hs b/src/Streamly/Streams/Zip.hs
index 387907a..1e5ad7c 100644
--- a/src/Streamly/Streams/Zip.hs
+++ b/src/Streamly/Streams/Zip.hs
@@ -5,7 +5,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-- |
@@ -83,7 +82,7 @@ type ZipStream = ZipSerialM
-- | An IO stream whose applicative instance zips streams serially.
--
-- @since 0.2.0
-type ZipSerial a = ZipSerialM IO a
+type ZipSerial = ZipSerialM IO
-- | Fix the type of a polymorphic stream as 'ZipSerialM'.
--
@@ -172,7 +171,7 @@ newtype ZipAsyncM m a = ZipAsyncM {getZipAsyncM :: Stream m a}
-- | An IO stream whose applicative instance zips streams wAsyncly.
--
-- @since 0.2.0
-type ZipAsync a = ZipAsyncM IO a
+type ZipAsync = ZipAsyncM IO
-- | Fix the type of a polymorphic stream as 'ZipAsyncM'.
--
diff --git a/stack-7.10.yaml b/stack-7.10.yaml
index f4c2a56..64796c4 100644
--- a/stack-7.10.yaml
+++ b/stack-7.10.yaml
@@ -10,7 +10,7 @@ extra-deps:
- http-client-0.5.0
- http-client-tls-0.3.0
- SDL-0.6.5.1
- - gauge-0.2.3
+ - gauge-0.2.4
- basement-0.0.7
flags: {}
extra-package-dbs: []
diff --git a/stack-8.0.yaml b/stack-8.0.yaml
index d42112b..3517879 100644
--- a/stack-8.0.yaml
+++ b/stack-8.0.yaml
@@ -6,7 +6,7 @@ extra-deps:
- lockfree-queue-0.2.3.1
- simple-conduit-0.6.0
- SDL-0.6.5.1
- - gauge-0.2.3
+ - gauge-0.2.4
- basement-0.0.4
flags: {}
extra-package-dbs: []
diff --git a/stack.yaml b/stack.yaml
index 5af0dc3..22b9601 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -1,14 +1,14 @@
-resolver: lts-12.0
+resolver: lts-12.11
packages:
- '.'
allow-newer: true
extra-deps:
- SDL-0.6.6.0
- - gauge-0.2.3
- - bench-graph-0.1.3
+ - gauge-0.2.4
- Chart-1.9
- Chart-diagrams-1.9
- SVGFonts-1.6.0.3
+ - bench-show-0.2.1
flags: {}
extra-package-dbs: []
diff --git a/streamly.cabal b/streamly.cabal
index b50eaed..9dbabdf 100644
--- a/streamly.cabal
+++ b/streamly.cabal
@@ -1,5 +1,5 @@
name: streamly
-version: 0.5.1
+version: 0.5.2
synopsis: Beautiful Streaming, Concurrent and Reactive Composition
description:
Streamly, short for streaming concurrently, provides monadic streams, with a
@@ -55,9 +55,15 @@ description:
.
Where to find more information:
.
- * @README@ shipped with the package for a quick overview
- * "Streamly.Tutorial" module in the haddock documentation for a detailed introduction
- * @examples@ directory in the package for some simple practical examples
+ * /Quick Overview/: <src/README.md README file> in the package
+ * /Detailed Tutorial/: "Streamly.Tutorial" module in the haddock documentation
+ * /Reference Documentation/: Haddock documentation for the respective modules
+ * /Examples/: <src/examples examples directory> in the package
+ * /Guides/: <src/docs docs directory> in the package, for documentation on
+ advanced topics, limitations, and semantics of the library, or on
+ specific use cases.
+ * <https://github.com/composewell/streaming-benchmarks Streaming Benchmarks>
+ * <https://github.com/composewell/concurrency-benchmarks Concurrency Benchmarks>
homepage: https://github.com/composewell/streamly
bug-reports: https://github.com/composewell/streamly/issues
@@ -75,6 +81,8 @@ cabal-version: >= 1.10
extra-source-files:
Changelog.md
README.md
+ docs/streamly-vs-async.md
+ docs/transformers.md
bench.sh
stack-7.10.yaml
stack-8.0.yaml
@@ -86,11 +94,6 @@ source-repository head
type: git
location: https://github.com/composewell/streamly
-flag diag
- description: Diagnostics build
- manual: True
- default: False
-
flag dev
description: Development build
manual: True
@@ -137,6 +140,7 @@ library
, Streamly.Time
, Streamly
, Streamly.Tutorial
+ , Streamly.Internal
default-language: Haskell2010
ghc-options: -Wall
@@ -147,11 +151,7 @@ library
if flag(no-fusion)
cpp-options: -DDISABLE_FUSION
- if flag(diag)
- cpp-options: -DDIAGNOSTICS
-
if flag(dev)
- cpp-options: -DDIAGNOSTICS
ghc-options: -Wmissed-specialisations
-Wall-missed-specialisations
-fno-ignore-asserts
@@ -196,7 +196,7 @@ test-suite test
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
- ghc-options: -O0 -Wall -threaded -with-rtsopts=-N
+ ghc-options: -O0 -Wall -threaded -with-rtsopts=-N -fno-ignore-asserts
if flag(dev)
cpp-options: -DDEVBUILD
ghc-options: -Wmissed-specialisations
@@ -224,7 +224,7 @@ test-suite properties
type: exitcode-stdio-1.0
main-is: Prop.hs
hs-source-dirs: test
- ghc-options: -Wall -O0 -threaded -with-rtsopts=-N
+ ghc-options: -fno-ignore-asserts -Wall -O0 -threaded -with-rtsopts=-N
if flag(dev)
cpp-options: -DDEVBUILD
ghc-options: -Wmissed-specialisations
@@ -243,6 +243,9 @@ test-suite properties
, base >= 4.8 && < 5
, QuickCheck >= 2.10 && < 2.13
, hspec >= 2.0 && < 3
+ if impl(ghc < 8.0)
+ build-depends:
+ transformers >= 0.4 && < 0.6
default-language: Haskell2010
test-suite maxrate
@@ -250,7 +253,7 @@ test-suite maxrate
default-language: Haskell2010
main-is: MaxRate.hs
hs-source-dirs: test
- ghc-options: -O2 -Wall -threaded -with-rtsopts=-N
+ ghc-options: -fno-ignore-asserts -O2 -Wall -threaded -with-rtsopts=-N
if flag(dev)
buildable: True
build-Depends:
@@ -267,6 +270,7 @@ test-suite loops
default-language: Haskell2010
main-is: loops.hs
hs-source-dirs: test
+ ghc-options: -fno-ignore-asserts -O2 -Wall -threaded -with-rtsopts=-N
build-Depends:
streamly
, base >= 4.8 && < 5
@@ -276,6 +280,7 @@ test-suite nested-loops
default-language: Haskell2010
main-is: nested-loops.hs
hs-source-dirs: test
+ ghc-options: -fno-ignore-asserts -O2 -Wall -threaded -with-rtsopts=-N
build-Depends:
streamly
, base >= 4.8 && < 5
@@ -286,6 +291,7 @@ test-suite parallel-loops
default-language: Haskell2010
main-is: parallel-loops.hs
hs-source-dirs: test
+ ghc-options: -fno-ignore-asserts -O2 -Wall -threaded -with-rtsopts=-N
build-Depends:
streamly
, base >= 4.8 && < 5
@@ -320,7 +326,62 @@ benchmark linear
, base >= 4.8 && < 5
, deepseq >= 1.4.0 && < 1.5
, random >= 1.0 && < 2.0
- , gauge >= 0.2.3 && < 0.3
+ , gauge >= 0.2.4 && < 0.3
+
+benchmark linear-async
+ type: exitcode-stdio-1.0
+ hs-source-dirs: benchmark
+ main-is: LinearAsync.hs
+ other-modules: LinearOps
+ default-language: Haskell2010
+ ghc-options: -O2 -Wall
+ cpp-options: -DLINEAR_ASYNC
+ if flag(dev)
+ ghc-options: -Wmissed-specialisations
+ -Wall-missed-specialisations
+ -fno-ignore-asserts
+ if impl(ghc >= 8.0)
+ ghc-options: -Wcompat
+ -Wunrecognised-warning-flags
+ -Widentities
+ -Wincomplete-record-updates
+ -Wincomplete-uni-patterns
+ -Wredundant-constraints
+ -Wnoncanonical-monad-instances
+ -Wnoncanonical-monadfail-instances
+ build-depends:
+ streamly
+ , base >= 4.8 && < 5
+ , deepseq >= 1.4.0 && < 1.5
+ , random >= 1.0 && < 2.0
+ , gauge >= 0.2.4 && < 0.3
+
+benchmark linear-rate
+ type: exitcode-stdio-1.0
+ hs-source-dirs: benchmark
+ main-is: LinearRate.hs
+ other-modules: LinearOps
+ default-language: Haskell2010
+ ghc-options: -O2 -Wall
+ if flag(dev)
+ ghc-options: -Wmissed-specialisations
+ -Wall-missed-specialisations
+ -fno-ignore-asserts
+ if impl(ghc >= 8.0)
+ ghc-options: -Wcompat
+ -Wunrecognised-warning-flags
+ -Widentities
+ -Wincomplete-record-updates
+ -Wincomplete-uni-patterns
+ -Wredundant-constraints
+ -Wnoncanonical-monad-instances
+ -Wnoncanonical-monadfail-instances
+ build-depends:
+ streamly
+ , base >= 4.8 && < 5
+ , deepseq >= 1.4.0 && < 1.5
+ , random >= 1.0 && < 2.0
+ , gauge >= 0.2.4 && < 0.3
benchmark nested
type: exitcode-stdio-1.0
@@ -347,7 +408,7 @@ benchmark nested
, base >= 4.8 && < 5
, deepseq >= 1.4.0 && < 1.5
, random >= 1.0 && < 2.0
- , gauge >= 0.2.3 && < 0.3
+ , gauge >= 0.2.4 && < 0.3
-------------------------------------------------------------------------------
-- Internal benchmarks for unexposed modules
@@ -388,7 +449,7 @@ benchmark base
base >= 4.8 && < 5
, deepseq >= 1.4.0 && < 1.5
, random >= 1.0 && < 2.0
- , gauge >= 0.2.3 && < 0.3
+ , gauge >= 0.2.4 && < 0.3
, ghc-prim >= 0.2 && < 0.6
, containers >= 0.5 && < 0.7
@@ -409,29 +470,17 @@ benchmark base
build-depends:
semigroups >= 0.18 && < 0.19
-executable chart-linear
- default-language: Haskell2010
- hs-source-dirs: benchmark
- main-is: ChartLinear.hs
- if flag(dev)
- buildable: True
- build-Depends:
- base >= 4.8 && < 5
- , bench-graph >= 0.1 && < 0.2
- , split
- else
- buildable: False
-
-executable chart-nested
+executable chart
default-language: Haskell2010
hs-source-dirs: benchmark
- main-is: ChartNested.hs
+ main-is: Chart.hs
if flag(dev)
buildable: True
build-Depends:
base >= 4.8 && < 5
- , bench-graph >= 0.1 && < 0.2
+ , bench-show >= 0.2 && < 0.3
, split
+ , transformers >= 0.4 && < 0.6
else
buildable: False
@@ -510,3 +559,21 @@ executable CirclingSquare
, SDL >= 0.6.5 && < 0.7
else
buildable: False
+
+executable ControlFlow
+ default-language: Haskell2010
+ main-is: ControlFlow.hs
+ hs-source-dirs: examples
+ if flag(examples) || flag(examples-sdl)
+ buildable: True
+ build-Depends:
+ streamly
+ , base >= 4.8 && < 5
+ , exceptions >= 0.8 && < 0.11
+ , transformers >= 0.4 && < 0.6
+ , transformers-base >= 0.4 && < 0.5
+ if impl(ghc < 8.0)
+ build-depends:
+ semigroups >= 0.18 && < 0.19
+ else
+ buildable: False
diff --git a/test/Main.hs b/test/Main.hs
index e36acf9..5a640af 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -7,8 +7,11 @@ module Main (main) where
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, try, ErrorCall(..), catch, throw)
+import Control.Monad (void)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.Error.Class (throwError, MonadError)
+import Control.Monad.IO.Class (MonadIO(liftIO))
+import Control.Monad.State (MonadState, get, modify, runStateT, StateT)
import Control.Monad.Trans.Except (runExceptT, ExceptT)
import Data.Foldable (forM_, fold)
import Data.List (sort)
@@ -178,10 +181,10 @@ checkCleanupFold t op = do
testFoldOpsCleanup :: String -> (SerialT IO Int -> IO a) -> Spec
testFoldOpsCleanup name f = do
let testOp op x = op x >> return Nothing
- it (name ++ " asyncly") $ checkCleanupFold asyncly (testOp f)
- it (name ++ " wAsyncly") $ checkCleanupFold wAsyncly (testOp f)
- it (name ++ " aheadly") $ checkCleanupFold aheadly (testOp f)
- it (name ++ " parallely") $ checkCleanupFold parallely (testOp f)
+ it (name <> " asyncly") $ checkCleanupFold asyncly (testOp f)
+ it (name <> " wAsyncly") $ checkCleanupFold wAsyncly (testOp f)
+ it (name <> " aheadly") $ checkCleanupFold aheadly (testOp f)
+ it (name <> " parallely") $ checkCleanupFold parallely (testOp f)
#endif
parallelTests :: SpecWith ()
@@ -194,9 +197,9 @@ parallelTests = H.parallel $ do
it "simple serially with IO" $
(runStream . serially) (S.yieldM $ putStrLn "hello") `shouldReturn` ()
- describe "Empty" $ do
+ describe "Empty" $ -- do
it "Monoid - mempty" $
- (toListSerial mempty) `shouldReturn` ([] :: [Int])
+ toListSerial mempty `shouldReturn` ([] :: [Int])
-- it "Alternative - empty" $
-- (toListSerial empty) `shouldReturn` ([] :: [Int])
-- it "MonadPlus - mzero" $
@@ -210,11 +213,11 @@ parallelTests = H.parallel $ do
-- XXX we should do these through property tests by using a
-- construction via list fold construction method.
it "fmap on composed (<>)" $
- (toListSerial $ fmap (+1) (return 1 <> return 2))
+ toListSerial (fmap (+1) (return 1 <> return 2))
`shouldReturn` ([2,3] :: [Int])
it "fmap on composed (<>)" $
- ((toListParallel $ fmap (+1) (return 1 <> return 2)) >>= return . sort)
+ sort <$> toListParallel (fmap (+1) (return 1 <> return 2))
`shouldReturn` ([2,3] :: [Int])
---------------------------------------------------------------------------
@@ -225,19 +228,19 @@ parallelTests = H.parallel $ do
-- XXX we should do these through property tests by using a
-- construction via list fold construction method.
it "Apply - serial composed first argument" $
- (toListSerial $ (,) <$> (return 1 <> return 2) <*> (return 3))
+ toListSerial ((,) <$> (return 1 <> return 2) <*> return 3)
`shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)])
it "Apply - serial composed second argument" $
- (toListSerial $ (,) <$> (return 1) <*> (return 2 <> return 3))
+ toListSerial ((,) <$> return 1 <*> (return 2 <> return 3))
`shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
it "Apply - parallel composed first argument" $
- (toListParallel ((,) <$> (return 1 <> return 2) <*> (return 3)) >>= return . sort)
+ sort <$> toListParallel ((,) <$> (return 1 <> return 2) <*> return 3)
`shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)])
it "Apply - parallel composed second argument" $
- (toListParallel ((,) <$> (return 1) <*> (return 2 <> return 3)) >>= return . sort)
+ sort <$> toListParallel ((,) <$> return 1 <*> (return 2 <> return 3))
`shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
---------------------------------------------------------------------------
@@ -259,37 +262,41 @@ parallelTests = H.parallel $ do
---------------------------------------------------------------------------
-- TBD need more such combinations to be tested.
- describe "serial <> and serial <>" $ composeAndComposeSimple serially serially (cycle [[1 .. 9]])
- describe "ahead <> and ahead <>" $ composeAndComposeSimple aheadly aheadly (cycle [[1 .. 9]])
- describe "ahead <> and serial <>" $ composeAndComposeSimple aheadly serially (cycle [[1 .. 9]])
- describe "serial <> and ahead <>" $ composeAndComposeSimple serially aheadly (cycle [[1 .. 9]])
+ describe "serial <> and serial <>" $
+ composeAndComposeSimple serially serially (repeat [1 .. 9])
+ describe "ahead <> and ahead <>" $
+ composeAndComposeSimple aheadly aheadly (repeat [1 .. 9])
+ describe "ahead <> and serial <>" $
+ composeAndComposeSimple aheadly serially (repeat [1 .. 9])
+ describe "serial <> and ahead <>" $
+ composeAndComposeSimple serially aheadly (repeat [1 .. 9])
describe "<> and <=>" $ composeAndComposeSimple
serially
wSerially
- ([ [1 .. 9]
+ [ [1 .. 9]
, [1 .. 9]
, [1, 3, 2, 4, 6, 5, 7, 9, 8]
, [1, 3, 2, 4, 6, 5, 7, 9, 8]
- ])
+ ]
describe "<=> and <=>" $ composeAndComposeSimple
wSerially
wSerially
- ([ [1, 4, 2, 7, 3, 5, 8, 6, 9]
+ [ [1, 4, 2, 7, 3, 5, 8, 6, 9]
, [1, 7, 4, 8, 2, 9, 5, 3, 6]
, [1, 4, 3, 7, 2, 6, 9, 5, 8]
, [1, 7, 4, 9, 3, 8, 6, 2, 5]
- ])
+ ]
describe "<=> and <>" $ composeAndComposeSimple
wSerially
serially
- ([ [1, 4, 2, 7, 3, 5, 8, 6, 9]
+ [ [1, 4, 2, 7, 3, 5, 8, 6, 9]
, [1, 7, 4, 8, 2, 9, 5, 3, 6]
, [1, 4, 2, 7, 3, 5, 8, 6, 9]
, [1, 7, 4, 8, 2, 9, 5, 3, 6]
- ])
+ ]
---------------------------------------------------------------------------
-- Monoidal composition recursion loops
@@ -492,6 +499,34 @@ parallelTests = H.parallel $ do
it "scanlM' is strict enough" (checkScanlMStrictness scanlM'StrictCheck)
---------------------------------------------------------------------------
+ -- Monadic state snapshot in concurrent tasks
+ ---------------------------------------------------------------------------
+
+ it "asyncly maintains independent states in concurrent tasks"
+ (monadicStateSnapshot asyncly)
+ it "asyncly limited maintains independent states in concurrent tasks"
+ (monadicStateSnapshot (asyncly . S.take 10000))
+ it "wAsyncly maintains independent states in concurrent tasks"
+ (monadicStateSnapshot wAsyncly)
+ it "wAsyncly limited maintains independent states in concurrent tasks"
+ (monadicStateSnapshot (wAsyncly . S.take 10000))
+ it "aheadly maintains independent states in concurrent tasks"
+ (monadicStateSnapshot aheadly)
+ it "aheadly limited maintains independent states in concurrent tasks"
+ (monadicStateSnapshot (aheadly . S.take 10000))
+ it "parallely maintains independent states in concurrent tasks"
+ (monadicStateSnapshot parallely)
+
+ it "async maintains independent states in concurrent tasks"
+ (monadicStateSnapshotOp async)
+ it "ahead maintains independent states in concurrent tasks"
+ (monadicStateSnapshotOp ahead)
+ it "wAsync maintains independent states in concurrent tasks"
+ (monadicStateSnapshotOp wAsync)
+ it "parallel maintains independent states in concurrent tasks"
+ (monadicStateSnapshotOp Streamly.parallel)
+
+ ---------------------------------------------------------------------------
-- Slower tests are at the end
---------------------------------------------------------------------------
@@ -509,12 +544,83 @@ parallelTests = H.parallel $ do
replicate 4000 $ S.yieldM $ threadDelay 1000000)
`shouldReturn` ()
+-- Each snapshot carries an independent state. Multiple parallel tasks should
+-- not affect each other's state. This is especially important when we run
+-- multiple tasks in a single thread.
+snapshot :: (IsStream t, MonadAsync m, MonadState Int m) => t m ()
+snapshot =
+ -- We deliberately use a replicate count of 1 here because a lower count
+ -- catches problems that a higher count doesn't.
+ S.replicateM 1 $ do
+ -- Even though we modify the state here it should not reflect in other
+ -- parallel tasks, it is local to each concurrent task.
+ modify (+1) >> get >>= liftIO . (`shouldSatisfy` (==1))
+ modify (+1) >> get >>= liftIO . (`shouldSatisfy` (==2))
+
+snapshot1 :: (IsStream t, MonadAsync m, MonadState Int m) => t m ()
+snapshot1 = S.replicateM 1000 $
+ modify (+1) >> get >>= liftIO . (`shouldSatisfy` (==2))
+
+snapshot2 :: (IsStream t, MonadAsync m, MonadState Int m) => t m ()
+snapshot2 = S.replicateM 1000 $
+ modify (+1) >> get >>= liftIO . (`shouldSatisfy` (==2))
+
+stateComp
+ :: ( IsStream t
+ , MonadAsync m
+ , Semigroup (t m ())
+ , MonadIO (t m)
+ , MonadState Int m
+ , MonadState Int (t m)
+ )
+ => t m ()
+stateComp = do
+ -- Each task in a concurrent composition inherits the state and maintains
+ -- its own modifications to it, not affecting the parent computation.
+ snapshot <> (modify (+1) >> (snapshot1 <> snapshot2))
+ -- The above modify statement does not affect our state because that is
+ -- used in a parallel composition. In a serial composition it will affect
+ -- our state.
+ get >>= liftIO . (`shouldSatisfy` (== (0 :: Int)))
+
+monadicStateSnapshot
+ :: ( IsStream t
+ , Semigroup (t (StateT Int IO) ())
+ , MonadIO (t (StateT Int IO))
+ , MonadState Int (t (StateT Int IO))
+ )
+ => (t (StateT Int IO) () -> SerialT (StateT Int IO) ()) -> IO ()
+monadicStateSnapshot t = void $ runStateT (runStream $ t stateComp) 0
+
+stateCompOp
+ :: ( AsyncT (StateT Int IO) ()
+ -> AsyncT (StateT Int IO) ()
+ -> AsyncT (StateT Int IO) ()
+ )
+ -> SerialT (StateT Int IO) ()
+stateCompOp op = do
+ -- Each task in a concurrent composition inherits the state and maintains
+ -- its own modifications to it, not affecting the parent computation.
+ asyncly (snapshot `op` (modify (+1) >> (snapshot1 `op` snapshot2)))
+ -- The above modify statement does not affect our state because that is
+ -- used in a parallel composition. In a serial composition it will affect
+ -- our state.
+ get >>= liftIO . (`shouldSatisfy` (== (0 :: Int)))
+
+monadicStateSnapshotOp
+ :: ( AsyncT (StateT Int IO) ()
+ -> AsyncT (StateT Int IO) ()
+ -> AsyncT (StateT Int IO) ()
+ )
+ -> IO ()
+monadicStateSnapshotOp op = void $ runStateT (runStream $ stateCompOp op) 0
+
takeCombined :: (Monad m, Semigroup (t m Int), Show a, Eq a, IsStream t)
=> Int -> (t m Int -> SerialT IO a) -> IO ()
takeCombined n t = do
let constr = S.fromFoldable
r <- (S.toList . t) $
- S.take n ((constr ([] :: [Int])) <> constr ([] :: [Int]))
+ S.take n (constr ([] :: [Int]) <> constr ([] :: [Int]))
r `shouldBe` []
checkFoldxStrictness :: IO ()
@@ -575,12 +681,10 @@ checkScanl'Strictness = do
`shouldReturn` "success"
foldlM'StrictCheck :: IORef Int -> SerialT IO Int -> IO ()
-foldlM'StrictCheck ref s =
- S.foldlM' (\_ _ -> writeIORef ref 1) () s
+foldlM'StrictCheck ref = S.foldlM' (\_ _ -> writeIORef ref 1) ()
foldxMStrictCheck :: IORef Int -> SerialT IO Int -> IO ()
-foldxMStrictCheck ref s =
- S.foldxM (\_ _ -> writeIORef ref 1) (return ()) return s
+foldxMStrictCheck ref = S.foldxM (\_ _ -> writeIORef ref 1) (return ()) return
checkFoldMStrictness :: (IORef Int -> SerialT IO Int -> IO ()) -> IO ()
checkFoldMStrictness f = do
@@ -590,8 +694,7 @@ checkFoldMStrictness f = do
readIORef ref `shouldReturn` 1
scanlM'StrictCheck :: IORef Int -> SerialT IO Int -> SerialT IO ()
-scanlM'StrictCheck ref s =
- S.scanlM' (\_ _ -> writeIORef ref 1) () s
+scanlM'StrictCheck ref = S.scanlM' (\_ _ -> writeIORef ref 1) ()
checkScanlMStrictness :: (IORef Int -> SerialT IO Int -> SerialT IO ()) -> IO ()
checkScanlMStrictness f = do
@@ -601,17 +704,16 @@ checkScanlMStrictness f = do
readIORef ref `shouldReturn` 1
takeInfinite :: IsStream t => (t IO Int -> SerialT IO Int) -> Spec
-takeInfinite t = do
+takeInfinite t =
it "take 1" $
- (runStream $ t $
- S.take 1 $ S.repeatM (print "hello" >> return (1::Int)))
+ runStream (t $ S.take 1 $ S.repeatM (print "hello" >> return (1::Int)))
`shouldReturn` ()
-- XXX need to test that we have promptly cleaned up everything after the error
-- XXX We can also check the output that we are expected to get before the
-- error occurs.
-data ExampleException = ExampleException String deriving (Eq, Show)
+newtype ExampleException = ExampleException String deriving (Eq, Show)
instance Exception ExampleException
@@ -624,11 +726,11 @@ simpleMonadError = do
it "simple runExceptT with error" $ do
(runExceptT $ runStream $ throwError "E") `shouldReturn` Left "E"
-}
- it "simple try" $ do
- (try $ runStream $ return ())
+ it "simple try" $
+ try (runStream $ return ())
`shouldReturn` (Right () :: Either ExampleException ())
- it "simple try with throw error" $ do
- (try $ runStream $ throwM $ ExampleException "E")
+ it "simple try with throw error" $
+ try (runStream $ throwM $ ExampleException "E")
`shouldReturn` (Left (ExampleException "E") :: Either ExampleException ())
composeWithMonadThrow
@@ -639,10 +741,10 @@ composeWithMonadThrow
=> (t IO Int -> SerialT IO Int) -> Spec
composeWithMonadThrow t = do
it "Compose throwM, nil" $
- (try $ tl (throwM (ExampleException "E") <> S.nil))
+ try (tl (throwM (ExampleException "E") <> S.nil))
`shouldReturn` (Left (ExampleException "E") :: Either ExampleException [Int])
it "Compose nil, throwM" $
- (try $ tl (S.nil <> throwM (ExampleException "E")))
+ try (tl (S.nil <> throwM (ExampleException "E")))
`shouldReturn` (Left (ExampleException "E") :: Either ExampleException [Int])
oneLevelNestedSum "serially" serially
oneLevelNestedSum "wSerially" wSerially
@@ -658,20 +760,20 @@ composeWithMonadThrow t = do
where
tl = S.toList . t
oneLevelNestedSum desc t1 =
- it ("One level nested sum " ++ desc) $ do
- let nested = (S.fromFoldable [1..10] <> throwM (ExampleException "E")
- <> S.fromFoldable [1..10])
- (try $ tl (S.nil <> t1 nested <> S.fromFoldable [1..10]))
+ it ("One level nested sum " <> desc) $ do
+ let nested = S.fromFoldable [1..10] <> throwM (ExampleException "E")
+ <> S.fromFoldable [1..10]
+ try (tl (S.nil <> t1 nested <> S.fromFoldable [1..10]))
`shouldReturn` (Left (ExampleException "E") :: Either ExampleException [Int])
oneLevelNestedProduct desc t1 =
- it ("One level nested product" ++ desc) $ do
+ it ("One level nested product" <> desc) $ do
let s1 = t $ foldMapWith (<>) return [1..4]
s2 = t1 $ foldMapWith (<>) return [5..8]
try $ tl (do
x <- adapt s1
y <- s2
- if (x + y > 10)
+ if x + y > 10
then throwM (ExampleException "E")
else return (x + y)
)
@@ -686,9 +788,9 @@ _composeWithMonadError
_composeWithMonadError t = do
let tl = S.toList . t
it "Compose throwError, nil" $
- (runExceptT $ tl (throwError "E" <> S.nil)) `shouldReturn` Left "E"
+ runExceptT (tl (throwError "E" <> S.nil)) `shouldReturn` Left "E"
it "Compose nil, error" $
- (runExceptT $ tl (S.nil <> throwError "E")) `shouldReturn` Left "E"
+ runExceptT (tl (S.nil <> throwError "E")) `shouldReturn` Left "E"
nestTwoSerial :: Expectation
nestTwoSerial =
@@ -745,54 +847,51 @@ nestTwoAsync :: Expectation
nestTwoAsync =
let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8]
- in (toListAsync (do
+ in sort <$> toListAsync (do
x <- s1
y <- s2
- return (x + y)
- ) >>= return . sort)
+ return (x + y))
`shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
nestTwoAsyncApp :: Expectation
nestTwoAsyncApp =
let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8]
- in (toListAsync ((+) <$> s1 <*> s2) >>= return . sort)
+ in sort <$> toListAsync ((+) <$> s1 <*> s2)
`shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
nestTwoWAsync :: Expectation
nestTwoWAsync =
let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8]
- in ((S.toList . wAsyncly) (do
+ in sort <$> (S.toList . wAsyncly) (do
x <- s1
y <- s2
- return (x + y)
- ) >>= return . sort)
+ return (x + y))
`shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
nestTwoParallel :: Expectation
nestTwoParallel =
let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8]
- in ((S.toList . parallely) (do
+ in sort <$> (S.toList . parallely) (do
x <- s1
y <- s2
- return (x + y)
- ) >>= return . sort)
+ return (x + y))
`shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
nestTwoWAsyncApp :: Expectation
nestTwoWAsyncApp =
let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8]
- in ((S.toList . wAsyncly) ((+) <$> s1 <*> s2) >>= return . sort)
+ in sort <$> (S.toList . wAsyncly) ((+) <$> s1 <*> s2)
`shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
nestTwoParallelApp :: Expectation
nestTwoParallelApp =
let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8]
- in ((S.toList . parallely) ((+) <$> s1 <*> s2) >>= return . sort)
+ in sort <$> (S.toList . parallely) ((+) <$> s1 <*> s2)
`shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
interleaveCheck :: IsStream t
@@ -802,7 +901,7 @@ interleaveCheck :: IsStream t
interleaveCheck t f =
it "Interleave four" $
(S.toList . t) ((singleton 0 `f` singleton 1) `f` (singleton 100 `f` singleton 101))
- `shouldReturn` ([0, 100, 1, 101])
+ `shouldReturn` [0, 100, 1, 101]
parallelCheck :: (IsStream t, Monad (t IO))
=> (t IO Int -> SerialT IO Int)
@@ -811,45 +910,45 @@ parallelCheck :: (IsStream t, Monad (t IO))
parallelCheck t f = do
it "Parallel ordering left associated" $
(S.toList . t) (((event 4 `f` event 3) `f` event 2) `f` event 1)
- `shouldReturn` ([1..4])
+ `shouldReturn` [1..4]
it "Parallel ordering right associated" $
(S.toList . t) (event 4 `f` (event 3 `f` (event 2 `f` event 1)))
- `shouldReturn` ([1..4])
+ `shouldReturn` [1..4]
- where event n = (S.yieldM $ threadDelay (n * 200000)) >> (return n)
+ where event n = S.yieldM (threadDelay (n * 200000)) >> return n
compose :: (IsStream t, Semigroup (t IO Int))
=> (t IO Int -> SerialT IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec
compose t z srt = do
-- XXX these should get covered by the property tests
it "Compose mempty, mempty" $
- (tl (z <> z)) `shouldReturn` ([] :: [Int])
+ tl (z <> z) `shouldReturn` ([] :: [Int])
it "Compose empty at the beginning" $
- (tl $ (z <> singleton 1)) `shouldReturn` [1]
+ tl (z <> singleton 1) `shouldReturn` [1]
it "Compose empty at the end" $
- (tl $ (singleton 1 <> z)) `shouldReturn` [1]
+ tl (singleton 1 <> z) `shouldReturn` [1]
it "Compose two" $
- (tl (singleton 0 <> singleton 1) >>= return . srt)
+ srt <$> tl (singleton 0 <> singleton 1)
`shouldReturn` [0, 1]
it "Compose many" $
- ((tl $ forEachWith (<>) [1..100] singleton) >>= return . srt)
+ srt <$> tl (forEachWith (<>) [1..100] singleton)
`shouldReturn` [1..100]
-- These are not covered by the property tests
it "Compose three - empty in the middle" $
- ((tl $ (singleton 0 <> z <> singleton 1)) >>= return . srt)
+ srt <$> tl (singleton 0 <> z <> singleton 1)
`shouldReturn` [0, 1]
it "Compose left associated" $
- ((tl $ (((singleton 0 <> singleton 1) <> singleton 2) <> singleton 3))
- >>= return . srt) `shouldReturn` [0, 1, 2, 3]
+ srt <$> tl (((singleton 0 <> singleton 1) <> singleton 2) <> singleton 3)
+ `shouldReturn` [0, 1, 2, 3]
it "Compose right associated" $
- ((tl $ (singleton 0 <> (singleton 1 <> (singleton 2 <> singleton 3))))
- >>= return . srt) `shouldReturn` [0, 1, 2, 3]
+ srt <$> tl (singleton 0 <> (singleton 1 <> (singleton 2 <> singleton 3)))
+ `shouldReturn` [0, 1, 2, 3]
it "Compose hierarchical (multiple levels)" $
- ((tl $ (((singleton 0 <> singleton 1) <> (singleton 2 <> singleton 3))
+ srt <$> tl (((singleton 0 <> singleton 1) <> (singleton 2 <> singleton 3))
<> ((singleton 4 <> singleton 5) <> (singleton 6 <> singleton 7)))
- ) >>= return . srt) `shouldReturn` [0..7]
+ `shouldReturn` [0..7]
where tl = S.toList . t
composeAndComposeSimple
@@ -865,20 +964,20 @@ composeAndComposeSimple
composeAndComposeSimple t1 t2 answer = do
let rfold = adapt . t2 . foldMapWith (<>) return
it "Compose right associated outer expr, right folded inner" $
- ((S.toList . t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9])))
- `shouldReturn` (answer !! 0)
+ (S.toList . t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9]))
+ `shouldReturn` head answer
it "Compose left associated outer expr, right folded inner" $
- ((S.toList . t1) ((rfold [1,2,3] <> rfold [4,5,6]) <> rfold [7,8,9]))
+ (S.toList . t1) ((rfold [1,2,3] <> rfold [4,5,6]) <> rfold [7,8,9])
`shouldReturn` (answer !! 1)
- let lfold xs = adapt $ t2 $ foldl (<>) mempty $ map return xs
+ let lfold xs = adapt $ t2 $ foldl (<>) mempty $ fmap return xs
it "Compose right associated outer expr, left folded inner" $
- ((S.toList . t1) (lfold [1,2,3] <> (lfold [4,5,6] <> lfold [7,8,9])))
+ (S.toList . t1) (lfold [1,2,3] <> (lfold [4,5,6] <> lfold [7,8,9]))
`shouldReturn` (answer !! 2)
it "Compose left associated outer expr, left folded inner" $
- ((S.toList . t1) ((lfold [1,2,3] <> lfold [4,5,6]) <> lfold [7,8,9]))
+ (S.toList . t1) ((lfold [1,2,3] <> lfold [4,5,6]) <> lfold [7,8,9])
`shouldReturn` (answer !! 3)
loops
@@ -888,10 +987,10 @@ loops
-> ([Int] -> [Int])
-> Spec
loops t tsrt hsrt = do
- it "Tail recursive loop" $ ((S.toList . adapt) (loopTail 0) >>= return . tsrt)
+ it "Tail recursive loop" $ (tsrt <$> (S.toList . adapt) (loopTail 0))
`shouldReturn` [0..3]
- it "Head recursive loop" $ ((S.toList . adapt) (loopHead 0) >>= return . hsrt)
+ it "Head recursive loop" $ (hsrt <$> (S.toList . adapt) (loopHead 0))
`shouldReturn` [0..3]
where
@@ -913,13 +1012,14 @@ bindAndComposeSimple
bindAndComposeSimple t1 t2 = do
-- XXX need a bind in the body of forEachWith instead of a simple return
it "Compose many (right fold) with bind" $
- ((S.toList . t1) (adapt . t2 $ forEachWith (<>) [1..10 :: Int] return)
- >>= return . sort) `shouldReturn` [1..10]
+ (sort <$> (S.toList . t1)
+ (adapt . t2 $ forEachWith (<>) [1..10 :: Int] return))
+ `shouldReturn` [1..10]
it "Compose many (left fold) with bind" $
- let forL xs k = foldl (<>) nil $ map k xs
- in ((S.toList . t1) (adapt . t2 $ forL [1..10 :: Int] return)
- >>= return . sort) `shouldReturn` [1..10]
+ let forL xs k = foldl (<>) nil $ fmap k xs
+ in (sort <$> (S.toList . t1) (adapt . t2 $ forL [1..10 :: Int] return))
+ `shouldReturn` [1..10]
bindAndComposeHierarchy
:: ( IsStream t1, Monad (t1 IO)
@@ -928,16 +1028,16 @@ bindAndComposeHierarchy
-> (t2 IO Int -> t2 IO Int)
-> ([t2 IO Int] -> t2 IO Int)
-> Spec
-bindAndComposeHierarchy t1 t2 g = do
+bindAndComposeHierarchy t1 t2 g =
it "Bind and compose nested" $
- ((S.toList . t1) bindComposeNested >>= return . sort)
+ (sort <$> (S.toList . t1) bindComposeNested)
`shouldReturn` (sort (
[12, 18]
- ++ replicate 3 13
- ++ replicate 3 17
- ++ replicate 6 14
- ++ replicate 6 16
- ++ replicate 7 15) :: [Int])
+ <> replicate 3 13
+ <> replicate 3 17
+ <> replicate 6 14
+ <> replicate 6 16
+ <> replicate 7 15) :: [Int])
where
@@ -960,9 +1060,9 @@ bindAndComposeHierarchy t1 t2 g = do
>>= \z -> return (x + y + z)
mixedOps :: Spec
-mixedOps = do
+mixedOps =
it "Compose many ops" $
- (toListSerial composeMixed >>= return . sort)
+ (sort <$> toListSerial composeMixed)
`shouldReturn` ([8,9,9,9,9,9,10,10,10,10,10,10,10,10,10,10,11,11
,11,11,11,11,11,11,11,11,12,12,12,12,12,13
] :: [Int])
@@ -972,8 +1072,8 @@ mixedOps = do
composeMixed = do
S.yieldM $ return ()
S.yieldM $ putStr ""
- x <- return 1
- y <- return 2
+ let x = 1
+ let y = 2
z <- do
x1 <- wAsyncly $ return 1 <> return 2
S.yieldM $ return ()
@@ -990,9 +1090,9 @@ mixedOps = do
return (x + y + z)
mixedOpsAheadly :: Spec
-mixedOpsAheadly = do
+mixedOpsAheadly =
it "Compose many ops" $
- (toListSerial composeMixed >>= return . sort)
+ (sort <$> toListSerial composeMixed)
`shouldReturn` ([8,9,9,9,9,9,10,10,10,10,10,10,10,10,10,10,11,11
,11,11,11,11,11,11,11,11,12,12,12,12,12,13
] :: [Int])
@@ -1002,8 +1102,8 @@ mixedOpsAheadly = do
composeMixed = do
S.yieldM $ return ()
S.yieldM $ putStr ""
- x <- return 1
- y <- return 2
+ let x = 1
+ let y = 2
z <- do
x1 <- wAsyncly $ return 1 <> return 2
S.yieldM $ return ()
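The `monadicStateSnapshot` tests added above exercise a property of running streams over `StateT`: each concurrent task starts from a snapshot of the monadic state at fork time, and its modifications stay local to that task. A minimal sketch of that snapshot behaviour, using only the `transformers` package that ships with GHC (no streamly; `task` and `runSnapshot` are hypothetical names, not part of the test suite):

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.Trans.State (StateT, execStateT, get, modify)

-- Hypothetical task: bump the state and read it back. When several
-- copies run concurrently, each starts from the same snapshot (0) and
-- sees only its own modification, mirroring the semantics tested above.
task :: StateT Int IO Int
task = modify (+1) >> get

main :: IO ()
main = do
    results <- mapM runSnapshot [1 .. 4 :: Int]
    print results  -- prints [1,1,1,1]: no task observes another's (+1)
  where
    -- Run one copy of the task in its own thread from the snapshot 0,
    -- returning its final state.
    runSnapshot _ = do
        done <- newEmptyMVar
        _ <- forkIO (execStateT task 0 >>= putMVar done)
        takeMVar done
```

Streamly's concurrent combinators (`asyncly`, `aheadly`, `parallely`, and the `async`/`ahead`/`wAsync`/`parallel` operators) give each task this kind of independent snapshot, which is what the `shouldSatisfy (==1)` and `(==2)` assertions in `snapshot` rely on.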
diff --git a/test/MaxRate.hs b/test/MaxRate.hs
index 40d6dac..dc4c4f6 100644
--- a/test/MaxRate.hs
+++ b/test/MaxRate.hs
@@ -13,10 +13,10 @@ durationShouldBe d@(tMin, tMax) action = do
t0 <- getTime Monotonic
action
t1 <- getTime Monotonic
- let t = (fromIntegral $ toNanoSecs (t1 - t0)) / 1e9
+ let t = fromIntegral (toNanoSecs (t1 - t0)) / 1e9
-- tMax = fromNanoSecs (round $ d*10^9*1.2)
-- tMin = fromNanoSecs (round $ d*10^9*0.8)
- putStrLn $ "Expected: " ++ show d ++ " Took: " ++ show t
+ putStrLn $ "Expected: " <> show d <> " Took: " <> show t
(t <= tMax && t >= tMin) `shouldBe` True
toMicroSecs :: Num a => a -> a
@@ -30,11 +30,11 @@ measureRate' :: IsStream t
-> (Double, Double)
-> (Double, Double)
-> Spec
-measureRate' desc t rval consumerDelay producerDelay dur = do
- it (desc ++ " rate: " ++ show rval
- ++ ", consumer latency: " ++ show consumerDelay
- ++ ", producer latency: " ++ show producerDelay)
- $ durationShouldBe dur $ do
+measureRate' desc t rval consumerDelay producerDelay dur =
+ it (desc <> " rate: " <> show rval
+ <> ", consumer latency: " <> show consumerDelay
+ <> ", producer latency: " <> show producerDelay)
+ $ durationShouldBe dur $
runStream
$ (if consumerDelay > 0
then S.mapM $ \x ->
@@ -51,14 +51,14 @@ measureRate' desc t rval consumerDelay producerDelay dur = do
then return $ round $ toMicroSecs t1
else randomRIO ( round $ toMicroSecs t1
, round $ toMicroSecs t2)
- when (r > 0) $ do
+ when (r > 0) $ -- do
-- t1 <- getTime Monotonic
threadDelay r
-- t2 <- getTime Monotonic
-- let delta = fromIntegral (toNanoSecs (t2 - t1)) / 1000000000
- -- putStrLn $ "delay took: " ++ show delta
+ -- putStrLn $ "delay took: " <> show delta
-- when (delta > 2) $ do
- -- putStrLn $ "delay took high: " ++ show delta
+ -- putStrLn $ "delay took high: " <> show delta
return 1
measureRate :: IsStream t
@@ -82,37 +82,37 @@ main = hspec $ do
-- lower (1 or lower). For rate 1 we lose 1 second at the end, and for
-- rate 10, 0.1 second.
let rates = [1, 10, 100, 1000, 10000, 100000, 1000000]
- in describe "asyncly no consumer delay no producer delay" $ do
+ in describe "asyncly no consumer delay no producer delay" $
forM_ rates (\r -> measureRate "asyncly" asyncly r 0 0 range)
-- XXX try staggering the dispatches to achieve higher rates
let rates = [1, 10, 100, 1000, 10000, 25000]
- in describe "asyncly no consumer delay and 1 sec producer delay" $ do
+ in describe "asyncly no consumer delay and 1 sec producer delay" $
forM_ rates (\r -> measureRate "asyncly" asyncly r 0 1 range)
-- At lower rates (1/10) this is likely to vary quite a bit depending on
-- the spread of random producer latencies generated.
let rates = [1, 10, 100, 1000, 10000, 25000]
- in describe "asyncly no consumer delay and variable producer delay" $ do
+ in describe "asyncly no consumer delay and variable producer delay" $
forM_ rates $ \r ->
measureRate' "asyncly" asyncly r 0 (0.1, 3) range
let rates = [1, 10, 100, 1000, 10000, 100000, 1000000]
- in describe "wAsyncly no consumer delay no producer delay" $ do
+ in describe "wAsyncly no consumer delay no producer delay" $
forM_ rates (\r -> measureRate "wAsyncly" wAsyncly r 0 0 range)
let rates = [1, 10, 100, 1000, 10000, 25000]
- in describe "wAsyncly no consumer delay and 1 sec producer delay" $ do
+ in describe "wAsyncly no consumer delay and 1 sec producer delay" $
forM_ rates (\r -> measureRate "wAsyncly" wAsyncly r 0 1 range)
let rates = [1, 10, 100, 1000, 10000, 100000, 1000000]
- in describe "aheadly no consumer delay no producer delay" $ do
+ in describe "aheadly no consumer delay no producer delay" $
forM_ rates (\r -> measureRate "aheadly" aheadly r 0 0 range)
-- XXX after the change to stop workers when the heap is clearing
-- this does not work well at 25000 ops per second, need to fix.
let rates = [1, 10, 100, 1000, 10000, 12500]
- in describe "aheadly no consumer delay and 1 sec producer delay" $ do
+ in describe "aheadly no consumer delay and 1 sec producer delay" $
forM_ rates (\r -> measureRate "aheadly" aheadly r 0 1 range)
describe "asyncly with 1 sec producer delay and some consumer delay" $ do
diff --git a/test/Prop.hs b/test/Prop.hs
index 71d0241..419328d 100644
--- a/test/Prop.hs
+++ b/test/Prop.hs
@@ -2,16 +2,18 @@
module Main (main) where
-import Control.Exception (BlockedIndefinitelyOnMVar(..), catches,
- BlockedIndefinitelyOnSTM(..), Handler(..))
-import Control.Monad (when, forM_)
import Control.Applicative (ZipList(..))
import Control.Concurrent (MVar, takeMVar, putMVar, newEmptyMVar)
-import Control.Monad (replicateM, replicateM_)
+import Control.Exception
+ (BlockedIndefinitelyOnMVar(..), catches,
+ BlockedIndefinitelyOnSTM(..), Handler(..))
+import Control.Monad (when, forM_, replicateM, replicateM_)
+import Control.Monad.IO.Class (MonadIO(..))
import Data.Function ((&))
import Data.IORef (readIORef, modifyIORef, newIORef)
-import Data.List (sort, foldl', scanl', findIndices, findIndex, elemIndices,
- elemIndex, find, intersperse, foldl1')
+import Data.List
+ (sort, foldl', scanl', findIndices, findIndex, elemIndices,
+ elemIndex, find, intersperse, foldl1', (\\))
import Data.Maybe (mapMaybe)
import GHC.Word (Word8)
@@ -46,7 +48,28 @@ equals eq stream list = do
when (not $ stream `eq` list) $
monitor
(counterexample $
- "stream " ++ show stream ++ " /= list " ++ show list)
+ "stream " <> show stream
+ <> "\nlist " <> show list
+ )
+ assert (stream `eq` list)
+
+listEquals
+ :: (Show a, Eq a, MonadIO m)
+ => ([a] -> [a] -> Bool) -> [a] -> [a] -> PropertyM m ()
+listEquals eq stream list = do
+ when (not $ stream `eq` list) $ liftIO $ putStrLn $
+ "stream " <> show stream
+ <> "\nlist " <> show list
+ <> "\nstream \\\\ list " <> show (stream \\ list)
+ <> "\nlist \\\\ stream " <> show (list \\ stream)
+ when (not $ stream `eq` list) $
+ monitor
+ (counterexample $
+ "stream " <> show stream
+ <> "\nlist " <> show list
+ <> "\nstream \\\\ list " <> show (stream \\ list)
+ <> "\nlist \\\\ stream " <> show (list \\ stream)
+ )
assert (stream `eq` list)
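As a side note, the new `listEquals` diagnostics lean on `Data.List`'s `(\\)` difference operator, which deletes the first occurrence of each element of its second argument from its first; a minimal standalone sketch (base only):

```haskell
import Data.List ((\\))

main :: IO ()
main = do
    -- (\\) removes one occurrence per right-hand element: drop a 2, then a 3
    print ([1, 2, 2, 3] \\ [2, 3] :: [Int])
    -- elements absent from the left list are simply ignored
    print ([1, 2] \\ [9] :: [Int])
```

This is why printing both `stream \\ list` and `list \\ stream` pinpoints which elements are extra on each side of a failing comparison.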
constructWithReplicateM
@@ -59,10 +82,10 @@ constructWithReplicateM op len = withMaxSuccess maxTestCount $
let x = return (1 :: Int)
stream <- run $ (S.toList . op) (S.replicateM (fromIntegral len) x)
list <- run $ replicateM (fromIntegral len) x
- equals (==) stream list
+ listEquals (==) stream list
transformFromList
- :: Show b =>
+ :: (Eq b, Show b) =>
([a] -> t IO a)
-> ([b] -> [b] -> Bool)
-> ([a] -> [b])
@@ -73,15 +96,15 @@ transformFromList constr eq listOp op a =
monadicIO $ do
stream <- run ((S.toList . op) (constr a))
let list = listOp a
- equals eq stream list
+ listEquals eq stream list
mvarExcHandler :: String -> BlockedIndefinitelyOnMVar -> IO ()
-mvarExcHandler label BlockedIndefinitelyOnMVar = do
- error $ label ++ " " ++ "BlockedIndefinitelyOnMVar\n"
+mvarExcHandler label BlockedIndefinitelyOnMVar =
+ error $ label <> " " <> "BlockedIndefinitelyOnMVar\n"
stmExcHandler :: String -> BlockedIndefinitelyOnSTM -> IO ()
-stmExcHandler label BlockedIndefinitelyOnSTM = do
- error $ label ++ " " ++ "BlockedIndefinitelyOnSTM\n"
+stmExcHandler label BlockedIndefinitelyOnSTM =
+ error $ label <> " " <> "BlockedIndefinitelyOnSTM\n"
dbgMVar :: String -> IO () -> IO ()
dbgMVar label action =
@@ -92,10 +115,10 @@ dbgMVar label action =
-- | The first n actions perform takeMVar and the last action performs putMVar n times
mvarSequenceOp :: MVar () -> Word8 -> Word8 -> IO Word8
mvarSequenceOp mv n x = do
- let msg = show x ++ "/" ++ show n
+ let msg = show x <> "/" <> show n
if x < n
- then dbgMVar ("take mvarSequenceOp " ++ msg) (takeMVar mv) >> return x
- else dbgMVar ("put mvarSequenceOp" ++ msg)
+ then dbgMVar ("take mvarSequenceOp " <> msg) (takeMVar mv) >> return x
+ else dbgMVar ("put mvarSequenceOp" <> msg)
(replicateM_ (fromIntegral n) (putMVar mv ())) >> return x
concurrentMapM
@@ -109,8 +132,8 @@ concurrentMapM constr eq op n =
let list = [0..n]
stream <- run $ do
mv <- newEmptyMVar :: IO (MVar ())
- (S.toList . (op n mv)) (constr list)
- equals eq stream list
+ (S.toList . op n mv) (constr list)
+ listEquals eq stream list
concurrentFromFoldable
:: IsStream t
@@ -123,8 +146,8 @@ concurrentFromFoldable eq op n =
let list = [0..n]
stream <- run $ do
mv <- newEmptyMVar :: IO (MVar ())
- (S.toList . op) (S.fromFoldableM (map (mvarSequenceOp mv n) list))
- equals eq stream list
+ (S.toList . op) (S.fromFoldableM (fmap (mvarSequenceOp mv n) list))
+ listEquals eq stream list
sourceUnfoldrM :: IsStream t => MVar () -> Word8 -> t IO Word8
sourceUnfoldrM mv n = S.unfoldrM step 0
@@ -132,11 +155,11 @@ sourceUnfoldrM mv n = S.unfoldrM step 0
-- argument must be an Int to avoid overflow of Word8 at 255
step :: Int -> IO (Maybe (Word8, Int))
step cnt = do
- let msg = show cnt ++ "/" ++ show n
+ let msg = show cnt <> "/" <> show n
if cnt > fromIntegral n
then return Nothing
else do
- dbgMVar ("put sourceUnfoldrM " ++ msg) (putMVar mv ())
+ dbgMVar ("put sourceUnfoldrM " <> msg) (putMVar mv ())
return (Just (fromIntegral cnt, cnt + 1))
-- Note that this test is not guaranteed to succeed, because there is no
@@ -152,7 +175,7 @@ concurrentUnfoldrM eq op n =
-- XXX we should test empty list case as well
let list = [0..n]
stream <- run $ do
- -- putStrLn $ "concurrentUnfoldrM: " ++ show n
+ -- putStrLn $ "concurrentUnfoldrM: " <> show n
mv <- newEmptyMVar :: IO (MVar ())
cnt <- newIORef 0
-- since unfoldr happens in parallel with the stream processing we
@@ -165,20 +188,16 @@ concurrentUnfoldrM eq op n =
-- instead.
i <- S.yieldM $ readIORef cnt
S.yieldM $ modifyIORef cnt (+1)
- let msg = show i ++ "/" ++ show n
- S.yieldM $ do
- if even i
- then do
- dbgMVar ("first take concurrentUnfoldrM " ++ msg)
+ let msg = show i <> "/" <> show n
+ S.yieldM $
+ when (even i) $ do
+ dbgMVar ("first take concurrentUnfoldrM " <> msg)
(takeMVar mv)
- if n > i
- then do
- dbgMVar ("second take concurrentUnfoldrM " ++ msg)
+ when (n > i) $
+ dbgMVar ("second take concurrentUnfoldrM " <> msg)
(takeMVar mv)
- else return ()
- else return ()
return x
- equals eq stream list
+ listEquals eq stream list
concurrentApplication :: IsStream t
=> ([Word8] -> [Word8] -> Bool)
@@ -190,33 +209,30 @@ concurrentApplication eq t n = withMaxSuccess maxTestCount $
-- XXX we should test empty list case as well
let list = [0..n]
stream <- run $ do
- -- putStrLn $ "concurrentApplication: " ++ show n
+ -- putStrLn $ "concurrentApplication: " <> show n
mv <- newEmptyMVar :: IO (MVar ())
-- since unfoldr happens in parallel with the stream processing we
-- can do two takeMVar in one iteration. If it is not parallel then
-- this will not work and the test will fail.
- (S.toList . t) $ do
+ (S.toList . t) $
sourceUnfoldrM mv n |&
- (S.mapM $ \x -> do
- let msg = show x ++ "/" ++ show n
- if even x
- then do
- dbgMVar ("first take concurrentApp " ++ msg)
+ S.mapM (\x -> do
+ let msg = show x <> "/" <> show n
+ when (even x) $ do
+ dbgMVar ("first take concurrentApp " <> msg)
(takeMVar mv)
- if n > x
- then dbgMVar ("second take concurrentApp " ++ msg)
+ when (n > x) $
+ dbgMVar ("second take concurrentApp " <> msg)
(takeMVar mv)
- else return ()
- else return ()
return x)
- equals eq stream list
+ listEquals eq stream list
sourceUnfoldrM1 :: IsStream t => Word8 -> t IO Word8
sourceUnfoldrM1 n = S.unfoldrM step 0
where
-- argument must be an Int to avoid overflow of Word8 at 255
step :: Int -> IO (Maybe (Word8, Int))
- step cnt = do
+ step cnt =
if cnt > fromIntegral n
then return Nothing
else return (Just (fromIntegral cnt, cnt + 1))
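The simplified `step` above follows the same shape as `Data.List.unfoldr`; a pure sketch of the pattern, with a hypothetical bound `n = 3` standing in for the `Word8` argument:

```haskell
import Data.List (unfoldr)

-- Pure analogue of sourceUnfoldrM1's step: count up from 0, yielding each
-- counter value, and stop once the counter exceeds the bound.
source :: Int -> [Int]
source n = unfoldr step 0
  where
    step cnt
        | cnt > n   = Nothing
        | otherwise = Just (cnt, cnt + 1)

main :: IO ()
main = print (source 3)
```

The monadic `S.unfoldrM` version used in the tests is the same idea with the step running in `IO`.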
@@ -226,18 +242,18 @@ concurrentFoldlApplication n =
monadicIO $ do
-- XXX we should test empty list case as well
let list = [0..n]
- stream <- run $ do
+ stream <- run $
sourceUnfoldrM1 n |&. S.foldlM' (\xs x -> return (x : xs)) []
- equals (==) (reverse stream) list
+ listEquals (==) (reverse stream) list
concurrentFoldrApplication :: Word8 -> Property
concurrentFoldrApplication n =
monadicIO $ do
-- XXX we should test empty list case as well
let list = [0..n]
- stream <- run $ do
+ stream <- run $
sourceUnfoldrM1 n |&. S.foldrM (\x xs -> return (x : xs)) []
- equals (==) stream list
+ listEquals (==) stream list
transformCombineFromList
:: Semigroup (t IO Int)
@@ -256,7 +272,7 @@ transformCombineFromList constr eq listOp t op a b c =
stream <- run ((S.toList . t) $
constr a <> op (constr b <> constr c))
let list = a <> listOp (b <> c)
- equals eq stream list
+ listEquals eq stream list
foldFromList
:: ([Int] -> t IO Int)
@@ -264,7 +280,7 @@ foldFromList
-> ([Int] -> [Int] -> Bool)
-> [Int]
-> Property
-foldFromList constr op eq a = transformFromList constr eq id op a
+foldFromList constr op eq = transformFromList constr eq id op
eliminateOp
:: (Show a, Eq a)
@@ -300,8 +316,8 @@ functorOps
-> (t IO Int -> SerialT IO Int)
-> Spec
functorOps constr desc eq t = do
- prop (desc ++ " id") $ transformFromList constr eq id $ t
- prop (desc ++ " fmap (+1)") $ transformFromList constr eq (fmap (+1)) $ t . (fmap (+1))
+ prop (desc <> " id") $ transformFromList constr eq id t
+ prop (desc <> " fmap (+1)") $ transformFromList constr eq (fmap (+1)) $ t . fmap (+1)
transformOps
:: IsStream t
@@ -313,48 +329,48 @@ transformOps
transformOps constr desc eq t = do
let transform = transformFromList constr eq
-- Filtering
- prop (desc ++ " filter False") $
- transform (filter (const False)) $ t . (S.filter (const False))
- prop (desc ++ " filter True") $
- transform (filter (const True)) $ t . (S.filter (const True))
- prop (desc ++ " filter even") $
- transform (filter even) $ t . (S.filter even)
-
- prop (desc ++ " take maxBound") $
- transform (take maxBound) $ t . (S.take maxBound)
- prop (desc ++ " take 0") $ transform (take 0) $ t . (S.take 0)
- prop (desc ++ " take 1") $ transform (take 1) $ t . (S.take 1)
- prop (desc ++ " take 10") $ transform (take 10) $ t . (S.take 10)
-
- prop (desc ++ " takeWhile True") $
- transform (takeWhile (const True)) $ t . (S.takeWhile (const True))
- prop (desc ++ " takeWhile False") $
- transform (takeWhile (const False)) $ t . (S.takeWhile (const False))
- prop (desc ++ " takeWhile > 0") $
- transform (takeWhile (> 0)) $ t . (S.takeWhile (> 0))
+ prop (desc <> " filter False") $
+ transform (filter (const False)) $ t . S.filter (const False)
+ prop (desc <> " filter True") $
+ transform (filter (const True)) $ t . S.filter (const True)
+ prop (desc <> " filter even") $
+ transform (filter even) $ t . S.filter even
+
+ prop (desc <> " take maxBound") $
+ transform (take maxBound) $ t . S.take maxBound
+ prop (desc <> " take 0") $ transform (take 0) $ t . S.take 0
+ prop (desc <> " take 1") $ transform (take 1) $ t . S.take 1
+ prop (desc <> " take 10") $ transform (take 10) $ t . S.take 10
+
+ prop (desc <> " takeWhile True") $
+ transform (takeWhile (const True)) $ t . S.takeWhile (const True)
+ prop (desc <> " takeWhile False") $
+ transform (takeWhile (const False)) $ t . S.takeWhile (const False)
+ prop (desc <> " takeWhile > 0") $
+ transform (takeWhile (> 0)) $ t . S.takeWhile (> 0)
let f x = if odd x then Just (x + 100) else Nothing
- prop (desc ++ " mapMaybe") $ transform (mapMaybe f) $ t . (S.mapMaybe f)
+ prop (desc <> " mapMaybe") $ transform (mapMaybe f) $ t . S.mapMaybe f
- prop (desc ++ " drop maxBound") $
- transform (drop maxBound) $ t . (S.drop maxBound)
- prop (desc ++ " drop 0") $ transform (drop 0) $ t . (S.drop 0)
- prop (desc ++ " drop 1") $ transform (drop 1) $ t . (S.drop 1)
- prop (desc ++ " drop 10") $ transform (drop 10) $ t . (S.drop 10)
+ prop (desc <> " drop maxBound") $
+ transform (drop maxBound) $ t . S.drop maxBound
+ prop (desc <> " drop 0") $ transform (drop 0) $ t . S.drop 0
+ prop (desc <> " drop 1") $ transform (drop 1) $ t . S.drop 1
+ prop (desc <> " drop 10") $ transform (drop 10) $ t . S.drop 10
- prop (desc ++ " dropWhile True") $
- transform (dropWhile (const True)) $ t . (S.dropWhile (const True))
- prop (desc ++ " dropWhile False") $
- transform (dropWhile (const False)) $ t . (S.dropWhile (const False))
- prop (desc ++ " dropWhile > 0") $
- transform (dropWhile (> 0)) $ t . (S.dropWhile (> 0))
- prop (desc ++ " scan") $ transform (scanl' (+) 0) $ t . (S.scanl' (+) 0)
- prop (desc ++ " reverse") $ transform reverse $ t . S.reverse
+ prop (desc <> " dropWhile True") $
+ transform (dropWhile (const True)) $ t . S.dropWhile (const True)
+ prop (desc <> " dropWhile False") $
+ transform (dropWhile (const False)) $ t . S.dropWhile (const False)
+ prop (desc <> " dropWhile > 0") $
+ transform (dropWhile (> 0)) $ t . S.dropWhile (> 0)
+ prop (desc <> " scan") $ transform (scanl' (+) 0) $ t . S.scanl' (+) 0
+ prop (desc <> " reverse") $ transform reverse $ t . S.reverse
- prop (desc ++ " findIndices") $ transform (findIndices odd) $ t . (S.findIndices odd)
- prop (desc ++ " elemIndices") $ transform (elemIndices 3) $ t . (S.elemIndices 3)
+ prop (desc <> " findIndices") $ transform (findIndices odd) $ t . S.findIndices odd
+ prop (desc <> " elemIndices") $ transform (elemIndices 3) $ t . S.elemIndices 3
- prop (desc ++ " intersperseM") $ transform (intersperse 3) $ t . (S.intersperseM (return 3))
+ prop (desc <> " intersperseM") $ transform (intersperse 3) $ t . S.intersperseM (return 3)
concurrentOps
@@ -367,15 +383,15 @@ concurrentOps
concurrentOps constr desc eq t = do
let prop1 d p = prop d $ withMaxSuccess maxTestCount p
- prop1 (desc ++ " fromFoldableM") $ concurrentFromFoldable eq t
- prop1 (desc ++ " unfoldrM") $ concurrentUnfoldrM eq t
+ prop1 (desc <> " fromFoldableM") $ concurrentFromFoldable eq t
+ prop1 (desc <> " unfoldrM") $ concurrentUnfoldrM eq t
-- We pass it the length of the stream n and an MVar mv.
-- The stream is [0..n]. The threads communicate in such a way that the
-- actions coming first in the stream are dependent on the last action. So
-- if the stream is not processed concurrently it will block forever.
-- Note that if the size of the stream is bigger than the thread limit
-- then it will block even if it is concurrent.
- prop1 (desc ++ " mapM") $
+ prop1 (desc <> " mapM") $
concurrentMapM constr eq $ \n mv stream ->
t $ S.mapM (mvarSequenceOp mv n) stream
@@ -391,58 +407,58 @@ transformCombineOpsCommon
transformCombineOpsCommon constr desc eq t = do
let transform = transformCombineFromList constr eq
-- Filtering
- prop (desc ++ " filter False") $
+ prop (desc <> " filter False") $
transform (filter (const False)) t (S.filter (const False))
- prop (desc ++ " filter True") $
+ prop (desc <> " filter True") $
transform (filter (const True)) t (S.filter (const True))
- prop (desc ++ " filter even") $
+ prop (desc <> " filter even") $
transform (filter even) t (S.filter even)
- prop (desc ++ " filterM False") $
+ prop (desc <> " filterM False") $
transform (filter (const False)) t (S.filterM (const $ return False))
- prop (desc ++ " filterM True") $
+ prop (desc <> " filterM True") $
transform (filter (const True)) t (S.filterM (const $ return True))
- prop (desc ++ " filterM even") $
+ prop (desc <> " filterM even") $
transform (filter even) t (S.filterM (return . even))
- prop (desc ++ " take maxBound") $
+ prop (desc <> " take maxBound") $
transform (take maxBound) t (S.take maxBound)
- prop (desc ++ " take 0") $ transform (take 0) t (S.take 0)
+ prop (desc <> " take 0") $ transform (take 0) t (S.take 0)
- prop (desc ++ " takeWhile True") $
+ prop (desc <> " takeWhile True") $
transform (takeWhile (const True)) t (S.takeWhile (const True))
- prop (desc ++ " takeWhile False") $
+ prop (desc <> " takeWhile False") $
transform (takeWhile (const False)) t (S.takeWhile (const False))
- prop (desc ++ " takeWhileM True") $
+ prop (desc <> " takeWhileM True") $
transform (takeWhile (const True)) t (S.takeWhileM (const $ return True))
- prop (desc ++ " takeWhileM False") $
+ prop (desc <> " takeWhileM False") $
transform (takeWhile (const False)) t (S.takeWhileM (const $ return False))
- prop (desc ++ " drop maxBound") $
+ prop (desc <> " drop maxBound") $
transform (drop maxBound) t (S.drop maxBound)
- prop (desc ++ " drop 0") $ transform (drop 0) t (S.drop 0)
+ prop (desc <> " drop 0") $ transform (drop 0) t (S.drop 0)
- prop (desc ++ " dropWhile True") $
+ prop (desc <> " dropWhile True") $
transform (dropWhile (const True)) t (S.dropWhile (const True))
- prop (desc ++ " dropWhile False") $
+ prop (desc <> " dropWhile False") $
transform (dropWhile (const False)) t (S.dropWhile (const False))
- prop (desc ++ " dropWhileM True") $
+ prop (desc <> " dropWhileM True") $
transform (dropWhile (const True)) t (S.dropWhileM (const $ return True))
- prop (desc ++ " dropWhileM False") $
+ prop (desc <> " dropWhileM False") $
transform (dropWhile (const False)) t (S.dropWhileM (const $ return False))
- prop (desc ++ " mapM (+1)") $
- transform (map (+1)) t (S.mapM (\x -> return (x + 1)))
+ prop (desc <> " mapM (+1)") $
+ transform (fmap (+1)) t (S.mapM (\x -> return (x + 1)))
- prop (desc ++ " scan") $ transform (scanl' (flip const) 0) t
+ prop (desc <> " scan") $ transform (scanl' (flip const) 0) t
(S.scanl' (flip const) 0)
- prop (desc ++ " scanlM'") $ transform (scanl' (flip const) 0) t
+ prop (desc <> " scanlM'") $ transform (scanl' (flip const) 0) t
(S.scanlM' (\_ a -> return a) 0)
- prop (desc ++ " reverse") $ transform reverse t S.reverse
+ prop (desc <> " reverse") $ transform reverse t S.reverse
- prop (desc ++ " intersperseM") $
+ prop (desc <> " intersperseM") $
transform (intersperse 3) t (S.intersperseM $ return 3)
transformCombineOpsOrdered
@@ -455,40 +471,36 @@ transformCombineOpsOrdered
transformCombineOpsOrdered constr desc eq t = do
let transform = transformCombineFromList constr eq
-- Filtering
- prop (desc ++ " take 1") $ transform (take 1) t (S.take 1)
+ prop (desc <> " take 1") $ transform (take 1) t (S.take 1)
#ifdef DEVBUILD
- prop (desc ++ " take 2") $ transform (take 2) t (S.take 2)
- prop (desc ++ " take 3") $ transform (take 3) t (S.take 3)
- prop (desc ++ " take 4") $ transform (take 4) t (S.take 4)
- prop (desc ++ " take 5") $ transform (take 5) t (S.take 5)
+ prop (desc <> " take 2") $ transform (take 2) t (S.take 2)
+ prop (desc <> " take 3") $ transform (take 3) t (S.take 3)
+ prop (desc <> " take 4") $ transform (take 4) t (S.take 4)
+ prop (desc <> " take 5") $ transform (take 5) t (S.take 5)
#endif
- prop (desc ++ " take 10") $ transform (take 10) t (S.take 10)
+ prop (desc <> " take 10") $ transform (take 10) t (S.take 10)
- prop (desc ++ " takeWhile > 0") $
+ prop (desc <> " takeWhile > 0") $
transform (takeWhile (> 0)) t (S.takeWhile (> 0))
- prop (desc ++ " drop 1") $ transform (drop 1) t (S.drop 1)
- prop (desc ++ " drop 10") $ transform (drop 10) t (S.drop 10)
+ prop (desc <> " drop 1") $ transform (drop 1) t (S.drop 1)
+ prop (desc <> " drop 10") $ transform (drop 10) t (S.drop 10)
- prop (desc ++ " dropWhile > 0") $
+ prop (desc <> " dropWhile > 0") $
transform (dropWhile (> 0)) t (S.dropWhile (> 0))
- prop (desc ++ " scan") $ transform (scanl' (+) 0) t (S.scanl' (+) 0)
+ prop (desc <> " scan") $ transform (scanl' (+) 0) t (S.scanl' (+) 0)
-- XXX this does not fail when the SVar is shared, need to fix.
- prop (desc ++ " concurrent application") $
- transform (& (map (+1))) t (|& (S.map (+1)))
+ prop (desc <> " concurrent application") $
+ transform (& fmap (+1)) t (|& S.map (+1))
- prop (desc ++ " findIndices") $
+ prop (desc <> " findIndices") $
transform (findIndices odd) t (S.findIndices odd)
- prop (desc ++ " elemIndices") $
+ prop (desc <> " elemIndices") $
transform (elemIndices 0) t (S.elemIndices 0)
-wrapMaybe :: Eq a1 => ([a1] -> a2) -> [a1] -> Maybe a2
-wrapMaybe f =
- \x ->
- if x == []
- then Nothing
- else Just (f x)
+wrapMaybe :: ([a1] -> a2) -> [a1] -> Maybe a2
+wrapMaybe f x = if null x then Nothing else Just (f x)
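The rewritten `wrapMaybe` is a small total wrapper that guards a partial list function behind a `null` check; a standalone sketch of its behavior (same definition as in the diff, base only):

```haskell
-- Turn a function that is partial on [] into a total one returning Maybe.
wrapMaybe :: ([a] -> b) -> [a] -> Maybe b
wrapMaybe f x = if null x then Nothing else Just (f x)

main :: IO ()
main = do
    print (wrapMaybe maximum [3, 1, 2 :: Int])
    print (wrapMaybe maximum ([] :: [Int]))
```

This lets the elimination tests compare against `S.maximum`, `S.foldl1'`, etc., which likewise return `Nothing` on empty streams.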
eliminationOps
:: ([Int] -> t IO Int)
@@ -497,31 +509,31 @@ eliminationOps
-> Spec
eliminationOps constr desc t = do
-- Elimination
- prop (desc ++ " null") $ eliminateOp constr null $ S.null . t
- prop (desc ++ " foldl'") $
- eliminateOp constr (foldl' (+) 0) $ (S.foldl' (+) 0) . t
- prop (desc ++ " foldl1'") $
- eliminateOp constr (wrapMaybe $ foldl1' (+)) $ (S.foldl1' (+)) . t
- prop (desc ++ " foldr1") $
- eliminateOp constr (wrapMaybe $ foldr1 (+)) $ (S.foldr1 (+)) . t
- prop (desc ++ " all") $ eliminateOp constr (all even) $ (S.all even) . t
- prop (desc ++ " any") $ eliminateOp constr (any even) $ (S.any even) . t
- prop (desc ++ " and") $ eliminateOp constr (and . map (> 0)) $
+ prop (desc <> " null") $ eliminateOp constr null $ S.null . t
+ prop (desc <> " foldl'") $
+ eliminateOp constr (foldl' (+) 0) $ S.foldl' (+) 0 . t
+ prop (desc <> " foldl1'") $
+ eliminateOp constr (wrapMaybe $ foldl1' (+)) $ S.foldl1' (+) . t
+ prop (desc <> " foldr1") $
+ eliminateOp constr (wrapMaybe $ foldr1 (+)) $ S.foldr1 (+) . t
+ prop (desc <> " all") $ eliminateOp constr (all even) $ S.all even . t
+ prop (desc <> " any") $ eliminateOp constr (any even) $ S.any even . t
+ prop (desc <> " and") $ eliminateOp constr (and . fmap (> 0)) $
(S.and . S.map (> 0)) . t
- prop (desc ++ " or") $ eliminateOp constr (or . map (> 0)) $
+ prop (desc <> " or") $ eliminateOp constr (or . fmap (> 0)) $
(S.or . S.map (> 0)) . t
- prop (desc ++ " length") $ eliminateOp constr length $ S.length . t
- prop (desc ++ " sum") $ eliminateOp constr sum $ S.sum . t
- prop (desc ++ " product") $ eliminateOp constr product $ S.product . t
+ prop (desc <> " length") $ eliminateOp constr length $ S.length . t
+ prop (desc <> " sum") $ eliminateOp constr sum $ S.sum . t
+ prop (desc <> " product") $ eliminateOp constr product $ S.product . t
- prop (desc ++ " maximum") $ eliminateOp constr (wrapMaybe maximum) $ S.maximum . t
- prop (desc ++ " minimum") $ eliminateOp constr (wrapMaybe minimum) $ S.minimum . t
+ prop (desc <> " maximum") $ eliminateOp constr (wrapMaybe maximum) $ S.maximum . t
+ prop (desc <> " minimum") $ eliminateOp constr (wrapMaybe minimum) $ S.minimum . t
- prop (desc ++ " findIndex") $ eliminateOp constr (findIndex odd) $ (S.findIndex odd) . t
- prop (desc ++ " elemIndex") $ eliminateOp constr (elemIndex 3) $ (S.elemIndex 3) . t
+ prop (desc <> " findIndex") $ eliminateOp constr (findIndex odd) $ S.findIndex odd . t
+ prop (desc <> " elemIndex") $ eliminateOp constr (elemIndex 3) $ S.elemIndex 3 . t
- prop (desc ++ " find") $ eliminateOp constr (find even) $ (S.find even) . t
- prop (desc ++ " lookup") $
+ prop (desc <> " find") $ eliminateOp constr (find even) $ S.find even . t
+ prop (desc <> " lookup") $
eliminateOp constr (lookup 3 . flip zip [1..]) $
S.lookup 3 . S.zipWith (\a b -> (b, a)) (S.fromList [(1::Int)..]) . t
@@ -533,18 +545,18 @@ serialEliminationOps
-> (t IO Int -> SerialT IO Int)
-> Spec
serialEliminationOps constr desc t = do
- prop (desc ++ " head") $ eliminateOp constr (wrapMaybe head) $ S.head . t
- prop (desc ++ " tail") $ eliminateOp constr (wrapMaybe tail) $ \x -> do
+ prop (desc <> " head") $ eliminateOp constr (wrapMaybe head) $ S.head . t
+ prop (desc <> " tail") $ eliminateOp constr (wrapMaybe tail) $ \x -> do
r <- S.tail (t x)
case r of
Nothing -> return Nothing
- Just s -> S.toList s >>= return . Just
- prop (desc ++ " last") $ eliminateOp constr (wrapMaybe last) $ S.last . t
- prop (desc ++ " init") $ eliminateOp constr (wrapMaybe init) $ \x -> do
+ Just s -> Just <$> S.toList s
+ prop (desc <> " last") $ eliminateOp constr (wrapMaybe last) $ S.last . t
+ prop (desc <> " init") $ eliminateOp constr (wrapMaybe init) $ \x -> do
r <- S.init (t x)
case r of
Nothing -> return Nothing
- Just s -> S.toList s >>= return . Just
+ Just s -> Just <$> S.toList s
transformOpsWord8
:: ([Word8] -> t IO Word8)
@@ -552,8 +564,8 @@ transformOpsWord8
-> (t IO Word8 -> SerialT IO Word8)
-> Spec
transformOpsWord8 constr desc t = do
- prop (desc ++ " elem") $ elemOp constr t S.elem elem
- prop (desc ++ " elem") $ elemOp constr t S.notElem notElem
+ prop (desc <> " elem") $ elemOp constr t S.elem elem
+ prop (desc <> " elem") $ elemOp constr t S.notElem notElem
-- XXX concatenate streams of multiple elements rather than single elements
semigroupOps
@@ -568,8 +580,8 @@ semigroupOps
-> (t IO Int -> SerialT IO Int)
-> Spec
semigroupOps desc eq t = do
- prop (desc ++ " <>") $ foldFromList (foldMapWith (<>) singleton) t eq
- prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend singleton) t eq
+ prop (desc <> " <>") $ foldFromList (foldMapWith (<>) singleton) t eq
+ prop (desc <> " mappend") $ foldFromList (foldMapWith mappend singleton) t eq
applicativeOps
:: Applicative (t IO)
@@ -580,9 +592,9 @@ applicativeOps
-> Property
applicativeOps constr eq t (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
- stream <- run ((S.toList . t) ((,) <$> (constr a) <*> (constr b)))
+ stream <- run ((S.toList . t) ((,) <$> constr a <*> constr b))
let list = (,) <$> a <*> b
- equals eq stream list
+ listEquals eq stream list
zipApplicative
:: (IsStream t, Applicative (t IO))
@@ -593,13 +605,13 @@ zipApplicative
-> Property
zipApplicative constr eq t (a, b) = withMaxSuccess maxTestCount $
monadicIO $ do
- stream1 <- run ((S.toList . t) ((,) <$> (constr a) <*> (constr b)))
- stream2 <- run ((S.toList . t) (pure (,) <*> (constr a) <*> (constr b)))
+ stream1 <- run ((S.toList . t) ((,) <$> constr a <*> constr b))
+ stream2 <- run ((S.toList . t) (pure (,) <*> constr a <*> constr b))
stream3 <- run ((S.toList . t) (S.zipWith (,) (constr a) (constr b)))
let list = getZipList $ (,) <$> ZipList a <*> ZipList b
- equals eq stream1 list
- equals eq stream2 list
- equals eq stream3 list
+ listEquals eq stream1 list
+ listEquals eq stream2 list
+ listEquals eq stream3 list
zipMonadic
:: IsStream t
@@ -613,9 +625,9 @@ zipMonadic constr eq t (a, b) = withMaxSuccess maxTestCount $
stream1 <-
run
((S.toList . t)
- (S.zipWithM (\x y -> return (x, y)) (constr a) (constr b)))
+ (S.zipWithM (curry return) (constr a) (constr b)))
let list = getZipList $ (,) <$> ZipList a <*> ZipList b
- equals eq stream1 list
+ listEquals eq stream1 list
zipAsyncMonadic
:: IsStream t
@@ -629,14 +641,14 @@ zipAsyncMonadic constr eq t (a, b) = withMaxSuccess maxTestCount $
stream1 <-
run
((S.toList . t)
- (S.zipWithM (\x y -> return (x, y)) (constr a) (constr b)))
+ (S.zipWithM (curry return) (constr a) (constr b)))
stream2 <-
run
((S.toList . t)
- (S.zipAsyncWithM (\x y -> return (x, y)) (constr a) (constr b)))
+ (S.zipAsyncWithM (curry return) (constr a) (constr b)))
let list = getZipList $ (,) <$> ZipList a <*> ZipList b
- equals eq stream1 list
- equals eq stream2 list
+ listEquals eq stream1 list
+ listEquals eq stream2 list
monadThen
:: Monad (t IO)
@@ -646,9 +658,9 @@ monadThen
-> ([Int], [Int])
-> Property
monadThen constr eq t (a, b) = withMaxSuccess maxTestCount $ monadicIO $ do
- stream <- run ((S.toList . t) ((constr a) >> (constr b)))
+ stream <- run ((S.toList . t) (constr a >> constr b))
let list = a >> b
- equals eq stream list
+ listEquals eq stream list
monadBind
:: Monad (t IO)
@@ -662,19 +674,19 @@ monadBind constr eq t (a, b) = withMaxSuccess maxTestCount $
stream <-
run
((S.toList . t)
- ((constr a) >>= \x -> (constr b) >>= return . (+ x)))
- let list = a >>= \x -> b >>= return . (+ x)
- equals eq stream list
+ (constr a >>= \x -> (+ x) <$> constr b))
+ let list = a >>= \x -> (+ x) <$> b
+ listEquals eq stream list
constructWithIterate :: IsStream t => (t IO Int -> SerialT IO Int) -> Spec
constructWithIterate t = do
it "iterate" $
- (S.toList . t . (S.take 100) $ (S.iterate (+ 1) (0 :: Int)))
- `shouldReturn` (take 100 $ iterate (+ 1) 0)
+ (S.toList . t . S.take 100) (S.iterate (+ 1) (0 :: Int))
+ `shouldReturn` take 100 (iterate (+ 1) 0)
it "iterateM" $ do
- let addM = (\ y -> return (y + 1))
- S.toList . t . (S.take 100) $ S.iterateM addM (0 :: Int)
- `shouldReturn` (take 100 $ iterate (+ 1) 0)
+ let addM y = return (y + 1)
+ S.toList . t . S.take 100 $ S.iterateM addM (0 :: Int)
+ `shouldReturn` take 100 (iterate (+ 1) 0)
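The iterate test checks the stream version against `Data.List`'s `iterate`, which builds the infinite list `[x, f x, f (f x), ...]` that `take` then truncates; a quick pure check:

```haskell
-- iterate (+1) 0 is the infinite list 0, 1, 2, ...; take keeps a finite prefix.
main :: IO ()
main = print (take 5 (iterate (+ 1) (0 :: Int)))
```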
main :: IO ()
main = hspec
@@ -705,43 +717,43 @@ main = hspec
let mapOps spec = mapM_ (\(desc, f) -> describe desc $ spec f)
let serialOps :: IsStream t => ((SerialT IO a -> t IO a) -> Spec) -> Spec
- serialOps spec = mapOps spec $ (makeOps serially)
+ serialOps spec = mapOps spec $ makeOps serially
#ifndef COVERAGE_BUILD
- ++ [("rate AvgRate 0.00000001", serially . avgRate 0.00000001)]
- ++ [("maxBuffer -1", serially . maxBuffer (-1))]
+ <> [("rate AvgRate 0.00000001", serially . avgRate 0.00000001)]
+ <> [("maxBuffer -1", serially . maxBuffer (-1))]
#endif
let wSerialOps :: IsStream t => ((WSerialT IO a -> t IO a) -> Spec) -> Spec
wSerialOps spec = mapOps spec $ makeOps wSerially
#ifndef COVERAGE_BUILD
- ++ [("rate AvgRate 0.00000001", wSerially . avgRate 0.00000001)]
- ++ [("maxBuffer (-1)", wSerially . maxBuffer (-1))]
+ <> [("rate AvgRate 0.00000001", wSerially . avgRate 0.00000001)]
+ <> [("maxBuffer (-1)", wSerially . maxBuffer (-1))]
#endif
let asyncOps :: IsStream t => ((AsyncT IO a -> t IO a) -> Spec) -> Spec
asyncOps spec = mapOps spec $ makeOps asyncly
#ifndef COVERAGE_BUILD
- ++ [("maxBuffer (-1)", asyncly . maxBuffer (-1))]
+ <> [("maxBuffer (-1)", asyncly . maxBuffer (-1))]
#endif
let wAsyncOps :: IsStream t => ((WAsyncT IO a -> t IO a) -> Spec) -> Spec
wAsyncOps spec = mapOps spec $ makeOps wAsyncly
#ifndef COVERAGE_BUILD
- ++ [("maxBuffer (-1)", wAsyncly . maxBuffer (-1))]
+ <> [("maxBuffer (-1)", wAsyncly . maxBuffer (-1))]
#endif
let aheadOps :: IsStream t => ((AheadT IO a -> t IO a) -> Spec) -> Spec
aheadOps spec = mapOps spec $ makeOps aheadly
#ifndef COVERAGE_BUILD
- ++ [("maxBuffer (-1)", aheadly . maxBuffer (-1))]
+ <> [("maxBuffer (-1)", aheadly . maxBuffer (-1))]
#endif
let parallelOps :: IsStream t => ((ParallelT IO a -> t IO a) -> Spec) -> Spec
parallelOps spec = mapOps spec $ makeOps parallely
#ifndef COVERAGE_BUILD
- ++ [("rate AvgRate 0.00000001", parallely . avgRate 0.00000001)]
- ++ [("maxBuffer (-1)", parallely . maxBuffer (-1))]
+ <> [("rate AvgRate 0.00000001", parallely . avgRate 0.00000001)]
+ <> [("maxBuffer (-1)", parallely . maxBuffer (-1))]
#endif
let zipSerialOps :: IsStream t => ((ZipSerialM IO a -> t IO a) -> Spec) -> Spec
zipSerialOps spec = mapOps spec $ makeOps zipSerially
#ifndef COVERAGE_BUILD
- ++ [("rate AvgRate 0.00000001", zipSerially . avgRate 0.00000001)]
- ++ [("maxBuffer (-1)", zipSerially . maxBuffer (-1))]
+ <> [("rate AvgRate 0.00000001", zipSerially . avgRate 0.00000001)]
+ <> [("maxBuffer (-1)", zipSerially . maxBuffer (-1))]
#endif
-- Note, the "pure" of applicative Zip streams generates an infinite
-- stream and therefore maxBuffer (-1) must not be used for that case.
@@ -920,9 +932,9 @@ main = hspec
aheadOps $ prop "ahead" . concurrentApplication (==)
parallelOps $ prop "parallel" . concurrentApplication sortEq
- prop "concurrent foldr application" $ withMaxSuccess maxTestCount $
+ prop "concurrent foldr application" $ withMaxSuccess maxTestCount
concurrentFoldrApplication
- prop "concurrent foldl application" $ withMaxSuccess maxTestCount $
+ prop "concurrent foldl application" $ withMaxSuccess maxTestCount
concurrentFoldlApplication
-- These tests are specifically targeted towards detecting illegal sharing
diff --git a/test/loops.hs b/test/loops.hs
index cccf9e8..eb23ea2 100644
--- a/test/loops.hs
+++ b/test/loops.hs
@@ -2,35 +2,36 @@ import Streamly
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
import Streamly.Prelude (nil, yieldM)
+main :: IO ()
main = do
hSetBuffering stdout LineBuffering
- putStrLn $ "\nloopTail:\n"
+ putStrLn "\nloopTail:\n"
runStream $ do
x <- loopTail 0
yieldM $ print (x :: Int)
- putStrLn $ "\nloopHead:\n"
+ putStrLn "\nloopHead:\n"
runStream $ do
x <- loopHead 0
yieldM $ print (x :: Int)
- putStrLn $ "\nloopTailA:\n"
+ putStrLn "\nloopTailA:\n"
runStream $ do
x <- loopTailA 0
yieldM $ print (x :: Int)
- putStrLn $ "\nloopHeadA:\n"
+ putStrLn "\nloopHeadA:\n"
runStream $ do
x <- loopHeadA 0
yieldM $ print (x :: Int)
- putStrLn $ "\nwSerial:\n"
+ putStrLn "\nwSerial:\n"
runStream $ do
x <- (return 0 <> return 1) `wSerial` (return 100 <> return 101)
yieldM $ print (x :: Int)
- putStrLn $ "\nParallel interleave:\n"
+ putStrLn "\nParallel interleave:\n"
runStream $ do
x <- (return 0 <> return 1) `wAsync` (return 100 <> return 101)
yieldM $ print (x :: Int)
diff --git a/test/nested-loops.hs b/test/nested-loops.hs
index 6bde54a..bb4d453 100644
--- a/test/nested-loops.hs
+++ b/test/nested-loops.hs
@@ -4,6 +4,7 @@ import System.Random (randomIO)
import Streamly
import Streamly.Prelude (nil, yieldM)
+main :: IO ()
main = runStream $ do
yieldM $ hSetBuffering stdout LineBuffering
x <- loop "A " 2
@@ -19,6 +20,6 @@ main = runStream $ do
loop :: String -> Int -> SerialT IO String
loop name n = do
rnd <- yieldM (randomIO :: IO Int)
- let result = (name ++ show rnd)
- repeat = if n > 1 then loop name (n - 1) else nil
- in (return result) `wAsync` repeat
+ let result = name <> show rnd
+ repeatIt = if n > 1 then loop name (n - 1) else nil
+ in return result `wAsync` repeatIt
diff --git a/test/parallel-loops.hs b/test/parallel-loops.hs
index 4e8819b..d66e5f2 100644
--- a/test/parallel-loops.hs
+++ b/test/parallel-loops.hs
@@ -4,6 +4,7 @@ import System.Random (randomIO)
import Streamly
import qualified Streamly.Prelude as S
+main :: IO ()
main = do
hSetBuffering stdout LineBuffering
runStream $ do