summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorharendra <>2018-07-14 02:38:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-07-14 02:38:00 (GMT)
commit8d94fbe7755c526f9858d54e628d4ad4cdcdb2a0 (patch)
treeca488e0a810fd9d331b55e06db4826c2858f6ef6
parent47d20b95b7320b6a2abef57e459410ebe858598f (diff)
version 0.4.10.4.1
-rw-r--r--Changelog.md8
-rw-r--r--README.md6
-rwxr-xr-xbench.sh2
-rw-r--r--benchmark/BaseStreams.hs4
-rw-r--r--benchmark/StreamDOps.hs6
-rw-r--r--benchmark/StreamKOps.hs27
-rw-r--r--src/Streamly.hs81
-rw-r--r--src/Streamly/Streams/Prelude.hs88
-rw-r--r--src/Streamly/Streams/StreamD.hs19
-rw-r--r--src/Streamly/Streams/StreamK.hs48
-rw-r--r--src/Streamly/Streams/Zip.hs50
-rw-r--r--stack-7.10.yaml4
-rw-r--r--stack.yaml8
-rw-r--r--streamly.cabal81
-rw-r--r--test/Main.hs157
15 files changed, 363 insertions, 226 deletions
diff --git a/Changelog.md b/Changelog.md
index d4b7fbc..b2bdf17 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,3 +1,9 @@
+## 0.4.1
+
+### Bug Fixes
+
+* foldxM was not fully strict, fixed.
+
## 0.4.0
### Breaking changes
@@ -13,6 +19,8 @@
### Enhancements
* Add concurrency control primitives `maxThreads` and `maxBuffer`.
+* Concurrency of a stream with bounded concurrency when used with `take` is now
+ limited by the number elements demanded by `take`.
* Significant performance improvements utilizing stream fusion optimizations.
* Add `yield` to construct a singleton stream from a pure value
* Add `repeat` to generate an infinite stream by repeating a pure value
diff --git a/README.md b/README.md
index 65afde0..1039faa 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,11 @@
# Streamly
+[![Hackage](https://img.shields.io/hackage/v/streamly.svg?style=flat)](https://hackage.haskell.org/package/streamly)
+[![Gitter chat](https://badges.gitter.im/composewell/gitter.svg)](https://gitter.im/composewell/streamly)
+[![Build Status](https://travis-ci.org/composewell/streamly.svg?branch=master)](https://travis-ci.org/composewell/streamly)
+[![Windows Build status](https://ci.appveyor.com/api/projects/status/ajxg0c79raou9ned?svg=true)](https://ci.appveyor.com/project/harendra-kumar/streamly)
+[![Coverage Status](https://coveralls.io/repos/composewell/streamly/badge.svg?branch=master&service=github)](https://coveralls.io/github/composewell/streamly?branch=master)
+
## Stream`ing` `Concurrent`ly
Streamly, short for streaming concurrently, provides monadic streams, with a
diff --git a/bench.sh b/bench.sh
index 3f4b1d4..6ed2a18 100755
--- a/bench.sh
+++ b/bench.sh
@@ -133,7 +133,7 @@ run_bench () {
--include-first-iter \
--min-samples 3 \
--min-duration 0 \
- --match exact
+ --match exact \
--csvraw=$OUTPUT_FILE \
-v 2 \
--measure-with $BENCH_PROG $GAUGE_ARGS || die "Benchmarking failed"
diff --git a/benchmark/BaseStreams.hs b/benchmark/BaseStreams.hs
index 0544f58..6a17935 100644
--- a/benchmark/BaseStreams.hs
+++ b/benchmark/BaseStreams.hs
@@ -37,7 +37,7 @@ main = do
, benchIO "unfoldrM" D.toNull D.sourceUnfoldrM
, benchIO "fromEnum" D.toNull D.sourceFromEnum
- , benchIO "fromFoldable" D.toNull D.sourceFromFoldable
+ , benchIO "fromList" D.toNull D.sourceFromList
-- , benchIO "fromFoldableM" D.sourceFromFoldableM
]
, bgroup "elimination"
@@ -64,7 +64,7 @@ main = do
-- , benchIO "fromEnum" K.toNull K.sourceFromEnum
, benchIO "fromFoldable" K.toNull K.sourceFromFoldable
- , benchIO "fromFoldableM" K.toNull K.sourceFromFoldableM
+ -- , benchIO "fromFoldableM" K.toNull K.sourceFromFoldableM
-- appends
, benchIO "foldMapWith" K.toNull K.sourceFoldMapWith
diff --git a/benchmark/StreamDOps.hs b/benchmark/StreamDOps.hs
index 421790b..4d16038 100644
--- a/benchmark/StreamDOps.hs
+++ b/benchmark/StreamDOps.hs
@@ -96,9 +96,9 @@ sourceUnfoldrM n = S.unfoldrM step n
sourceFromEnum :: Monad m => Int -> Stream m Int
sourceFromEnum n = S.enumFromStepN n 1 value
-{-# INLINE sourceFromFoldable #-}
-sourceFromFoldable :: Monad m => Int -> Stream m Int
-sourceFromFoldable n = S.fromList [n..n+value]
+{-# INLINE sourceFromList #-}
+sourceFromList :: Monad m => Int -> Stream m Int
+sourceFromList n = S.fromList [n..n+value]
{-# INLINE source #-}
source :: Monad m => Int -> Stream m Int
diff --git a/benchmark/StreamKOps.hs b/benchmark/StreamKOps.hs
index d9899e2..3d0fb3e 100644
--- a/benchmark/StreamKOps.hs
+++ b/benchmark/StreamKOps.hs
@@ -13,10 +13,9 @@ import Prelude
(Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=),
subtract, undefined, Maybe(..), not)
-import qualified Streamly.Streams.StreamK as S hiding (runStream)
--- import qualified Streamly.Streams.Serial as S
-import qualified Streamly as S
-import qualified Streamly.Prelude as P
+import qualified Streamly.Streams.StreamK as S
+import qualified Streamly.Streams.Prelude as S
+import qualified Streamly.SVar as S
value, maxValue :: Int
value = 1000000
@@ -68,7 +67,7 @@ mapM :: S.MonadAsync m => Stream m Int -> m ()
-- Stream generation and elimination
-------------------------------------------------------------------------------
-type Stream m a = S.SerialT m a
+type Stream m a = S.Stream m a
{-# INLINE sourceUnfoldr #-}
sourceUnfoldr :: Int -> Stream m Int
@@ -98,17 +97,19 @@ sourceFromEnum n = S.enumFromStepN n 1 value
sourceFromFoldable :: Int -> Stream m Int
sourceFromFoldable n = S.fromFoldable [n..n+value]
+{-
{-# INLINE sourceFromFoldableM #-}
sourceFromFoldableM :: S.MonadAsync m => Int -> Stream m Int
-sourceFromFoldableM n = P.fromFoldableM (Prelude.fmap return [n..n+value])
+sourceFromFoldableM n = S.fromFoldableM (Prelude.fmap return [n..n+value])
+-}
{-# INLINE sourceFoldMapWith #-}
-sourceFoldMapWith :: Monad m => Int -> Stream m Int
-sourceFoldMapWith n = S.foldMapWith (S.<>) return [n..n+value]
+sourceFoldMapWith :: Int -> Stream m Int
+sourceFoldMapWith n = S.foldMapWith (S.serial) S.yield [n..n+value]
{-# INLINE sourceFoldMapWithM #-}
sourceFoldMapWithM :: Monad m => Int -> Stream m Int
-sourceFoldMapWithM n = S.foldMapWith (S.<>) (S.yieldM . return) [n..n+value]
+sourceFoldMapWithM n = S.foldMapWith (S.serial) (S.yieldM . return) [n..n+value]
{-# INLINE source #-}
source :: S.MonadAsync m => Int -> Stream m Int
@@ -160,15 +161,15 @@ filterAllOut = transform . S.filter (> maxValue)
filterAllIn = transform . S.filter (<= maxValue)
takeOne = transform . S.take 1
takeAll = transform . S.take maxValue
-takeWhileTrue = transform . P.takeWhile (<= maxValue)
-dropAll = transform . P.drop maxValue
-dropWhileTrue = transform . P.dropWhile (<= maxValue)
+takeWhileTrue = transform . S.takeWhile (<= maxValue)
+dropAll = transform . S.drop maxValue
+dropWhileTrue = transform . S.dropWhile (<= maxValue)
-------------------------------------------------------------------------------
-- Zipping and concat
-------------------------------------------------------------------------------
-zip src = transform $ (P.zipWith (,) src src)
+zip src = transform $ (S.zipWith (,) src src)
concat _n = return ()
-------------------------------------------------------------------------------
diff --git a/src/Streamly.hs b/src/Streamly.hs
index 503d279..37795cb 100644
--- a/src/Streamly.hs
+++ b/src/Streamly.hs
@@ -55,6 +55,14 @@
-- import Streamly
-- @
+{-# LANGUAGE CPP #-}
+
+#if __GLASGOW_HASKELL__ >= 800
+{-# OPTIONS_GHC -Wno-orphans #-}
+#endif
+
+#include "Streamly/Streams/inline.h"
+
module Streamly
(
MonadAsync
@@ -168,6 +176,79 @@ import Streamly.Streams.SVar (maxThreads, maxBuffer)
import Streamly.SVar (MonadAsync)
import Data.Semigroup (Semigroup(..))
+import qualified Streamly.Streams.StreamD as D
+import qualified Streamly.Streams.StreamK as K
+
+-- XXX This should perhaps be moved to Prelude.
+
+------------------------------------------------------------------------------
+-- Eliminating a stream
+------------------------------------------------------------------------------
+
+-- | Run a streaming composition, discard the results. By default it interprets
+-- the stream as 'SerialT', to run other types of streams use the type adapting
+-- combinators for example @runStream . 'asyncly'@.
+--
+-- @since 0.2.0
+{-# INLINE_EARLY runStream #-}
+runStream :: Monad m => SerialT m a -> m ()
+runStream m = D.runStream $ D.fromStreamK (toStream m)
+{-# RULES "runStream fallback to CPS" [1]
+ forall a. D.runStream (D.fromStreamK a) = K.runStream a #-}
+
+-- | Same as 'runStream'
+--
+-- @since 0.1.0
+{-# DEPRECATED runStreaming "Please use runStream instead." #-}
+runStreaming :: (Monad m, IsStream t) => t m a -> m ()
+runStreaming = runStream . K.adapt
+
+-- | Same as @runStream@.
+--
+-- @since 0.1.0
+{-# DEPRECATED runStreamT "Please use runStream instead." #-}
+runStreamT :: Monad m => SerialT m a -> m ()
+runStreamT = runStream
+
+-- | Same as @runStream . wSerially@.
+--
+-- @since 0.1.0
+{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-}
+runInterleavedT :: Monad m => WSerialT m a -> m ()
+runInterleavedT = runStream . K.adapt
+
+-- | Same as @runStream . parallely@.
+--
+-- @since 0.1.0
+{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-}
+runParallelT :: Monad m => ParallelT m a -> m ()
+runParallelT = runStream . K.adapt
+
+-- | Same as @runStream . asyncly@.
+--
+-- @since 0.1.0
+{-# DEPRECATED runAsyncT "Please use 'runStream . asyncly' instead." #-}
+runAsyncT :: Monad m => AsyncT m a -> m ()
+runAsyncT = runStream . K.adapt
+
+-- | Same as @runStream . zipping@.
+--
+-- @since 0.1.0
+{-# DEPRECATED runZipStream "Please use 'runStream . zipSerially instead." #-}
+runZipStream :: Monad m => ZipSerialM m a -> m ()
+runZipStream = runStream . K.adapt
+
+-- | Same as @runStream . zippingAsync@.
+--
+-- @since 0.1.0
+{-# DEPRECATED runZipAsync "Please use 'runStream . zipAsyncly instead." #-}
+runZipAsync :: Monad m => ZipAsyncM m a -> m ()
+runZipAsync = runStream . K.adapt
+
+------------------------------------------------------------------------------
+-- Documentation
+------------------------------------------------------------------------------
+
-- $serial
--
-- Serial streams compose serially or non-concurrently. In a composed stream,
diff --git a/src/Streamly/Streams/Prelude.hs b/src/Streamly/Streams/Prelude.hs
index ec3054d..62b5229 100644
--- a/src/Streamly/Streams/Prelude.hs
+++ b/src/Streamly/Streams/Prelude.hs
@@ -9,12 +9,6 @@
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX
-#if __GLASGOW_HASKELL__ >= 800
-{-# OPTIONS_GHC -Wno-orphans #-}
-#endif
-
-#include "inline.h"
-
-- |
-- Module : Streamly.Streams.Prelude
-- Copyright : (c) 2017 Harendra Kumar
@@ -27,97 +21,17 @@
--
module Streamly.Streams.Prelude
(
- -- * Elimination
- runStream
- , runStreaming -- deprecated
- , runStreamT -- deprecated
- , runInterleavedT -- deprecated
- , runParallelT -- deprecated
- , runAsyncT -- deprecated
- , runZipStream -- deprecated
- , runZipAsync -- deprecated
-
-- * Fold Utilities
- , foldWith
+ foldWith
, foldMapWith
, forEachWith
)
where
import Streamly.Streams.StreamK (IsStream(..))
-import Streamly.Streams.Serial (SerialT, WSerialT)
-import Streamly.Streams.Parallel (ParallelT)
-import Streamly.Streams.Async (AsyncT)
-import Streamly.Streams.Zip (ZipSerialM, ZipAsyncM)
-
-import qualified Streamly.Streams.StreamD as D
import qualified Streamly.Streams.StreamK as K
------------------------------------------------------------------------------
--- Eliminating a stream
-------------------------------------------------------------------------------
-
--- | Run a streaming composition, discard the results. By default it interprets
--- the stream as 'SerialT', to run other types of streams use the type adapting
--- combinators for example @runStream . 'asyncly'@.
---
--- @since 0.2.0
-{-# INLINE_EARLY runStream #-}
-runStream :: Monad m => SerialT m a -> m ()
-runStream m = D.runStream $ D.fromStreamK (toStream m)
-{-# RULES "runStream fallback to CPS" [1]
- forall a. D.runStream (D.fromStreamK a) = K.runStream a #-}
-
--- | Same as 'runStream'
---
--- @since 0.1.0
-{-# DEPRECATED runStreaming "Please use runStream instead." #-}
-runStreaming :: (Monad m, IsStream t) => t m a -> m ()
-runStreaming = runStream . K.adapt
-
--- | Same as @runStream@.
---
--- @since 0.1.0
-{-# DEPRECATED runStreamT "Please use runStream instead." #-}
-runStreamT :: Monad m => SerialT m a -> m ()
-runStreamT = runStream
-
--- | Same as @runStream . wSerially@.
---
--- @since 0.1.0
-{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-}
-runInterleavedT :: Monad m => WSerialT m a -> m ()
-runInterleavedT = runStream . K.adapt
-
--- | Same as @runStream . parallely@.
---
--- @since 0.1.0
-{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-}
-runParallelT :: Monad m => ParallelT m a -> m ()
-runParallelT = runStream . K.adapt
-
--- | Same as @runStream . asyncly@.
---
--- @since 0.1.0
-{-# DEPRECATED runAsyncT "Please use 'runStream . asyncly' instead." #-}
-runAsyncT :: Monad m => AsyncT m a -> m ()
-runAsyncT = runStream . K.adapt
-
--- | Same as @runStream . zipping@.
---
--- @since 0.1.0
-{-# DEPRECATED runZipStream "Please use 'runStream . zipSerially instead." #-}
-runZipStream :: Monad m => ZipSerialM m a -> m ()
-runZipStream = runStream . K.adapt
-
--- | Same as @runStream . zippingAsync@.
---
--- @since 0.1.0
-{-# DEPRECATED runZipAsync "Please use 'runStream . zipAsyncly instead." #-}
-runZipAsync :: Monad m => ZipAsyncM m a -> m ()
-runZipAsync = runStream . K.adapt
-
-------------------------------------------------------------------------------
-- Fold Utilities
------------------------------------------------------------------------------
diff --git a/src/Streamly/Streams/StreamD.hs b/src/Streamly/Streams/StreamD.hs
index 001dc2e..6b6d66e 100644
--- a/src/Streamly/Streams/StreamD.hs
+++ b/src/Streamly/Streams/StreamD.hs
@@ -538,22 +538,15 @@ takeWhile f = takeWhileM (return . f)
{-# INLINE_NORMAL drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a
-drop n (Stream step state) = Stream step' (state, Just n)
+drop n (Stream step state) = Stream step' (state, n)
where
{-# INLINE_LATE step' #-}
- step' gst (st, Just i)
- | i > 0 = do
- r <- step (rstState gst) st
- case r of
- Yield _ s -> step' (rstState gst) (s, Just (i - 1))
- Stop -> return Stop
- | otherwise = step' gst (st, Nothing)
-
- step' gst (st, Nothing) = do
+ step' gst (st, i) = do
r <- step (rstState gst) st
- return $ case r of
- Yield x s -> Yield x (s, Nothing)
- Stop -> Stop
+ case r of
+ Yield _ s | i > 0 -> step' gst (s, i - 1)
+ Yield x s -> return $ Yield x (s, 0)
+ Stop -> return Stop
data DropWhileState s a
= DropWhileDrop s
diff --git a/src/Streamly/Streams/StreamK.hs b/src/Streamly/Streams/StreamK.hs
index f4d1ccb..749c562 100644
--- a/src/Streamly/Streams/StreamK.hs
+++ b/src/Streamly/Streams/StreamK.hs
@@ -116,6 +116,10 @@ module Streamly.Streams.StreamK
-- ** Map and Filter
, mapMaybe
+ -- ** Zipping
+ , zipWith
+ , zipWithM
+
-- * Semigroup Style Composition
, serial
@@ -137,7 +141,7 @@ import Data.Semigroup (Semigroup(..))
import Prelude
hiding (foldl, foldr, last, map, mapM, mapM_, repeat, sequence,
take, filter, all, any, takeWhile, drop, dropWhile, minimum,
- maximum, elem, notElem, null, head, tail)
+ maximum, elem, notElem, null, head, tail, zipWith)
import qualified Prelude
import Streamly.SVar
@@ -521,7 +525,7 @@ foldxM step begin done m = go begin (toStream m)
go !acc m1 =
let stop = acc >>= done
single a = acc >>= \b -> step b a >>= done
- yieldk a r = acc >>= \b -> go (step b a) r
+ yieldk a r = acc >>= \b -> step b a >>= \x -> go (return x) r
in (unStream m1) defState stop single yieldk
-- | Like 'foldl'' but with a monadic step function.
@@ -812,6 +816,46 @@ mapMaybe f m = go (toStream m)
in unStream m1 (rstState st) stp single yieldk
------------------------------------------------------------------------------
+-- Serial Zipping
+------------------------------------------------------------------------------
+
+{-# INLINE zipWithS #-}
+zipWithS :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
+zipWithS f m1 m2 = go m1 m2
+ where
+ go mx my = Stream $ \st stp sng yld -> do
+ let merge a ra =
+ let single2 b = sng (f a b)
+ yield2 b rb = yld (f a b) (go ra rb)
+ in unStream my (rstState st) stp single2 yield2
+ let single1 a = merge a nil
+ yield1 a ra = merge a ra
+ unStream mx (rstState st) stp single1 yield1
+
+-- | Zip two streams serially using a pure zipping function.
+--
+-- @since 0.1.0
+{-# INLINABLE zipWith #-}
+zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
+zipWith f m1 m2 = fromStream $ zipWithS f (toStream m1) (toStream m2)
+
+-- | Zip two streams serially using a monadic zipping function.
+--
+-- @since 0.1.0
+zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
+zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
+ where
+ go mx my = Stream $ \st stp sng yld -> do
+ let merge a ra =
+ let runIt x = unStream x (rstState st) stp sng yld
+ single2 b = f a b >>= sng
+ yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb)
+ in unStream my (rstState st) stp single2 yield2
+ let single1 a = merge a nil
+ yield1 a ra = merge a ra
+ unStream mx (rstState st) stp single1 yield1
+
+------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
diff --git a/src/Streamly/Streams/Zip.hs b/src/Streamly/Streams/Zip.hs
index f89cf6b..387907a 100644
--- a/src/Streamly/Streams/Zip.hs
+++ b/src/Streamly/Streams/Zip.hs
@@ -20,8 +20,8 @@
--
module Streamly.Streams.Zip
(
- zipWith
- , zipWithM
+ K.zipWith
+ , K.zipWithM
, zipAsyncWith
, zipAsyncWithM
@@ -51,46 +51,6 @@ import qualified Streamly.Streams.StreamK as K
#include "Instances.hs"
------------------------------------------------------------------------------
--- Serial Zipping
-------------------------------------------------------------------------------
-
-{-# INLINE zipWithS #-}
-zipWithS :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
-zipWithS f m1 m2 = go m1 m2
- where
- go mx my = Stream $ \st stp sng yld -> do
- let merge a ra =
- let single2 b = sng (f a b)
- yield2 b rb = yld (f a b) (go ra rb)
- in unStream my (rstState st) stp single2 yield2
- let single1 a = merge a K.nil
- yield1 a ra = merge a ra
- unStream mx (rstState st) stp single1 yield1
-
--- | Zip two streams serially using a pure zipping function.
---
--- @since 0.1.0
-{-# INLINABLE zipWith #-}
-zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
-zipWith f m1 m2 = fromStream $ zipWithS f (toStream m1) (toStream m2)
-
--- | Zip two streams serially using a monadic zipping function.
---
--- @since 0.1.0
-zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
-zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
- where
- go mx my = Stream $ \st stp sng yld -> do
- let merge a ra =
- let runIt x = unStream x (rstState st) stp sng yld
- single2 b = f a b >>= sng
- yield2 b rb = f a b >>= \x -> runIt (x `K.cons` go ra rb)
- in unStream my (rstState st) stp single2 yield2
- let single1 a = merge a K.nil
- yield1 a ra = merge a ra
- unStream mx (rstState st) stp single1 yield1
-
-------------------------------------------------------------------------------
-- Serially Zipping Streams
------------------------------------------------------------------------------
@@ -157,7 +117,7 @@ instance Monad m => Functor (ZipSerialM m) where
instance Monad m => Applicative (ZipSerialM m) where
pure = ZipSerialM . K.repeat
- m1 <*> m2 = fromStream $ zipWith id (toStream m1) (toStream m2)
+ m1 <*> m2 = fromStream $ K.zipWith id (toStream m1) (toStream m2)
------------------------------------------------------------------------------
-- Parallel Zipping
@@ -172,7 +132,7 @@ zipAsyncWith :: (IsStream t, MonadAsync m)
zipAsyncWith f m1 m2 = fromStream $ Stream $ \st stp sng yld -> do
ma <- mkAsync' (rstState st) m1
mb <- mkAsync' (rstState st) m2
- unStream (toStream (zipWith f ma mb)) (rstState st) stp sng yld
+ unStream (toStream (K.zipWith f ma mb)) (rstState st) stp sng yld
-- | Zip two streams asyncly (i.e. both the elements being zipped are generated
-- concurrently) using a monadic zipping function.
@@ -183,7 +143,7 @@ zipAsyncWithM :: (IsStream t, MonadAsync m)
zipAsyncWithM f m1 m2 = fromStream $ Stream $ \st stp sng yld -> do
ma <- mkAsync' (rstState st) m1
mb <- mkAsync' (rstState st) m2
- unStream (toStream (zipWithM f ma mb)) (rstState st) stp sng yld
+ unStream (toStream (K.zipWithM f ma mb)) (rstState st) stp sng yld
------------------------------------------------------------------------------
-- Parallely Zipping Streams
diff --git a/stack-7.10.yaml b/stack-7.10.yaml
index 2709235..f4c2a56 100644
--- a/stack-7.10.yaml
+++ b/stack-7.10.yaml
@@ -15,5 +15,5 @@ extra-deps:
flags: {}
extra-package-dbs: []
# For mac ports installed SDL library on Mac OS X
-#extra-include-dirs:
-#- /opt/local/include
+extra-include-dirs:
+- /opt/local/include
diff --git a/stack.yaml b/stack.yaml
index eb67b84..6ad3f4a 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -1,17 +1,13 @@
-#resolver: lts-11.0
-resolver: nightly-2018-07-06
+resolver: lts-12.0
packages:
- '.'
-#- location: ../bench-graph
-# extra-dep: true
allow-newer: true
extra-deps:
- SDL-0.6.6.0
- gauge-0.2.3
- - bench-graph-0.1.0
+ - bench-graph-0.1.1
- Chart-1.9
- Chart-diagrams-1.9
- - Unique-0.4.7.2
- SVGFonts-1.6.0.3
flags: {}
diff --git a/streamly.cabal b/streamly.cabal
index 4be674a..7dd3bba 100644
--- a/streamly.cabal
+++ b/streamly.cabal
@@ -1,5 +1,5 @@
name: streamly
-version: 0.4.0
+version: 0.4.1
synopsis: Beautiful Streaming, Concurrent and Reactive Composition
description:
Streamly, short for streaming concurrently, provides monadic streams, with a
@@ -274,12 +274,11 @@ test-suite parallel-loops
-- Benchmarks
-------------------------------------------------------------------------------
-benchmark base
+benchmark linear
type: exitcode-stdio-1.0
hs-source-dirs: benchmark
- main-is: BaseStreams.hs
- other-modules: StreamDOps
- , StreamKOps
+ main-is: Linear.hs
+ other-modules: LinearOps
default-language: Haskell2010
ghc-options: -O2 -Wall
if flag(dev)
@@ -295,22 +294,18 @@ benchmark base
-Wredundant-constraints
-Wnoncanonical-monad-instances
-Wnoncanonical-monadfail-instances
- if flag(dev)
- buildable: True
- build-depends:
- streamly
- , base >= 4.8 && < 5
- , deepseq >= 1.4.0 && < 1.5
- , random >= 1.0 && < 2.0
- , gauge >= 0.2.3 && < 0.3
- else
- buildable: False
+ build-depends:
+ streamly
+ , base >= 4.8 && < 5
+ , deepseq >= 1.4.0 && < 1.5
+ , random >= 1.0 && < 2.0
+ , gauge >= 0.2.3 && < 0.3
-benchmark linear
+benchmark nested
type: exitcode-stdio-1.0
hs-source-dirs: benchmark
- main-is: Linear.hs
- other-modules: LinearOps
+ main-is: Nested.hs
+ other-modules: NestedOps
default-language: Haskell2010
ghc-options: -O2 -Wall
if flag(dev)
@@ -333,11 +328,25 @@ benchmark linear
, random >= 1.0 && < 2.0
, gauge >= 0.2.3 && < 0.3
-benchmark nested
+-------------------------------------------------------------------------------
+-- Internal benchmarks for unexposed modules
+-------------------------------------------------------------------------------
+
+-- We have to copy the streamly library modules here because there is no
+-- way to use unexposed modules from the library.
+
+benchmark base
type: exitcode-stdio-1.0
- hs-source-dirs: benchmark
- main-is: Nested.hs
- other-modules: NestedOps
+ hs-source-dirs: benchmark, src
+ main-is: BaseStreams.hs
+ other-modules: Streamly.SVar
+ , Streamly.Streams.StreamK
+ , Streamly.Streams.StreamD
+ , Streamly.Streams.Prelude
+
+ , StreamDOps
+ , StreamKOps
+
default-language: Haskell2010
ghc-options: -O2 -Wall
if flag(dev)
@@ -353,12 +362,30 @@ benchmark nested
-Wredundant-constraints
-Wnoncanonical-monad-instances
-Wnoncanonical-monadfail-instances
+
build-depends:
- streamly
- , base >= 4.8 && < 5
- , deepseq >= 1.4.0 && < 1.5
- , random >= 1.0 && < 2.0
- , gauge >= 0.2.3 && < 0.3
+ base >= 4.8 && < 5
+ , deepseq >= 1.4.0 && < 1.5
+ , random >= 1.0 && < 2.0
+ , gauge >= 0.2.3 && < 0.3
+
+ , ghc-prim >= 0.2 && < 0.6
+ , containers >= 0.5 && < 0.6
+ , heaps >= 0.3 && < 0.4
+
+ -- concurrency
+ , atomic-primops >= 0.8 && < 0.9
+ , lockfree-queue >= 0.2.3 && < 0.3
+
+ , exceptions >= 0.8 && < 0.11
+ , monad-control >= 1.0 && < 2
+ , mtl >= 2.2 && < 3
+ , transformers >= 0.4 && < 0.6
+ , transformers-base >= 0.4 && < 0.5
+
+ if impl(ghc < 8.0)
+ build-depends:
+ semigroups >= 0.18 && < 0.19
executable chart-linear
default-language: Haskell2010
diff --git a/test/Main.hs b/test/Main.hs
index 45e3960..97410bf 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -1,16 +1,19 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
module Main (main) where
import Control.Concurrent (threadDelay)
-import Control.Exception (Exception, try)
+import Control.Exception (Exception, try, ErrorCall(..), catch, throw)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.Error.Class (throwError, MonadError)
import Control.Monad.Trans.Except (runExceptT, ExceptT)
import Data.Foldable (forM_, fold)
import Data.List (sort)
+
+import Data.IORef
import Test.Hspec
import Streamly
@@ -89,30 +92,6 @@ main = hspec $ do
`shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
---------------------------------------------------------------------------
- -- Semigroup/Monoidal Composition strict ordering checks
- ---------------------------------------------------------------------------
-
- -- test both (<>) and mappend to make sure we are using correct instance
- -- for Monoid that is using the right version of semigroup. Instance
- -- deriving can cause us to pick wrong instances sometimes.
-
- describe "WSerial interleaved (<>) ordering check" $ interleaveCheck wSerially (<>)
- describe "WSerial interleaved mappend ordering check" $ interleaveCheck wSerially mappend
-
- -- describe "WAsync interleaved (<>) ordering check" $ interleaveCheck wAsyncly (<>)
- -- describe "WAsync interleaved mappend ordering check" $ interleaveCheck wAsyncly mappend
-
- describe "Async (<>) time order check" $ parallelCheck asyncly (<>)
- describe "Async mappend time order check" $ parallelCheck asyncly mappend
-
- -- XXX this keeps failing intermittently, need to investigate
- -- describe "WAsync (<>) time order check" $ parallelCheck wAsyncly (<>)
- -- describe "WAsync mappend time order check" $ parallelCheck wAsyncly mappend
-
- describe "Parallel (<>) time order check" $ parallelCheck parallely (<>)
- describe "Parallel mappend time order check" $ parallelCheck parallely mappend
-
- ---------------------------------------------------------------------------
-- Monoidal Compositions, multiset equality checks
---------------------------------------------------------------------------
@@ -384,6 +363,50 @@ main = hspec $ do
describe "take on infinite concurrent stream" $ takeInfinite wAsyncly
describe "take on infinite concurrent stream" $ takeInfinite aheadly
+ ---------------------------------------------------------------------------
+ -- Folds are strict enough
+ ---------------------------------------------------------------------------
+
+ it "foldx is strict enough" checkFoldxStrictness
+ it "foldl' is strict enough" checkFoldl'Strictness
+ it "scanx is strict enough" checkScanxStrictness
+ it "scanl' is strict enough" checkScanl'Strictness
+ it "foldxM is strict enough" (checkFoldMStrictness foldxMStrictCheck)
+ it "foldlM' is strict enough" (checkFoldMStrictness foldlM'StrictCheck)
+ it "scanlM' is strict enough" (checkScanlMStrictness scanlM'StrictCheck)
+
+ ---------------------------------------------------------------------------
+ -- Slower tests are at the end
+ ---------------------------------------------------------------------------
+
+ ---------------------------------------------------------------------------
+ -- Semigroup/Monoidal Composition strict ordering checks
+ ---------------------------------------------------------------------------
+
+ -- test both (<>) and mappend to make sure we are using correct instance
+ -- for Monoid that is using the right version of semigroup. Instance
+ -- deriving can cause us to pick wrong instances sometimes.
+
+ describe "WSerial interleaved (<>) ordering check" $ interleaveCheck wSerially (<>)
+ describe "WSerial interleaved mappend ordering check" $ interleaveCheck wSerially mappend
+
+ -- describe "WAsync interleaved (<>) ordering check" $ interleaveCheck wAsyncly (<>)
+ -- describe "WAsync interleaved mappend ordering check" $ interleaveCheck wAsyncly mappend
+
+ describe "Async (<>) time order check" $ parallelCheck asyncly (<>)
+ describe "Async mappend time order check" $ parallelCheck asyncly mappend
+
+ -- XXX this keeps failing intermittently, need to investigate
+ -- describe "WAsync (<>) time order check" $ parallelCheck wAsyncly (<>)
+ -- describe "WAsync mappend time order check" $ parallelCheck wAsyncly mappend
+
+ describe "Parallel (<>) time order check" $ parallelCheck parallely (<>)
+ describe "Parallel mappend time order check" $ parallelCheck parallely mappend
+
+ ---------------------------------------------------------------------------
+ -- Thread limits
+ ---------------------------------------------------------------------------
+
it "asyncly crosses thread limit (2000 threads)" $
runStream (asyncly $ fold $
replicate 2000 $ S.yieldM $ threadDelay 1000000)
@@ -394,6 +417,90 @@ main = hspec $ do
replicate 4000 $ S.yieldM $ threadDelay 1000000)
`shouldReturn` ()
+
+checkFoldxStrictness :: IO ()
+checkFoldxStrictness = do
+ let s = return (1 :: Int) `S.consM` error "failure"
+ catch (S.foldx (\_ a -> if a == 1 then error "success" else "done")
+ "begin" id s)
+ (\e -> case e of
+ ErrorCall err -> return err
+ _ -> throw e)
+ `shouldReturn` "success"
+
+checkFoldl'Strictness :: IO ()
+checkFoldl'Strictness = do
+ let s = return (1 :: Int) `S.consM` error "failure"
+ catch (S.foldl' (\_ a -> if a == 1 then error "success" else "done")
+ "begin" s)
+ (\e -> case e of
+ ErrorCall err -> return err
+ _ -> throw e)
+ `shouldReturn` "success"
+
+checkScanxStrictness :: IO ()
+checkScanxStrictness = do
+ let s = return (1 :: Int) `S.consM` error "failure"
+ catch
+ (runStream (
+ S.scanx (\_ a ->
+ if a == 1
+ then error "success"
+ else "done")
+ "begin" id s
+ )
+ >> return "finished"
+ )
+ (\e -> case e of
+ ErrorCall err -> return err
+ _ -> throw e)
+ `shouldReturn` "success"
+
+checkScanl'Strictness :: IO ()
+checkScanl'Strictness = do
+ let s = return (1 :: Int) `S.consM` error "failure"
+ catch
+ (runStream
+ (S.scanl'
+ (\_ a ->
+ if a == 1
+ then error "success"
+ else "done")
+ "begin"
+ s)
+ >> return "finished"
+ )
+ (\e -> case e of
+ ErrorCall err -> return err
+ _ -> throw e)
+ `shouldReturn` "success"
+
+foldlM'StrictCheck :: IORef Int -> SerialT IO Int -> IO ()
+foldlM'StrictCheck ref s =
+ S.foldlM' (\_ _ -> writeIORef ref 1) () s
+
+foldxMStrictCheck :: IORef Int -> SerialT IO Int -> IO ()
+foldxMStrictCheck ref s =
+ S.foldxM (\_ _ -> writeIORef ref 1) (return ()) return s
+
+checkFoldMStrictness :: (IORef Int -> SerialT IO Int -> IO ()) -> IO ()
+checkFoldMStrictness f = do
+ ref <- newIORef 0
+ let s = return 1 `S.consM` error "x"
+ catch (f ref s) (\(_ :: ErrorCall) -> return ())
+ readIORef ref `shouldReturn` 1
+
+scanlM'StrictCheck :: IORef Int -> SerialT IO Int -> SerialT IO ()
+scanlM'StrictCheck ref s =
+ S.scanlM' (\_ _ -> writeIORef ref 1) () s
+
+checkScanlMStrictness :: (IORef Int -> SerialT IO Int -> SerialT IO ()) -> IO ()
+checkScanlMStrictness f = do
+ ref <- newIORef 0
+ let s = return 1 `S.consM` error "x"
+ catch (runStream $ f ref s) (\(_ :: ErrorCall) -> return ())
+ readIORef ref `shouldReturn` 1
+
takeInfinite :: IsStream t => (t IO Int -> SerialT IO Int) -> Spec
takeInfinite t = do
it "take 1" $