summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorharendra <>2018-06-05 14:01:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-06-05 14:01:00 (GMT)
commitd5e7e088186f1857539128e34749d0c89d00e13a (patch)
treec331b4924689a16ae1bca11c67dd9e6e204fa23a
parentaeaeab63dd1c6dd49653ea63caf99bd4c2a3666c (diff)
version 0.2.10.2.1
-rw-r--r--Changelog.md7
-rw-r--r--src/Streamly/Core.hs8
-rw-r--r--src/Streamly/Prelude.hs34
-rw-r--r--streamly.cabal2
-rw-r--r--test/Prop.hs94
5 files changed, 123 insertions, 22 deletions
diff --git a/Changelog.md b/Changelog.md
index 720e2e8..932ab83 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,3 +1,10 @@
+## 0.2.1
+
+### Bug Fixes
+* Fixed a bug that caused some transformation ops to return incorrect results
+ when used with concurrent streams. The affected ops are `take`, `filter`,
+ `takeWhile`, `drop`, `dropWhile`, and `reverse`.
+
## 0.2.0
### Breaking changes
diff --git a/src/Streamly/Core.hs b/src/Streamly/Core.hs
index 2609976..7471d36 100644
--- a/src/Streamly/Core.hs
+++ b/src/Streamly/Core.hs
@@ -815,18 +815,18 @@ instance MonadTrans Stream where
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
- Stream $ \svr stp sng yld ->
+ Stream $ \_ stp sng yld ->
let single = local f . sng
yield a r = local f $ yld a (withLocal f r)
- in (runStream m) svr (local f stp) single yield
+ in (runStream m) Nothing (local f stp) single yield
-- XXX handle and test cross thread state transfer
withCatchError
:: MonadError e m
=> Stream m a -> (e -> Stream m a) -> Stream m a
withCatchError m h =
- Stream $ \svr stp sng yld ->
- let run x = runStream x svr stp sng yield
+ Stream $ \_ stp sng yld ->
+ let run x = runStream x Nothing stp sng yield
handle r = r `catchError` \e -> run $ h e
yield a r = yld a (withCatchError r h)
in handle $ run m
diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs
index 31f088c..f791ec1 100644
--- a/src/Streamly/Prelude.hs
+++ b/src/Streamly/Prelude.hs
@@ -426,9 +426,9 @@ toList = foldrM (\a xs -> return (a : xs)) []
take :: IsStream t => Int -> t m a -> t m a
take n m = fromStream $ go n (toStream m)
where
- go n1 m1 = Stream $ \ctx stp sng yld ->
+ go n1 m1 = Stream $ \_ stp sng yld ->
let yield a r = yld a (go (n1 - 1) r)
- in if n1 <= 0 then stp else (S.runStream m1) ctx stp sng yield
+ in if n1 <= 0 then stp else (S.runStream m1) Nothing stp sng yield
-- | Include only those elements that pass a predicate.
--
@@ -437,12 +437,12 @@ take n m = fromStream $ go n (toStream m)
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter p m = fromStream $ go (toStream m)
where
- go m1 = Stream $ \ctx stp sng yld ->
+ go m1 = Stream $ \_ stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yield a r | p a = yld a (go r)
- | otherwise = (S.runStream r) ctx stp single yield
- in (S.runStream m1) ctx stp single yield
+ | otherwise = (S.runStream r) Nothing stp single yield
+ in (S.runStream m1) Nothing stp single yield
-- | End the stream as soon as the predicate fails on an element.
--
@@ -451,12 +451,12 @@ filter p m = fromStream $ go (toStream m)
takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
takeWhile p m = fromStream $ go (toStream m)
where
- go m1 = Stream $ \ctx stp sng yld ->
+ go m1 = Stream $ \_ stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yield a r | p a = yld a (go r)
| otherwise = stp
- in (S.runStream m1) ctx stp single yield
+ in (S.runStream m1) Nothing stp single yield
-- | Discard first 'n' elements from the stream and take the rest.
--
@@ -464,13 +464,13 @@ takeWhile p m = fromStream $ go (toStream m)
drop :: IsStream t => Int -> t m a -> t m a
drop n m = fromStream $ go n (toStream m)
where
- go n1 m1 = Stream $ \ctx stp sng yld ->
+ go n1 m1 = Stream $ \_ stp sng yld ->
let single _ = stp
- yield _ r = (S.runStream $ go (n1 - 1) r) ctx stp sng yld
+ yield _ r = (S.runStream $ go (n1 - 1) r) Nothing stp sng yld
-- Somehow "<=" check performs better than a ">"
in if n1 <= 0
- then (S.runStream m1) ctx stp sng yld
- else (S.runStream m1) ctx stp single yield
+ then (S.runStream m1) Nothing stp sng yld
+ else (S.runStream m1) Nothing stp single yield
-- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream.
@@ -480,12 +480,12 @@ drop n m = fromStream $ go n (toStream m)
dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
dropWhile p m = fromStream $ go (toStream m)
where
- go m1 = Stream $ \ctx stp sng yld ->
+ go m1 = Stream $ \_ stp sng yld ->
let single a | p a = stp
| otherwise = sng a
- yield a r | p a = (S.runStream r) ctx stp single yield
+ yield a r | p a = (S.runStream r) Nothing stp single yield
| otherwise = yld a r
- in (S.runStream m1) ctx stp single yield
+ in (S.runStream m1) Nothing stp single yield
-- | Determine whether all elements of a stream satisfy a predicate.
--
@@ -599,12 +599,12 @@ length = foldl (\n _ -> n + 1) 0 id
reverse :: (IsStream t) => t m a -> t m a
reverse m = fromStream $ go S.nil (toStream m)
where
- go rev rest = Stream $ \svr stp sng yld ->
- let run x = S.runStream x svr stp sng yld
+ go rev rest = Stream $ \_ stp sng yld ->
+ let run x = S.runStream x Nothing stp sng yld
stop = run rev
single a = run $ a `S.cons` rev
yield a r = run $ go (a `S.cons` rev) r
- in S.runStream rest svr stop single yield
+ in S.runStream rest Nothing stop single yield
-- XXX replace the recursive "go" with continuation
-- | Determine the minimum element in a stream.
diff --git a/streamly.cabal b/streamly.cabal
index 56b1cd9..09d4af6 100644
--- a/streamly.cabal
+++ b/streamly.cabal
@@ -1,5 +1,5 @@
name: streamly
-version: 0.2.0
+version: 0.2.1
synopsis: Beautiful Streaming, Concurrent and Reactive Composition
description:
Streamly, short for streaming concurrently, is a simple yet powerful
diff --git a/test/Prop.hs b/test/Prop.hs
index fa9bf28..fbd8811 100644
--- a/test/Prop.hs
+++ b/test/Prop.hs
@@ -59,6 +59,24 @@ transformFromList constr eq listOp op a =
let list = listOp a
equals eq stream list
+transformCombineFromList
+ :: Semigroup (t IO Int)
+ => ([Int] -> t IO Int)
+ -> ([Int] -> [Int] -> Bool)
+ -> ([Int] -> [Int])
+ -> (t IO Int -> SerialT IO Int)
+ -> (t IO Int -> t IO Int)
+ -> [Int]
+ -> [Int]
+ -> [Int]
+ -> Property
+transformCombineFromList constr eq listOp t op a b c =
+ monadicIO $ do
+ stream <- run ((A.toList . t) $
+ constr a <> op (constr b <> constr c))
+ let list = a <> listOp (b <> c)
+ equals eq stream list
+
foldFromList
:: ([Int] -> t IO Int)
-> (t IO Int -> SerialT IO Int)
@@ -149,6 +167,69 @@ transformOps constr desc t eq = do
prop (desc ++ " scan") $ transform (scanl' (+) 0) $ t . (A.scanl' (+) 0)
prop (desc ++ "reverse") $ transform reverse $ t . A.reverse
+-- XXX add tests for MonadReader and MonadError etc. In case an SVar is
+-- accidentally passed through them.
+transformCombineOpsCommon
+ :: (IsStream t, Semigroup (t IO Int))
+ => ([Int] -> t IO Int)
+ -> String
+ -> (t IO Int -> SerialT IO Int)
+ -> ([Int] -> [Int] -> Bool)
+ -> Spec
+transformCombineOpsCommon constr desc t eq = do
+ let transform = transformCombineFromList constr eq
+ -- Filtering
+ prop (desc ++ " filter False") $
+ transform (filter (const False)) t (A.filter (const False))
+ prop (desc ++ " filter True") $
+ transform (filter (const True)) t (A.filter (const True))
+ prop (desc ++ " filter even") $
+ transform (filter even) t (A.filter even)
+
+ prop (desc ++ " take maxBound") $
+ transform (take maxBound) t (A.take maxBound)
+ prop (desc ++ " take 0") $ transform (take 0) t (A.take 0)
+
+ prop (desc ++ " takeWhile True") $
+ transform (takeWhile (const True)) t (A.takeWhile (const True))
+ prop (desc ++ " takeWhile False") $
+ transform (takeWhile (const False)) t (A.takeWhile (const False))
+
+ prop (desc ++ " drop maxBound") $
+ transform (drop maxBound) t (A.drop maxBound)
+ prop (desc ++ " drop 0") $ transform (drop 0) t (A.drop 0)
+
+ prop (desc ++ " dropWhile True") $
+ transform (dropWhile (const True)) t (A.dropWhile (const True))
+ prop (desc ++ " dropWhile False") $
+ transform (dropWhile (const False)) t (A.dropWhile (const False))
+ prop (desc ++ " scan") $ transform (scanl' (flip const) 0) t
+ (A.scanl' (flip const) 0)
+ prop (desc ++ " reverse") $ transform reverse t A.reverse
+
+transformCombineOpsOrdered
+ :: (IsStream t, Semigroup (t IO Int))
+ => ([Int] -> t IO Int)
+ -> String
+ -> (t IO Int -> SerialT IO Int)
+ -> ([Int] -> [Int] -> Bool)
+ -> Spec
+transformCombineOpsOrdered constr desc t eq = do
+ let transform = transformCombineFromList constr eq
+ -- Filtering
+ prop (desc ++ " take 1") $ transform (take 1) t (A.take 1)
+ prop (desc ++ " take 10") $ transform (take 10) t (A.take 10)
+
+ prop (desc ++ " takeWhile > 0") $
+ transform (takeWhile (> 0)) t (A.takeWhile (> 0))
+
+ prop (desc ++ " drop 1") $ transform (drop 1) t (A.drop 1)
+ prop (desc ++ " drop 10") $ transform (drop 10) t (A.drop 10)
+
+ prop (desc ++ " dropWhile > 0") $
+ transform (dropWhile (> 0)) t (A.dropWhile (> 0))
+ prop (desc ++ " scan") $ transform (scanl' (+) 0) t (A.scanl' (+) 0)
+
wrapMaybe :: Eq a1 => ([a1] -> a2) -> [a1] -> Maybe a2
wrapMaybe f =
\x ->
@@ -418,6 +499,19 @@ main = hspec $ do
transformOpsWord8 folded "wAsyncly folded" wAsyncly
transformOpsWord8 folded "parallely folded" parallely
+ describe "Stream transform and combine operations" $ do
+ transformCombineOpsCommon A.fromFoldable "serially" serially (==)
+ transformCombineOpsCommon A.fromFoldable "wSerially" wSerially sortEq
+ transformCombineOpsCommon A.fromFoldable "zipSerially" zipSerially (==)
+ transformCombineOpsCommon A.fromFoldable "zipAsyncly" zipAsyncly (==)
+ transformCombineOpsCommon A.fromFoldable "asyncly" asyncly sortEq
+ transformCombineOpsCommon A.fromFoldable "wAsyncly" wAsyncly sortEq
+ transformCombineOpsCommon A.fromFoldable "parallely" parallely sortEq
+
+ transformCombineOpsOrdered A.fromFoldable "serially" serially (==)
+ transformCombineOpsOrdered A.fromFoldable "zipSerially" zipSerially (==)
+ transformCombineOpsOrdered A.fromFoldable "zipAsyncly" zipAsyncly (==)
+
describe "Stream elimination operations" $ do
eliminationOps A.fromFoldable "serially" serially
eliminationOps A.fromFoldable "wSerially" wSerially