summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorharendra <>2017-12-05 15:16:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2017-12-05 15:16:00 (GMT)
commit9c09b5a4974f1be31231c759ba70e00ae4d8446a (patch)
treedf62f8d9817d9cf5f67d66d6a4abae48e4fde94c
version 0.1.00.1.0
-rw-r--r--Changelog.md3
-rw-r--r--LICENSE27
-rw-r--r--README.md172
-rw-r--r--benchmark/Main.hs301
-rw-r--r--examples/loops.hs88
-rw-r--r--examples/nested-loops.hs22
-rw-r--r--examples/parallel-loops.hs20
-rw-r--r--src/Streamly.hs249
-rw-r--r--src/Streamly/Core.hs651
-rw-r--r--src/Streamly/Examples.hs60
-rw-r--r--src/Streamly/Examples/AcidRainGame.hs46
-rw-r--r--src/Streamly/Examples/CirclingSquare.hs90
-rw-r--r--src/Streamly/Examples/ListDirRecursive.hs19
-rw-r--r--src/Streamly/Examples/MergeSortedStreams.hs41
-rw-r--r--src/Streamly/Examples/SearchEngineQuery.hs19
-rw-r--r--src/Streamly/Prelude.hs430
-rw-r--r--src/Streamly/Streams.hs985
-rw-r--r--src/Streamly/Time.hs65
-rw-r--r--src/Streamly/Tutorial.hs1042
-rw-r--r--stack-7.10.yaml16
-rw-r--r--stack-8.0.yaml17
-rw-r--r--stack.yaml14
-rw-r--r--streamly.cabal234
-rw-r--r--test/Main.hs618
24 files changed, 5229 insertions, 0 deletions
diff --git a/Changelog.md b/Changelog.md
new file mode 100644
index 0000000..1318780
--- /dev/null
+++ b/Changelog.md
@@ -0,0 +1,3 @@
+## 0.1.0
+
+* Initial release
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..08ee369
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2017, Harendra Kumar
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its contributors
+may be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1d8d0ff
--- /dev/null
+++ b/README.md
@@ -0,0 +1,172 @@
+# Streamly
+
+[![Gitter chat](https://badges.gitter.im/composewell/gitter.svg)](https://gitter.im/composewell/streamly)
+[![Build Status](https://travis-ci.org/composewell/streamly.svg?branch=master)](https://travis-ci.org/composewell/streamly)
+[![Windows Build status](https://ci.appveyor.com/api/projects/status/ajxg0c79raou9ned?svg=true)](https://ci.appveyor.com/project/harendra-kumar/streamly)
+[![Coverage Status](https://coveralls.io/repos/composewell/streamly/badge.svg?branch=master&service=github)](https://coveralls.io/github/composewell/streamly?branch=master)
+
+## Stream`ing` `Concurrent`ly
+
+Streamly is a monad transformer unifying non-determinism
+([list-t](https://hackage.haskell.org/package/list-t)/[logict](https://hackage.haskell.org/package/logict)),
+concurrency ([async](https://hackage.haskell.org/package/async)),
+streaming ([conduit](https://hackage.haskell.org/package/conduit)\/[pipes](https://hackage.haskell.org/package/pipes)),
+and FRP ([Yampa](https://hackage.haskell.org/package/Yampa)\/[reflex](https://hackage.haskell.org/package/reflex))
+functionality in a concise and intuitive API.
+High level concurrency makes concurrent applications almost indistinguishable
+from non-concurrent ones. By changing a single combinator you can control
+whether the code runs serially or concurrently. It naturally integrates
+concurrency with streaming rather than adding it as an afterthought.
+Moreover, it interworks with the popular streaming libraries.
+
+See the haddock documentation for full reference. It is recommended to read
+the comprehensive tutorial module `Streamly.Tutorial` first. Also see
+`Streamly.Examples` for some working examples.
+
+## Non-determinism
+
+The monad instance composes like a list monad.
+
+``` haskell
+loops = $ do
+ x <- each [1,2]
+ y <- each [3,4]
+ liftIO $ putStrLn $ show (x, y)
+
+main = runStreaming $ serially $ loops
+```
+```
+(1,3)
+(1,4)
+(2,3)
+(2,4)
+```
+
+## Magical Concurrency
+
+To run the above code with demand-driven concurrency i.e. each iteration in the
+loops can run concurrently depending on the consumer rate:
+
+``` haskell
+main = runStreaming $ asyncly $ loops
+```
+
+To run it with full parallelism irrespective of demand:
+
+``` haskell
+main = runStreaming $ parallely $ loops
+```
+
+To run it serially but interleaving the outer and inner loop iterations:
+
+``` haskell
+main = runStreaming $ interleaving $ loops
+```
+
+You can fold multiple streams or IO actions using parallel combinators like
+`<|`, `<|>`. For example, to concurrently generate the squares and then
+concurrently sum the square roots of all combinations:
+
+``` haskell
+main = do
+ print $ sum $ asyncly $ do
+ -- Squaring is concurrent (<|)
+ x2 <- forEachWith (<|) [1..100] $ \x -> return $ x * x
+ y2 <- forEachWith (<|) [1..100] $ \y -> return $ y * y
+ -- sqrt is concurrent (asyncly)
+ return $ sqrt (x2 + y2)
+```
+
+Of course, the actions running in parallel could be arbitrary IO actions. To
+concurrently list the contents of a directory tree recursively:
+
+``` haskell
+import Path.IO (listDir, getCurrentDir)
+import Streamly
+
+main = runStreaming $ serially $ getCurrentDir >>= readdir
+ where readdir d = do
+ (dirs, files) <- lift $ listDir d
+ liftIO $ mapM_ putStrLn $ map show files
+ -- read the subdirs concurrently
+ foldMapWith (<|>) readdir dirs
+```
+
+In the above examples we do not think in terms of threads, locking or
+synchronization, rather we think in terms of what can run in parallel, the rest
+is taken care of automatically. With `asyncly` and `<|` the programmer does not
+have to worry about how many threads are to be created they are automatically
+adjusted based on the demand of the consumer.
+
+The concurrency facilities provided by streamly can be compared with
+[OpenMP](https://en.wikipedia.org/wiki/OpenMP) and
+[Cilk](https://en.wikipedia.org/wiki/Cilk) but with a more declarative
+expression. Concurrency support does not compromise performance in
+non-concurrent cases, the performance of the library is at par or better than
+most of the existing streaming libraries.
+
+## Streaming
+
+Streaming is effortless, simple and straightforward. Streamly data type behaves
+just like a list and combinators are provided in `Streamly.Prelude` to
+transform or fold streamly streams. Unlike other libraries and like `streaming`
+library the combinators explicitly consume a stream and produce a stream,
+therefore, no special operator is needed to join stream stages, just a forward
+(`$`) or reverse (`&`) function application operator is enough.
+
+```haskell
+import Streamly
+import Streamly.Prelude as S
+import Data.Function ((&))
+
+main = S.each [1..10]
+ & fmap (+ 1)
+ & S.drop 2
+ & S.filter even
+ & fmap (* 3)
+ & S.takeWhile (< 25)
+ & S.mapM (\x -> putStrLn ("saw " ++ show x) >> return x)
+ & S.toList . serially
+ >>= print
+```
+
+Fold style combinators can be used to fold purely or monadically. You can also
+use the beautiful `foldl` library for folding.
+
+```haskell
+main = S.each [1..10]
+ & serially
+ & S.foldl (+) 0 id
+ >>= print
+```
+
+Streams can be combined together in multiple ways:
+
+```haskell
+return 1 <> return 2 -- serial, combine atoms
+S.each [1..10] <> S.each [11..20] -- serial
+S.each [1..10] <| S.each [11..20] -- demand driven parallel
+S.each [1..10] <=> S.each [11..20] -- serial but interleaved
+S.each [1..10] <|> S.each [11..20] -- fully parallel
+```
+
+As we have already seen streams can be combined using monadic composition in a
+non-deterministic manner. This allows arbitrary manipulation and combining of
+streams. See `Streamly.Examples.MergeSortedStreams` for a more complicated
+example.
+
+## Reactive Programming (FRP)
+
+Streamly is a foundation for first class reactive programming as well by virtue
+of integrating concurrency and streaming. See `Streamly.Examples.AcidRainGame`
+and `Streamly.Examples.CirclingSquare` for an SDL based animation example.
+
+## Contributing
+
+The code is available under BSD-3 license [on
+github](https://github.com/composewell/streamly). Join the [gitter
+chat](https://gitter.im/composewell/streamly) channel for discussions. All
+contributions are welcome!
+
+This library was originally inspired by the `transient` package authored by
+Alberto G. Corona.
diff --git a/benchmark/Main.hs b/benchmark/Main.hs
new file mode 100644
index 0000000..0e64889
--- /dev/null
+++ b/benchmark/Main.hs
@@ -0,0 +1,301 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE RankNTypes #-}
+
+module Main where
+
+import Control.Applicative (Alternative(..))
+import Control.Exception (assert)
+import Control.Monad (guard)
+import Criterion.Main (defaultMain, bgroup, bench, nfIO)
+import Data.Function ((&))
+
+import qualified Streamly as A
+import qualified Streamly.Prelude as A
+
+#ifdef EXTRA_BENCHMARKS
+import Control.Monad.IO.Class (MonadIO (liftIO))
+import Data.Atomics (atomicModifyIORefCAS)
+import Data.IORef (IORef, newIORef, writeIORef)
+import System.IO.Unsafe (unsafePerformIO)
+
+import qualified Conduit.Simple as S
+import qualified Control.Monad.Logic as LG
+import qualified Data.Machine as M
+#if MIN_VERSION_transient(0,5,1)
+import qualified Transient.Internals as T
+import qualified Transient.Indeterminism as T
+#endif
+import qualified ListT as LT
+#endif
+
+main :: IO ()
+main = do
+ -- XXX due to a GHC bug passing bind as an argument causes perf
+ -- degradation, so we should keep that in account when comparing.
+ let as = streamly_serial
+ ai = streamly_interleaved
+ aa = streamly_async
+ ap = streamly_parallel
+ defaultMain [
+ bgroup "streamly"
+ [ bench "function style all serial" $ nfIO streamly_function_style
+
+ , bgroup "serial bind"
+ [ bench "serial" $ nfIO (as (A.<>))
+ , bench "fair serial" $ nfIO (as (A.<=>))
+ , bench "left parallel" $ nfIO (as (A.<|))
+ , bench "fair parallel" $ nfIO (as (A.<|>))
+ ]
+
+ , bgroup "interleaved bind"
+ [ bench "serial" $ nfIO (ai (A.<>))
+ , bench "fair serial" $ nfIO (ai (A.<=>))
+ , bench "left parallel" $ nfIO (ai (A.<|))
+ , bench "fair parallel" $ nfIO (ai (A.<|>))
+ ]
+
+ , bgroup "async bind"
+ [ bench "serial" $ nfIO (aa (A.<>))
+ , bench "fair serial" $ nfIO (aa (A.<=>))
+ , bench "left parallel" $ nfIO (aa (A.<|))
+ , bench "fair parallel" $ nfIO (aa (A.<|>))
+ ]
+
+ , bgroup "parallel bind"
+ [ bench "serial" $ nfIO (ap (A.<>))
+ , bench "fair serial" $ nfIO (ap (A.<=>))
+ , bench "left parallel" $ nfIO (ap (A.<|))
+ , bench "fair parallel" $ nfIO (ap (A.<|>))
+ ]
+
+ -- Benchmark smallest possible actions composed together
+ , bgroup "serial bind nil"
+ [ bench "serial" $ nfIO (streamly_nil (A.<>))
+ , bench "fair serial" $ nfIO (streamly_nil (A.<=>))
+ , bench "left parallel" $ nfIO (streamly_nil (A.<|))
+ , bench "fair parallel" $ nfIO (streamly_nil (A.<|>))
+ ]
+ ]
+#ifdef EXTRA_BENCHMARKS
+#if MIN_VERSION_transient(0,5,1)
+ , bgroup "others"
+ [ bench "transient" $ nfIO transient_basic
+ , bench "transient-nil" $ nfIO transient_nil
+#endif
+ , bench "logict" $ nfIO logict_basic
+ , bench "list-t" $ nfIO list_t_basic
+ , bench "simple-conduit" $ nfIO simple_conduit_basic
+ , bench "simple-conduit-bind" $ nfIO simple_conduit_bind
+ , bench "machines" $ nfIO machines_basic
+ ]
+#endif
+ ]
+
+{-# INLINABLE map #-}
+map :: Monad m => (a -> Int) -> a -> m Int
+map f x = return $ f x
+
+{-# INLINABLE filter #-}
+filter :: (Monad m, Alternative m) => (a -> Bool) -> a -> m a
+filter cond x = guard (not $ cond x) >> return x
+
+amap :: Monad (s IO) => (Int -> Int) -> Int -> s IO Int
+amap = Main.map
+
+afilter :: (Alternative (s IO), Monad (s IO)) => (Int -> Bool) -> Int -> s IO Int
+afilter = Main.filter
+
+{-# INLINE streamly_basic #-}
+streamly_basic
+ :: (Alternative (t IO), Monad (t IO), A.Streaming t)
+ => (forall a. t IO a -> IO [a])
+ -> (t IO Int -> t IO Int -> t IO Int)
+ -> IO Int
+streamly_basic tl g = do
+ xs <- tl $ do
+ A.drop 100 (A.forEachWith g [1..100000 :: Int] $ \x ->
+ afilter even x >>= amap (+1))
+ >>= amap (+1)
+ >>= afilter (\y -> y `mod` 2 == 0)
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length xs)
+
+{-# INLINE streamly_function_style #-}
+streamly_function_style :: IO Int
+streamly_function_style = do
+ xs <- A.toList $ A.serially $
+ A.each [1..100000 :: Int]
+ & A.filter even
+ & fmap (+1)
+ & A.drop 100
+ & fmap (+1)
+ & A.filter (\y -> y `mod` 2 == 0)
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length xs)
+
+{-# INLINE streamly_serial #-}
+streamly_serial
+ :: (A.StreamT IO Int -> A.StreamT IO Int -> A.StreamT IO Int)
+ -> IO Int
+streamly_serial = streamly_basic (A.toList . A.serially)
+
+{-# INLINE streamly_interleaved #-}
+streamly_interleaved
+ :: (A.InterleavedT IO Int -> A.InterleavedT IO Int -> A.InterleavedT IO Int)
+ -> IO Int
+streamly_interleaved = streamly_basic (A.toList . A.interleaving)
+
+{-# INLINE streamly_async #-}
+streamly_async
+ :: (A.AsyncT IO Int -> A.AsyncT IO Int -> A.AsyncT IO Int)
+ -> IO Int
+streamly_async = streamly_basic (A.toList . A.asyncly)
+
+{-# INLINE streamly_parallel #-}
+streamly_parallel
+ :: (A.ParallelT IO Int -> A.ParallelT IO Int -> A.ParallelT IO Int)
+ -> IO Int
+streamly_parallel = streamly_basic (A.toList . A.parallely)
+
+{-# INLINE streamly_nil #-}
+streamly_nil :: (A.StreamT IO Int -> A.StreamT IO Int -> A.StreamT IO Int)
+ -> IO Int
+streamly_nil f = do
+ xs <- (A.toList . A.serially) $ do
+ (A.forEachWith f [1..100000:: Int] $
+ \x -> return x >>= return . id)
+ assert (Prelude.length xs == 100000) $
+ return (Prelude.length xs)
+
+#ifdef EXTRA_BENCHMARKS
+#if MIN_VERSION_transient(0,5,1)
+
+{-# NOINLINE count #-}
+count :: IORef Int
+count = unsafePerformIO $ newIORef 0
+
+drop :: (MonadIO m, Alternative m) => Int -> Int -> m Int
+drop num x = do
+
+ mn <- liftIO $ atomicModifyIORefCAS count $ \n ->
+ if n < num then (n + 1, False) else (n, True)
+ guard mn
+ return x
+
+tmap :: (a -> Int) -> a -> T.TransIO Int
+tmap = Main.map
+
+tfilter :: (a -> Bool) -> a -> T.TransIO a
+tfilter = Main.filter
+
+tdrop :: Int -> Int -> T.TransIO Int
+tdrop = Main.drop
+
+transient_basic :: IO (Maybe Int)
+
+transient_basic = T.keep' $ T.threads 0 $ do
+ liftIO $ writeIORef count 0
+ xs <- T.group 49900 $ do
+ T.choose [1..100000 :: Int]
+ >>= tfilter even
+ >>= tmap (+1)
+ >>= tdrop 100
+ >>= tmap (+1)
+ >>= tfilter (\x -> x `mod` 2 == 0)
+
+ assert (Prelude.length xs == 49900) $
+ T.exit (Prelude.length xs)
+
+transient_nil :: IO (Maybe Int)
+transient_nil = T.keep' $ T.threads 0 $ do
+ xs <- T.group 49900 $ do
+ T.choose [1..100000 :: Int]
+ assert (Prelude.length xs == 49900) $
+ T.exit (Prelude.length xs)
+#endif
+
+lfilter :: (Int -> Bool) -> Int -> LT.ListT IO Int
+lfilter = Main.filter
+
+lmap :: (Int -> Int) -> Int -> LT.ListT IO Int
+lmap = Main.map
+
+ldrop :: Int -> Int -> LT.ListT IO Int
+ldrop = Main.drop
+
+list_t_basic :: IO Int
+list_t_basic = do
+ writeIORef count 0
+ xs <- LT.toList $ do
+ LT.fromFoldable [1..100000 :: Int]
+ >>= lfilter even
+ >>= lmap (+1)
+ >>= ldrop 100
+ >>= lmap (+1)
+ >>= lfilter (\x -> x `mod` 2 == 0)
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length xs)
+
+lgfilter :: (Int -> Bool) -> Int -> LG.LogicT IO Int
+lgfilter = Main.filter
+
+lgmap :: (Int -> Int) -> Int -> LG.LogicT IO Int
+lgmap = Main.map
+
+lgdrop :: Int -> Int -> LG.LogicT IO Int
+lgdrop = Main.drop
+
+logict_basic :: IO Int
+logict_basic = do
+ writeIORef count 0
+ --xs <- LG.observeManyT 2900 $ do
+ xs <- LG.observeAllT $ do
+ LG.msum $ Prelude.map return [1..100000]
+ >>= lgfilter even
+ >>= lgmap (+1)
+ >>= lgdrop 100
+ >>= lgmap (+1)
+ >>= lgfilter (\x -> x `mod` 2 == 0)
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length xs)
+
+simple_conduit_basic :: IO Int
+simple_conduit_basic = do
+ xs <- S.sourceList [1..100000]
+ S.$= S.filterC even
+ S.$= S.mapC ((+1) :: Int -> Int)
+ S.$= S.dropC 100
+ S.$= S.mapC ((+1) :: Int -> Int)
+ S.$= S.filterC (\x -> x `mod` 2 == 0)
+ S.$$ S.sinkList
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length (xs :: [Int]))
+
+smap :: Monad (s IO) => (Int -> Int) -> Int -> s IO Int
+smap = Main.map
+
+sfilter :: (Alternative (s IO), Monad (s IO)) => (Int -> Bool) -> Int -> s IO Int
+sfilter = Main.filter
+
+{-# INLINE simple_conduit_bind #-}
+simple_conduit_bind :: IO Int
+simple_conduit_bind = do
+ xs <- S.sinkList $ do
+ S.dropC 100 (S.sourceList [1..100000 :: Int] >>= \x ->
+ sfilter even x >>= smap (+1))
+ >>= smap (+1)
+ >>= sfilter (\y -> y `mod` 2 == 0)
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length xs)
+
+machines_basic :: IO Int
+machines_basic = do
+ xs <- M.runT $ M.source [1..100000]
+ M.~> M.filtered even
+ M.~> M.mapping (+1)
+ M.~> M.dropping 100
+ M.~> M.mapping (+1)
+ M.~> M.filtered (\x -> x `mod` 2 == 0)
+ assert (Prelude.length xs == 49900) $
+ return (Prelude.length (xs ::[Int]))
+#endif
diff --git a/examples/loops.hs b/examples/loops.hs
new file mode 100644
index 0000000..d0f149b
--- /dev/null
+++ b/examples/loops.hs
@@ -0,0 +1,88 @@
+import Streamly
+import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
+
+main = do
+ liftIO $ hSetBuffering stdout LineBuffering
+
+ putStrLn $ "\nloopTail:\n"
+ runStreamT $ do
+ x <- loopTail 0
+ liftIO $ print (x :: Int)
+
+ putStrLn $ "\nloopHead:\n"
+ runStreamT $ do
+ x <- loopHead 0
+ liftIO $ print (x :: Int)
+
+ putStrLn $ "\nloopTailA:\n"
+ runStreamT $ do
+ x <- loopTailA 0
+ liftIO $ print (x :: Int)
+
+ putStrLn $ "\nloopHeadA:\n"
+ runStreamT $ do
+ x <- loopHeadA 0
+ liftIO $ print (x :: Int)
+
+ putStrLn $ "\ninterleave:\n"
+ runStreamT $ do
+ x <- return 0 <> return 1 <=> return 100 <> return 101
+ liftIO $ print (x :: Int)
+
+ putStrLn $ "\nParallel interleave:\n"
+ runStreamT $ do
+ x <- return 0 <> return 1 <|> return 100 <> return 101
+ liftIO $ print (x :: Int)
+
+ where
+
+-------------------------------------------------------------------------------
+-- Serial (single-threaded) stream generator loops
+-------------------------------------------------------------------------------
+
+ -- In a <> composition the action on the left is executed and only after it
+ -- finished then the action on the right is executed. In other words the
+ -- actions are run serially.
+
+ -- Generates a value and then loops. Can be used to generate an infinite
+ -- stream. Interleaves the generator and the consumer.
+ loopTail :: Int -> StreamT IO Int
+ loopTail x = do
+ liftIO $ putStrLn "LoopTail..."
+ return x <> (if x < 3 then loopTail (x + 1) else empty)
+
+ -- Loops and then generates a value. The consumer can run only after the
+ -- loop has finished. An infinite generator will not let the consumer run
+ -- at all.
+ loopHead :: Int -> StreamT IO Int
+ loopHead x = do
+ liftIO $ putStrLn "LoopHead..."
+ (if x < 3 then loopHead (x + 1) else empty) <> return x
+
+-------------------------------------------------------------------------------
+-- Concurrent (multi-threaded) adaptive demand-based stream generator loops
+-------------------------------------------------------------------------------
+
+ -- In a <| composition the action on the left is executed first. However,
+ -- if it is not fast enough to generate results at the consumer's speed
+ -- then the action on the right is also spawned concurrently. In other
+ -- words, both actions may run concurrently based on the need.
+
+ loopTailA :: Int -> StreamT IO Int
+ loopTailA x = do
+ liftIO $ putStrLn "LoopTailA..."
+ return x <| (if x < 3 then loopTailA (x + 1) else empty)
+
+ loopHeadA :: Int -> StreamT IO Int
+ loopHeadA x = do
+ liftIO $ putStrLn "LoopHeadA..."
+ (if x < 3 then loopHeadA (x + 1) else empty) <| return x
+
+-------------------------------------------------------------------------------
+-- Parallel (fairly scheduled, multi-threaded) stream generator loops
+-------------------------------------------------------------------------------
+
+ -- In a <|> composition both actions are run concurrently in a fair
+ -- manner, no one action is preferred over another. Both actions are
+ -- spawned right away in their own independent threads. In other words, the
+ -- actions will run concurrently.
diff --git a/examples/nested-loops.hs b/examples/nested-loops.hs
new file mode 100644
index 0000000..ea886c5
--- /dev/null
+++ b/examples/nested-loops.hs
@@ -0,0 +1,22 @@
+import Control.Applicative ((<|>), empty)
+import Control.Concurrent (myThreadId)
+import Control.Monad.IO.Class (liftIO)
+import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
+import System.Random (randomIO)
+import Streamly
+
+main = runStreamT $ do
+ liftIO $ hSetBuffering stdout LineBuffering
+ x <- loop "A " 2
+ y <- loop "B " 2
+ liftIO $ myThreadId >>= putStr . show
+ >> putStr " "
+ >> print (x, y)
+
+ where
+
+ loop name n = do
+ rnd <- liftIO (randomIO :: IO Int)
+ let result = (name ++ show rnd)
+ repeat = if n > 1 then loop name (n - 1) else empty
+ in (return result) <|> repeat
diff --git a/examples/parallel-loops.hs b/examples/parallel-loops.hs
new file mode 100644
index 0000000..61e1794
--- /dev/null
+++ b/examples/parallel-loops.hs
@@ -0,0 +1,20 @@
+import Control.Applicative ((<|>))
+import Control.Concurrent (myThreadId, threadDelay)
+import Control.Monad.IO.Class (liftIO)
+import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
+import System.Random (randomIO)
+import Streamly
+
+main = runStreamT $ do
+ liftIO $ hSetBuffering stdout LineBuffering
+ x <- loop "A" <|> loop "B"
+ liftIO $ myThreadId >>= putStr . show
+ >> putStr " "
+ >> print x
+
+ where
+
+ loop name = do
+ liftIO $ threadDelay 1000000
+ rnd <- liftIO (randomIO :: IO Int)
+ return (name, rnd) <|> loop name
diff --git a/src/Streamly.hs b/src/Streamly.hs
new file mode 100644
index 0000000..68efd82
--- /dev/null
+++ b/src/Streamly.hs
@@ -0,0 +1,249 @@
+-- |
+-- Module : Streamly
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+
+module Streamly
+ (
+ -- * Background
+ -- $background
+
+ -- * Overview
+ -- $overview
+
+ MonadAsync
+ , Streaming
+
+ -- * Product Style Composition
+ -- $product
+ , StreamT
+ , InterleavedT
+ , AsyncT
+ , ParallelT
+
+ -- * Zip Style Composition
+ -- $zipping
+ , ZipStream
+ , ZipAsync
+
+ -- * Sum Style Composition
+ -- $sum
+ , (<=>)
+ , (<|)
+
+ -- * Transformation
+ , async
+
+ -- * Stream Type Adapters
+ -- $adapters
+ , serially
+ , interleaving
+ , asyncly
+ , parallely
+ , zipping
+ , zippingAsync
+ , adapt
+
+ -- * Running Streams
+ , runStreaming
+ , runStreamT
+ , runInterleavedT
+ , runAsyncT
+ , runParallelT
+ , runZipStream
+ , runZipAsync
+
+ -- * Fold Utilities
+ -- $foldutils
+ , foldWith
+ , foldMapWith
+ , forEachWith
+
+ -- * Re-exports
+ , Monoid (..)
+ , Semigroup (..)
+ , Alternative (..)
+ , MonadPlus (..)
+ , MonadIO (..)
+ , MonadTrans (..)
+ )
+where
+
+import Streamly.Streams
+import Data.Semigroup (Semigroup(..))
+import Control.Applicative (Alternative(..))
+import Control.Monad (MonadPlus(..))
+import Control.Monad.IO.Class (MonadIO (..))
+import Control.Monad.Trans.Class (MonadTrans (..))
+
+-- $background
+--
+-- Streamly provides a monad transformer that extends the product style
+-- composition of monads to streams of many elements of the same type; it is a
+-- functional programming equivalent of nested loops from imperative
+-- programming. Composing each element in one stream with each element in the
+-- other stream generalizes the monadic product of single elements. You can
+-- think of the IO monad as a special case of the more general @StreamT IO@
+-- monad; with single element streams. List transformers and logic programming
+-- monads also provide a similar product style composition of streams, however
+-- streamly generalizes it with the time dimension; allowing streams to be
+-- composed in an asynchronous and concurrent fashion in many different ways.
+-- It also provides multiple alternative ways of composing streams e.g.
+-- serial, interleaved or concurrent.
+--
+-- The seemingly simple addition of asynchronicity and concurrency to product
+-- style streaming composition unifies a number of disparate abstractions into
+-- one powerful and elegant abstraction. A wide variety of programming
+-- problems can be solved elegantly with this abstraction. In particular, it
+-- unifies three major programming domains namely non-deterministic (logic)
+-- programming, concurrent programming and functional reactive programming. In
+-- other words, you can do everything with this one abstraction that you could
+-- with list transformers (e.g.
+-- <https://hackage.haskell.org/package/list-t list-t>), logic programming
+-- monads (e.g. <https://hackage.haskell.org/package/logict logict>),
+-- streaming libraries (a lot of what
+-- <https://hackage.haskell.org/package/conduit conduit> or
+-- <https://hackage.haskell.org/package/pipes pipes> can do), concurrency
+-- libraries (e.g. <https://hackage.haskell.org/package/async async>) and FRP
+-- libraries (e.g. <https://hackage.haskell.org/package/Yampa Yampa> or
+-- <https://hackage.haskell.org/package/reflex reflex>).
+
+-- $overview
+--
+-- Streamly provides six distinct stream types i.e. 'StreamT', 'InterleavedT',
+-- 'AsyncT' and 'ParallelT', 'ZipStream' and 'ZipAsync', each representing a
+-- stream of elements. All these types have the same underlying representation
+-- and can be adapted from one to another using type adaptor combinators
+-- described later. Each of these types belongs to the 'Streaming' type class
+-- which helps converting the specific type to and from the underlying generic
+-- stream type.
+--
+-- The types 'StreamT', 'InterleavedT', 'AsyncT' and 'ParallelT' are 'Monad'
+-- transformers with the monadic bind operation combining streams in a product
+-- style in much the same way as a list monad or a list transformer i.e. each
+-- element from one stream is combined with every element of the other stream.
+-- However, the applicative and monadic composition of these types differ in
+-- terms of the ordering and time sequence in which the elements from two
+-- streams are combined. 'StreamT' and 'InterleavedT' compose streams serially
+-- whereas 'AsyncT' and 'ParallelT' are their concurrent counterparts. See the
+-- documentation of the respective types for more details.
+--
+-- The types 'ZipStream' and 'ZipAsync' provide 'Applicative' instances to zip
+-- two streams together i.e. each element in one stream is combined with the
+-- corresponding element in the other stream. 'ZipStream' generates the streams
+-- being zipped serially whereas 'ZipAsync' produces both the elements being
+-- zipped concurrently.
+--
+-- Two streams of the same type can be combined using a sum style composition
+-- to generate a stream of the same type where the output stream would contain
+-- all elements of both the streams. However, the sequence in which the
+-- elements in the resulting stream are produced depends on the combining
+-- operator. Four distinct sum style operators, '<>', '<=>', '<|' and '<|>'
+-- combine two streams in different ways, each corresponding to the one of the
+-- four ways of combining monadically. See the respective section below for
+-- more details.
+--
+-- Concurrent composition types 'AsyncT', 'ParallelT', 'ZipAsync' and
+-- concurrent composition operators '<|' and '<|>' require the underlying monad
+-- of the streaming monad transformer to be 'MonadAsync'.
+--
+-- For more details please see the "Streamly.Tutorial" and "Streamly.Examples"
+-- (the latter is available only when built with the 'examples' build flag).
+
+-- A simple inline example here illustrating applicative, monad and alternative
+-- compositions.
+
+-- $product
+--
+-- Streams that compose serially or non-concurrently come in two flavors i.e.
+-- 'StreamT' and 'InterleavedT'. Both of these serial flavors have
+-- corresponding concurrent equivalents, those are 'AsyncT' and 'ParallelT'
+-- respectively.
+
+-- $zipping
+--
+-- 'ZipStream' and 'ZipAsync', provide 'Applicative' instances for zipping the
+-- corresponding elements of two streams together. Note that these types are
+-- not monads.
+
+-- $sum
+--
+-- Just like product style composition there are four distinct ways to combine
+-- streams in sum style each directly corresponding to one of the product style
+-- composition.
+--
+-- The standard semigroup append '<>' operator appends two streams serially,
+-- this style corresponds to the 'StreamT' style of monadic composition.
+--
+-- @
+-- main = ('toList' . 'serially' $ (return 1 <> return 2) <> (return 3 <> return 4)) >>= print
+-- @
+-- @
+-- [1,2,3,4]
+-- @
+--
+-- The standard 'Alternative' operator '<|>' fairly interleaves two streams in
+-- parallel, this operator corresponds to the 'ParallelT' style.
+--
+-- @
+-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<|\> (return 3 <> return 4)) >>= print
+-- @
+-- @
+-- [1,3,2,4]
+-- @
+--
+-- Unlike '<|', this operator cannot be used to fold infinite containers since
+-- that might accumulate too many partially drained streams. To be clear, it
+-- can combine infinite streams but not infinite number of streams.
+--
+-- Two additional sum style composition operators that streamly introduces are
+-- described below.
+
+-- $adapters
+--
+-- Code using streamly is usually written such that it is agnostic of any
+-- specific streaming type. We use a type variable (polymorphic type) with the
+-- 'Streaming' class constraint. Finally, when running the monad we can specify
+-- the actual type that we want to use to interpret the code. However, in
+-- certain cases we may want to use a specific type to force a certain type of
+-- composition. These combinators can be used to convert the stream types from
+-- one to another at no cost as all the types have the same underlying
+-- representation.
+--
+-- If you see an @ambiguous type variable@ error then most likely it is because
+-- you have not specified the stream type. You either need a type annotation or
+-- one of the following combinators to specify what type of stream you mean.
+--
+-- This code:
+--
+-- @
+-- main = ('toList' $ (return 1 <> return 2)) >>= print
+-- @
+--
+-- will result in a type error like this:
+--
+-- @
+-- Ambiguous type variable ‘t0’ arising from a use of ...
+-- @
+--
+-- To fix the error just tell 'toList' what kind of stream are we feeding it:
+--
+-- @
+-- main = ('toList' $ 'serially' $ (return 1 <> return 2)) >>= print
+-- @
+-- @
+-- main = ('toList' $ (return 1 <> return 2 :: StreamT IO Int)) >>= print
+-- @
+--
+-- Note that using the combinators is easier as you do not have to think about
+-- the specific types, they are just inferred.
+--
+
+-- $foldutils
+--
+-- These are some convenience functions to fold any 'Foldable' container using
+-- one of the sum composition operators to convert it into a streamly stream.
diff --git a/src/Streamly/Core.hs b/src/Streamly/Core.hs
new file mode 100644
index 0000000..5663ec5
--- /dev/null
+++ b/src/Streamly/Core.hs
@@ -0,0 +1,651 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE UndecidableInstances #-} -- XXX
+
+-- |
+-- Module : Streamly.Core
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+--
+--
+module Streamly.Core
+ (
+ MonadAsync
+
+ -- * Streams
+ , Stream (..)
+
+ -- * Construction
+ , scons
+ , snil
+
+ -- * Composition
+ , interleave
+
+ -- * Concurrent Stream Vars (SVars)
+ , SVar
+ , SVarSched (..)
+ , SVarTag (..)
+ , SVarStyle (..)
+ , newEmptySVar
+ , newStreamVar1
+ , newStreamVar2
+ , joinStreamVar2
+ , fromStreamVar
+ , toStreamVar
+
+ -- * Concurrent Streams
+ , parAlt
+ , parLeft
+ )
+where
+
+import Control.Applicative (Alternative (..))
+import Control.Concurrent (ThreadId, forkIO,
+ myThreadId, threadDelay)
+import Control.Concurrent.MVar (MVar, newEmptyMVar, tryTakeMVar,
+ tryPutMVar, takeMVar)
+import Control.Exception (SomeException (..))
+import qualified Control.Exception.Lifted as EL
+import Control.Monad (MonadPlus(..), mzero, when)
+import Control.Monad.Base (MonadBase (..), liftBaseDefault)
+import Control.Monad.Catch (MonadThrow, throwM)
+import Control.Monad.Error.Class (MonadError(..))
+import Control.Monad.IO.Class (MonadIO(..))
+import Control.Monad.Reader.Class (MonadReader(..))
+import Control.Monad.State.Class (MonadState(..))
+import Control.Monad.Trans.Class (MonadTrans (lift))
+import Control.Monad.Trans.Control (MonadBaseControl, liftBaseWith)
+import Data.Atomics (atomicModifyIORefCAS,
+ atomicModifyIORefCAS_)
+import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, pushL,
+ tryPopR, nullQ)
+import Data.Functor (void)
+import Data.IORef (IORef, modifyIORef, newIORef,
+ readIORef)
+import Data.Maybe (isNothing)
+import Data.Semigroup (Semigroup(..))
+import Data.Set (Set)
+import qualified Data.Set as S
+
+------------------------------------------------------------------------------
+-- Parent child thread communication type
+------------------------------------------------------------------------------
+
+-- | Events that a child thread may send to a parent thread.
+data ChildEvent a =
+ ChildYield a
+ | ChildStop ThreadId (Maybe SomeException)
+
+------------------------------------------------------------------------------
+-- State threaded around the monad for thread management
+------------------------------------------------------------------------------
+
+-- | Conjunction is used for monadic/product style composition. Disjunction is
+-- used for fold/sum style composition. We need to distiguish the two types of
+-- SVars so that the scheduling of the two is independent.
+data SVarTag = Conjunction | Disjunction deriving Eq
+
+-- | For fairly interleaved parallel composition the sched policy is FIFO
+-- whereas for left biased parallel composition it is LIFO.
+data SVarSched = LIFO | FIFO deriving Eq
+
+-- | Identify the type of the SVar. Two computations using the same style can
+-- be scheduled on the same SVar.
+data SVarStyle = SVarStyle SVarTag SVarSched deriving Eq
+
+-- | An SVar or a Stream Var is a conduit to the output from multiple streams
+-- running concurrently and asynchronously. An SVar can be thought of as an
+-- asynchronous IO handle. We can write any number of streams to an SVar in a
+-- non-blocking manner and then read them back at any time at any pace. The
+-- SVar would run the streams asynchronously and accumulate results. An SVar
+-- may not really execute the stream completely and accumulate all the results.
+-- However, it ensures that the reader can read the results at whatever paces
+-- it wants to read. The SVar monitors and adapts to the consumer's pace.
+--
+-- An SVar is a mini scheduler, it has an associated runqueue that holds the
+-- stream tasks to be picked and run by a pool of worker threads. It has an
+-- associated output queue where the output stream elements are placed by the
+-- worker threads. A doorBell is used by the worker threads to intimate the
+-- consumer thread about availability of new results in the output queue. More
+-- workers are added to the SVar by 'fromStreamVar' on demand if the output
+-- produced is not keeping pace with the consumer. On bounded SVars, workers
+-- block on the output queue to provide throttling of the producer when the
+-- consumer is not pulling fast enough. The number of workers may even get
+-- reduced depending on the consuming pace.
+--
+-- New work is enqueued either at the time of creation of the SVar or as a
+-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
+-- already enqueued computations get evaluated. See 'joinStreamVar2'.
+--
+data SVar m a =
+ SVar { outputQueue :: IORef [ChildEvent a]
+ , doorBell :: MVar Bool -- wakeup mechanism for outQ
+ , enqueue :: Stream m a -> IO ()
+ , runqueue :: m ()
+ , runningThreads :: IORef (Set ThreadId)
+ , queueEmpty :: m Bool
+ , svarStyle :: SVarStyle
+ }
+
+------------------------------------------------------------------------------
+-- The stream type
+------------------------------------------------------------------------------
+
+-- TBD use a functor instead of the bare type a?
+-- XXX remove the Maybe, use "empty" as the base case
+
+-- | Represents a monadic stream of values of type 'a' constructed using
+-- actions in monad 'm'. Streams can be composed sequentially or in parallel;
+-- in product style compositions (monadic bind multiplies streams in a ListT
+-- fashion) or in sum style compositions like 'Semigroup', 'Monoid',
+-- 'Alternative' or variants of these.
+newtype Stream m a =
+ Stream {
+ runStream :: forall r.
+ Maybe (SVar m a) -- local state
+ -> m r -- stop
+ -> (a -> Maybe (Stream m a) -> m r) -- yield
+ -> m r
+ }
+
+-- | A monad that can perform asynchronous/concurrent IO operations. Streams
+-- that can be composed concurrently require the underlying monad to be
+-- 'MonadAsync'.
+type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
+
+scons :: a -> Maybe (Stream m a) -> Stream m a
+scons a r = Stream $ \_ _ yld -> yld a r
+
+snil :: Stream m a
+snil = Stream $ \_ stp _ -> stp
+
+------------------------------------------------------------------------------
+-- Semigroup
+------------------------------------------------------------------------------
+
+-- | '<>' concatenates two streams sequentially i.e. the first stream is
+-- exhausted completely before yielding any element from the second stream.
+instance Semigroup (Stream m a) where
+ m1 <> m2 = go m1
+ where
+ go (Stream m) = Stream $ \_ stp yld ->
+ let stop = (runStream m2) Nothing stp yld
+ yield a Nothing = yld a (Just m2)
+ yield a (Just r) = yld a (Just (go r))
+ in m Nothing stop yield
+
+------------------------------------------------------------------------------
+-- Monoid
+------------------------------------------------------------------------------
+
+instance Monoid (Stream m a) where
+ mempty = Stream $ \_ stp _ -> stp
+ mappend = (<>)
+
+------------------------------------------------------------------------------
+-- Interleave
+------------------------------------------------------------------------------
+
+-- | Same as '<=>'.
+interleave :: Stream m a -> Stream m a -> Stream m a
+interleave m1 m2 = Stream $ \_ stp yld -> do
+ let stop = (runStream m2) Nothing stp yld
+ yield a Nothing = yld a (Just m2)
+ yield a (Just r) = yld a (Just (interleave m2 r))
+ (runStream m1) Nothing stop yield
+
+------------------------------------------------------------------------------
+-- Spawning threads and collecting result in streamed fashion
+------------------------------------------------------------------------------
+
+{-# INLINE doFork #-}
+doFork :: MonadBaseControl IO m
+ => m ()
+ -> (SomeException -> m ())
+ -> m ThreadId
+doFork action exHandler =
+ EL.mask $ \restore ->
+ liftBaseWith $ \runInIO -> forkIO $ do
+ -- XXX test the exception handling
+ _ <- runInIO $ EL.catch (restore action) exHandler
+ -- XXX restore state here?
+ return ()
+
+-- XXX exception safety of all atomic/MVar operations
+
+{-# INLINE send #-}
+send :: MonadIO m => SVar m a -> ChildEvent a -> m ()
+send sv msg = liftIO $ do
+ atomicModifyIORefCAS_ (outputQueue sv) $ \es -> msg : es
+ -- XXX need a memory barrier? The wake up must happen only after the
+ -- store has finished otherwise we can have lost wakeup problems.
+ void $ tryPutMVar (doorBell sv) True
+
+{-# INLINE sendStop #-}
+sendStop :: MonadIO m => SVar m a -> m ()
+sendStop sv = liftIO myThreadId >>= \tid -> send sv (ChildStop tid Nothing)
+
+-- Note: Left associated compositions can grow this queue to a large size
+{-# INLINE enqueueLIFO #-}
+enqueueLIFO :: IORef [Stream m a] -> Stream m a -> IO ()
+enqueueLIFO q m = atomicModifyIORefCAS_ q $ \ ms -> m : ms
+
+runqueueLIFO :: MonadIO m => SVar m a -> IORef [Stream m a] -> m ()
+runqueueLIFO sv q = run
+
+ where
+
+ run = do
+ work <- dequeue
+ case work of
+ Nothing -> sendStop sv
+ Just m -> (runStream m) (Just sv) run yield
+
+ sendit a = send sv (ChildYield a)
+ yield a Nothing = sendit a >> run
+ yield a (Just r) = sendit a >> (runStream r) (Just sv) run yield
+
+ dequeue = liftIO $ atomicModifyIORefCAS q $ \ ms ->
+ case ms of
+ [] -> ([], Nothing)
+ x : xs -> (xs, Just x)
+
+{-# INLINE enqueueFIFO #-}
+enqueueFIFO :: LinkedQueue (Stream m a) -> Stream m a -> IO ()
+enqueueFIFO = pushL
+
+runqueueFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m ()
+runqueueFIFO sv q = run
+
+ where
+
+ run = do
+ work <- dequeue
+ case work of
+ Nothing -> sendStop sv
+ Just m -> (runStream m) (Just sv) run yield
+
+ dequeue = liftIO $ tryPopR q
+ sendit a = send sv (ChildYield a)
+ yield a Nothing = sendit a >> run
+ yield a (Just r) = sendit a >> liftIO (enqueueFIFO q r) >> run
+
+-- Thread tracking is needed for two reasons:
+--
+-- 1) Killing threads on exceptions. Threads may not be allowed to go away by
+-- themselves because they may run for significant times before going away or
+-- worse they may be stuck in IO and never go away.
+--
+-- 2) To know when all threads are done.
+
+{-# NOINLINE addThread #-}
+addThread :: MonadIO m => SVar m a -> ThreadId -> m ()
+addThread sv tid =
+ liftIO $ modifyIORef (runningThreads sv) $ (\s -> S.insert tid s)
+
+{-# INLINE delThread #-}
+delThread :: MonadIO m => SVar m a -> ThreadId -> m ()
+delThread sv tid =
+ liftIO $ modifyIORef (runningThreads sv) $ (\s -> S.delete tid s)
+
+{-# INLINE allThreadsDone #-}
+allThreadsDone :: MonadIO m => SVar m a -> m Bool
+allThreadsDone sv = liftIO $ do
+ readIORef (runningThreads sv) >>= return . S.null
+
+{-# NOINLINE handleChildException #-}
+handleChildException :: MonadIO m => SVar m a -> SomeException -> m ()
+handleChildException sv e = do
+ tid <- liftIO myThreadId
+ send sv (ChildStop tid (Just e))
+
+{-# NOINLINE pushWorker #-}
+pushWorker :: MonadAsync m => SVar m a -> m ()
+pushWorker sv =
+ doFork (runqueue sv) (handleChildException sv) >>= addThread sv
+
+-- XXX When the queue is LIFO we can put a limit on the number of dispatches.
+-- Also, if a worker blocks on the output queue we can decide if we want to
+-- block or make it go away entirely, depending on the number of workers and
+-- the type of the queue.
+{-# INLINE sendWorkerWait #-}
+sendWorkerWait :: MonadAsync m => SVar m a -> m ()
+sendWorkerWait sv = do
+ case svarStyle sv of
+ SVarStyle _ LIFO -> liftIO $ threadDelay 200
+ SVarStyle _ FIFO -> liftIO $ threadDelay 0
+
+ output <- liftIO $ readIORef (outputQueue sv)
+ when (null output) $ do
+ done <- queueEmpty sv
+ if (not done)
+ then (pushWorker sv) >> sendWorkerWait sv
+ else void (liftIO $ takeMVar (doorBell sv))
+
+-- | Pull a stream from an SVar.
+{-# NOINLINE fromStreamVar #-}
+fromStreamVar :: MonadAsync m => SVar m a -> Stream m a
+fromStreamVar sv = Stream $ \_ stp yld -> do
+ -- XXX if reading the IORef is costly we can use a flag in the SVar to
+ -- indicate we are done.
+ done <- allThreadsDone sv
+ if done
+ then stp
+ else do
+ res <- liftIO $ tryTakeMVar (doorBell sv)
+ when (isNothing res) $ sendWorkerWait sv
+ list <- liftIO $ atomicModifyIORefCAS (outputQueue sv) $ \x -> ([], x)
+ -- To avoid lock overhead we read all events at once instead of reading
+ -- one at a time. We just reverse the list to process the events in the
+ -- order they arrived. Maybe we can use a queue instead?
+ (runStream $ processEvents (reverse list)) Nothing stp yld
+
+ where
+
+ handleException e tid = do
+ delThread sv tid
+ -- XXX implement kill async exception handling
+ -- liftIO $ readIORef (runningThreads sv) >>= mapM_ killThread
+ throwM e
+
+ {-# INLINE processEvents #-}
+ processEvents [] = Stream $ \_ stp yld -> do
+ done <- allThreadsDone sv
+ if not done
+ then (runStream (fromStreamVar sv)) Nothing stp yld
+ else stp
+
+ processEvents (ev : es) = Stream $ \_ stp yld -> do
+ let continue = (runStream (processEvents es)) Nothing stp yld
+ yield a = yld a (Just (processEvents es))
+
+ case ev of
+ ChildYield a -> yield a
+ ChildStop tid e ->
+ case e of
+ Nothing -> delThread sv tid >> continue
+ Just ex -> handleException ex tid
+
+getFifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
+getFifoSVar ctype = do
+ outQ <- newIORef []
+ outQMv <- newEmptyMVar
+ running <- newIORef S.empty
+ q <- newQ
+ let sv =
+ SVar { outputQueue = outQ
+ , doorBell = outQMv
+ , runningThreads = running
+ , runqueue = runqueueFIFO sv q
+ , enqueue = pushL q
+ , queueEmpty = liftIO $ nullQ q
+ , svarStyle = ctype
+ }
+ in return sv
+
+getLifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
+getLifoSVar ctype = do
+ outQ <- newIORef []
+ outQMv <- newEmptyMVar
+ running <- newIORef S.empty
+ q <- newIORef []
+ let checkEmpty = liftIO (readIORef q) >>= return . null
+ let sv =
+ SVar { outputQueue = outQ
+ , doorBell = outQMv
+ , runningThreads = running
+ , runqueue = runqueueLIFO sv q
+ , enqueue = enqueueLIFO q
+ , queueEmpty = checkEmpty
+ , svarStyle = ctype
+ }
+ in return sv
+
+-- | Create a new empty SVar.
+newEmptySVar :: MonadAsync m => SVarStyle -> m (SVar m a)
+newEmptySVar style = do
+ sv <- liftIO $
+ case style of
+ SVarStyle _ FIFO -> do
+ c <- getFifoSVar style
+ return c
+ SVarStyle _ LIFO -> do
+ c <- getLifoSVar style
+ return c
+ return sv
+
+-- | Create a new SVar and enqueue one stream computation on it.
+newStreamVar1 :: MonadAsync m => SVarStyle -> Stream m a -> m (SVar m a)
+newStreamVar1 style m = do
+ sv <- newEmptySVar style
+ -- Note: We must have all the work on the queue before sending the
+ -- pushworker, otherwise the pushworker may exit before we even get a
+ -- chance to push.
+ liftIO $ (enqueue sv) m
+ pushWorker sv
+ return sv
+
+-- | Create a new SVar and enqueue two stream computations on it.
+newStreamVar2 :: MonadAsync m
+ => SVarStyle -> Stream m a -> Stream m a -> m (SVar m a)
+newStreamVar2 style m1 m2 = do
+ -- Note: We must have all the work on the queue before sending the
+ -- pushworker, otherwise the pushworker may exit before we even get a
+ -- chance to push.
+ sv <- liftIO $
+ case style of
+ SVarStyle _ FIFO -> do
+ c <- getFifoSVar style
+ (enqueue c) m1 >> (enqueue c) m2
+ return c
+ SVarStyle _ LIFO -> do
+ c <- getLifoSVar style
+ (enqueue c) m2 >> (enqueue c) m1
+ return c
+ pushWorker sv
+ return sv
+
+-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
+-- be read back from the SVar using 'fromSVar'.
+toStreamVar :: MonadAsync m => SVar m a -> Stream m a -> m ()
+toStreamVar sv m = do
+ liftIO $ (enqueue sv) m
+ done <- allThreadsDone sv
+ -- XXX there may be a race here unless we are running in the consumer
+ -- thread. This is safe only when called from the consumer thread or when
+ -- no consumer is present.
+ when done $ pushWorker sv
+
+------------------------------------------------------------------------------
+-- Running streams concurrently
+------------------------------------------------------------------------------
+
+-- Concurrency rate control. Our objective is to create more threads on demand
+-- if the consumer is running faster than us. As soon as we encounter an
+-- Alternative composition we create a push pull pair of threads. We use a
+-- channel for communication between the consumer pulling from the channel and
+-- the producer who pushing to the channel. The producer creates more threads
+-- if no output is seen on the channel, that is the consumer is running faster.
+-- However this mechanism can be problematic if the initial production latency
+-- is high, we may end up creating too many threads. So we need some way to
+-- monitor and use the latency as well.
+--
+-- TBD We may run computations at the lower level of the composition tree
+-- serially even if they are composed using a parallel combinator. We can use
+-- <> in place of <| and <=> in place of <|>. If we find that a parallel
+-- channel immediately above a computation becomes empty we can switch to
+-- parallelizing the computation. For that we can use a state flag to fork the
+-- rest of the computation at any point of time inside the Monad bind operation
+-- if the consumer is running at a faster speed.
+--
+-- TBD the alternative composition allows us to dispatch a chunkSize of only 1.
+-- If we have to dispatch in arbitrary chunksizes we will need to compose the
+-- parallel actions using a data constructor (Free Alternative) instead so that
+-- we can divide it in chunks of arbitrary size before dispatch. If the stream
+-- is composed of hierarchically composed grains of different sizes then we can
+-- always switch to a desired granularity depending on the consumer speed.
+--
+-- TBD for pure work (when we are not in the IO monad) we can divide it into
+-- just the number of CPUs.
+
+{-# NOINLINE withNewSVar2 #-}
+withNewSVar2 :: MonadAsync m
+ => SVarStyle -> Stream m a -> Stream m a -> Stream m a
+withNewSVar2 style m1 m2 = Stream $ \_ stp yld -> do
+ sv <- newStreamVar2 style m1 m2
+ (runStream (fromStreamVar sv)) Nothing stp yld
+
+-- | Join two computations on the currently running 'SVar' queue for concurrent
+-- execution. The 'SVarStyle' required by the current composition context is
+-- passed as one of the parameters. If the style does not match with the style
+-- of the current 'SVar' we create a new 'SVar' and schedule the computations
+-- on that. The newly created SVar joins as one of the computations on the
+-- current SVar queue.
+--
+-- When we are using parallel composition, an SVar is passed around as a state
+-- variable. We try to schedule a new parallel computation on the SVar passed
+-- to us. The first time, when no SVar exists, a new SVar is created.
+-- Subsequently, 'joinStreamVar2' may get called when a computation already
+-- scheduled on the SVar is further evaluated. For example, when (a \<|> b) is
+-- evaluated it calls a 'joinStreamVar2' to put 'a' and 'b' on the current scheduler
+-- queue. However, if the scheduling and composition style of the new
+-- computation being scheduled is different than the style of the current SVar,
+-- then we create a new SVar and schedule it on that.
+--
+-- For example:
+--
+-- * (x \<|> y) \<|> (t \<|> u) -- all of them get scheduled on the same SVar
+-- * (x \<|> y) \<|> (t \<| u) -- @t@ and @u@ get scheduled on a new child SVar
+-- because of the scheduling policy change.
+-- * if we 'adapt' a stream of type 'AsyncT' to a stream of type
+-- 'ParallelT', we create a new SVar at the transitioning bind.
+-- * When the stream is switching from disjunctive composition to conjunctive
+-- composition and vice-versa we create a new SVar to isolate the scheduling
+-- of the two.
+--
+{-# INLINE joinStreamVar2 #-}
+joinStreamVar2 :: MonadAsync m
+ => SVarStyle -> Stream m a -> Stream m a -> Stream m a
+joinStreamVar2 style m1 m2 = Stream $ \st stp yld -> do
+ case st of
+ Just sv | svarStyle sv == style ->
+ liftIO ((enqueue sv) m2) >> (runStream m1) st stp yld
+ _ -> (runStream (withNewSVar2 style m1 m2)) Nothing stp yld
+
+------------------------------------------------------------------------------
+-- Semigroup and Monoid style compositions for parallel actions
+------------------------------------------------------------------------------
+
+{-
+-- | Same as '<>|'.
+parAhead :: Stream m a -> Stream m a -> Stream m a
+parAhead = undefined
+
+-- | Sequential composition similar to '<>' except that it can execute the
+-- action on the right in parallel ahead of time. Returns the results in
+-- sequential order like '<>' from left to right.
+(<>|) :: Stream m a -> Stream m a -> Stream m a
+(<>|) = parAhead
+-}
+
+-- | Same as '<|>'. Since this schedules all the composed streams fairly you
+-- cannot fold infinite number of streams using this operation.
+{-# INLINE parAlt #-}
+parAlt :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
+parAlt = joinStreamVar2 (SVarStyle Disjunction FIFO)
+
+-- | Same as '<|'. Since this schedules the left side computation first you can
+-- right fold an infinite container using this operator. However a left fold
+-- will not work well as it first unpeels the whole structure before scheduling
+-- a computation requiring an amount of memory proportional to the size of the
+-- structure.
+{-# INLINE parLeft #-}
+parLeft :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
+parLeft = joinStreamVar2 (SVarStyle Disjunction LIFO)
+
+-------------------------------------------------------------------------------
+-- Instances (only used for deriving newtype instances)
+-------------------------------------------------------------------------------
+
+-- Stream type is not exposed, these instances are only for deriving instances
+-- for the newtype wrappers based on Stream.
+
+-- Dummy Instances, defined to enable the definition of other instances that
+-- require a Monad constraint. Must be defined by the newtypes.
+
+instance Monad m => Functor (Stream m) where
+ fmap = undefined
+
+instance Monad m => Applicative (Stream m) where
+ pure = undefined
+ (<*>) = undefined
+
+instance Monad m => Monad (Stream m) where
+ return = pure
+ (>>=) = undefined
+
+------------------------------------------------------------------------------
+-- Alternative & MonadPlus
+------------------------------------------------------------------------------
+
+-- | `empty` represents an action that takes non-zero time to complete. Since
+-- all actions take non-zero time, an `Alternative` composition ('<|>') is a
+-- monoidal composition executing all actions in parallel, it is similar to
+-- '<>' except that it runs all the actions in parallel and interleaves their
+-- results fairly.
+instance MonadAsync m => Alternative (Stream m) where
+ empty = mempty
+ (<|>) = parAlt
+
+instance MonadAsync m => MonadPlus (Stream m) where
+ mzero = empty
+ mplus = (<|>)
+
+-------------------------------------------------------------------------------
+-- Transformer
+-------------------------------------------------------------------------------
+
+instance MonadTrans Stream where
+ lift mx = Stream $ \_ _ yld -> mx >>= (\a -> (yld a Nothing))
+
+instance (MonadBase b m, Monad m) => MonadBase b (Stream m) where
+ liftBase = liftBaseDefault
+
+------------------------------------------------------------------------------
+-- Standard transformer instances
+------------------------------------------------------------------------------
+
+instance MonadIO m => MonadIO (Stream m) where
+ liftIO = lift . liftIO
+
+instance MonadThrow m => MonadThrow (Stream m) where
+ throwM = lift . throwM
+
+-- XXX handle and test cross thread state transfer
+instance MonadError e m => MonadError e (Stream m) where
+ throwError = lift . throwError
+ catchError m h = Stream $ \st stp yld ->
+ let handle r = r `catchError` \e -> (runStream (h e)) st stp yld
+ yield a Nothing = yld a Nothing
+ yield a (Just r) = yld a (Just (catchError r h))
+ in handle $ (runStream m) st stp yield
+
+instance MonadReader r m => MonadReader r (Stream m) where
+ ask = lift ask
+ local f m = Stream $ \st stp yld ->
+ let yield a Nothing = local f $ yld a Nothing
+ yield a (Just r) = local f $ yld a (Just (local f r))
+ in (runStream m) st (local f stp) yield
+
+instance MonadState s m => MonadState s (Stream m) where
+ get = lift get
+ put x = lift (put x)
+ state k = lift (state k)
diff --git a/src/Streamly/Examples.hs b/src/Streamly/Examples.hs
new file mode 100644
index 0000000..e0e3890
--- /dev/null
+++ b/src/Streamly/Examples.hs
@@ -0,0 +1,60 @@
+{-# LANGUAGE CPP #-}
+-- |
+-- Module : Streamly.Examples
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+--
+-- To run these examples:
+--
+-- You need to build the library with the "examples" flag on e.g.
+-- @stack build --flag streamly:examples@. To include the SDL examples as well
+-- use @stack build --flag streamly:examples-sdl@. You will have to make sure
+-- that you have the SDL OS package installed on your system and the headers
+-- are visible to Haskell build tool.
+--
+-- You can directly evaluate the respective file and its main function using
+-- ghc, like this (this may not work when built with @examples-sdl@ flag):
+--
+-- @
+-- \$ stack ghc -- -e acidRainGame src\/Streamly\/Examples\/AcidRainGame.hs
+-- @
+--
+-- Alternatively, you can create a file calling the main function and compile
+-- it:
+--
+-- @
+-- \$ cat ex.hs
+-- import Streamly.Examples
+-- main = acidRainGame
+-- \$ stack ghc ex.hs
+-- @
+--
+-- Alternatively, you can just import "Streamly.Examples" and evaluate the
+-- respective function in GHCi.
+--
+module Streamly.Examples
+ (
+ -- Reactive Programming
+ acidRainGame
+#ifdef EXAMPLES_SDL
+ , circlingSquare
+#endif
+
+ -- Concurrent Programming
+ , listDirRecursive
+ , mergeSortedStreams
+ , searchEngineQuery
+ )
+where
+
+import Streamly.Examples.AcidRainGame
+#ifdef EXAMPLES_SDL
+import Streamly.Examples.CirclingSquare
+#endif
+import Streamly.Examples.ListDirRecursive
+import Streamly.Examples.MergeSortedStreams
+import Streamly.Examples.SearchEngineQuery
diff --git a/src/Streamly/Examples/AcidRainGame.hs b/src/Streamly/Examples/AcidRainGame.hs
new file mode 100644
index 0000000..8f4ce02
--- /dev/null
+++ b/src/Streamly/Examples/AcidRainGame.hs
@@ -0,0 +1,46 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+-- This example is adapted from Gabriel Gonzalez's pipes-concurrency package.
+-- https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html
+
+module Streamly.Examples.AcidRainGame where
+
+import Streamly
+import Control.Concurrent (threadDelay)
+import Control.Monad (when)
+import Control.Monad.State (MonadState, get, modify, runStateT)
+import Data.Semigroup (cycle1)
+
+data Event = Harm Int | Heal Int | Quit deriving (Show)
+
+userAction :: MonadIO m => StreamT m Event
+userAction = cycle1 $ liftIO askUser
+ where
+ askUser = do
+ command <- getLine
+ case command of
+ "potion" -> return (Heal 10)
+ "quit" -> return Quit
+ _ -> putStrLn "What?" >> askUser
+
+acidRain :: MonadIO m => StreamT m Event
+acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1)
+
+game :: (MonadAsync m, MonadState Int m) => StreamT m ()
+game = do
+ event <- userAction <|> acidRain
+ case event of
+ Harm n -> modify $ \h -> h - n
+ Heal n -> modify $ \h -> h + n
+ Quit -> fail "quit"
+
+ h <- get
+ when (h <= 0) $ fail "You die!"
+ liftIO $ putStrLn $ "Health = " ++ show h
+
+acidRainGame :: IO ()
+acidRainGame = do
+ putStrLn "Your health is deteriorating due to acid rain,\
+ \ type \"potion\" or \"quit\""
+ _ <- runStateT (runStreamT game) 60
+ return ()
diff --git a/src/Streamly/Examples/CirclingSquare.hs b/src/Streamly/Examples/CirclingSquare.hs
new file mode 100644
index 0000000..08151d6
--- /dev/null
+++ b/src/Streamly/Examples/CirclingSquare.hs
@@ -0,0 +1,90 @@
+-- Adapted from the Yampa package.
+-- Displays a square moving in a circle. To move the position drag the mouse.
+--
+-- Requires the SDL package, assuming streamly has already been built, you can
+-- compile it like this:
+-- stack ghc --package SDL circle-mouse.hs
+
+module Streamly.Examples.CirclingSquare where
+
+import Data.IORef
+import Graphics.UI.SDL as SDL
+import Streamly
+import Streamly.Time
+
+------------------------------------------------------------------------------
+-- SDL Graphics Init
+------------------------------------------------------------------------------
+
+sdlInit :: IO ()
+sdlInit = do
+ SDL.init [InitVideo]
+
+ let width = 640
+ height = 480
+ _ <- SDL.setVideoMode width height 16 [SWSurface]
+ SDL.setCaption "Test" ""
+
+------------------------------------------------------------------------------
+-- Display a box at a given coordinates
+------------------------------------------------------------------------------
+
+display :: (Double, Double) -> IO ()
+display (playerX, playerY) = do
+ screen <- getVideoSurface
+
+ -- Paint screen green
+ let format = surfaceGetPixelFormat screen
+ bgColor <- mapRGB format 55 60 64
+ _ <- fillRect screen Nothing bgColor
+
+ -- Paint small red square, at an angle 'angle' with respect to the center
+ foreC <- mapRGB format 212 108 73
+ let side = 10
+ x = round playerX
+ y = round playerY
+ _ <- fillRect screen (Just (Rect x y side side)) foreC
+
+ -- Double buffering
+ SDL.flip screen
+
+------------------------------------------------------------------------------
+-- Wait and update Controller Position if it changes
+------------------------------------------------------------------------------
+
+refreshRate :: Int
+refreshRate = 40
+
+updateController :: IORef (Double, Double) -> IO ()
+updateController ref = periodic refreshRate $ do
+ e <- pollEvent
+ case e of
+ MouseMotion x y _ _ -> do
+ writeIORef ref (fromIntegral x, fromIntegral y)
+ _ -> return ()
+
+------------------------------------------------------------------------------
+-- Periodically refresh the output display
+------------------------------------------------------------------------------
+
+updateDisplay :: IORef (Double, Double) -> IO ()
+updateDisplay cref = withClock clock refreshRate displaySquare
+
+ where
+
+ clock = do
+ t <- SDL.getTicks
+ return ((fromIntegral t) * 1000)
+
+ speed = 8
+ radius = 30
+ displaySquare time = do
+ (x, y) <- readIORef cref
+ let t = (fromIntegral time) * speed / 1000000
+ in display (x + cos t * radius, y + sin t * radius)
+
+circlingSquare :: IO ()
+circlingSquare = do
+ sdlInit
+ cref <- newIORef (0,0)
+ runStreamT $ liftIO (updateController cref) <|> liftIO (updateDisplay cref)
diff --git a/src/Streamly/Examples/ListDirRecursive.hs b/src/Streamly/Examples/ListDirRecursive.hs
new file mode 100644
index 0000000..e52ac7e
--- /dev/null
+++ b/src/Streamly/Examples/ListDirRecursive.hs
@@ -0,0 +1,19 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+module Streamly.Examples.ListDirRecursive where
+
+import Path.IO (listDir, getCurrentDir)
+import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
+import Streamly
+
+listDirRecursive :: IO ()
+listDirRecursive = do
+ liftIO $ hSetBuffering stdout LineBuffering
+ runStreamT $ getCurrentDir >>= readdir
+ where readdir d = do
+ (ds, fs) <- lift $ listDir d
+ liftIO $ mapM_ putStrLn $ map show fs ++ map show ds
+ --foldWith (<>) $ map readdir ds -- serial
+ --foldWith (<=>) $ map readdir ds -- serial interleaved
+ foldWith (<|) $ map readdir ds -- concurrent left biased
+ --foldWith (<|>) $ map readdir ds -- concurrent interleaved
diff --git a/src/Streamly/Examples/MergeSortedStreams.hs b/src/Streamly/Examples/MergeSortedStreams.hs
new file mode 100644
index 0000000..d39a8a7
--- /dev/null
+++ b/src/Streamly/Examples/MergeSortedStreams.hs
@@ -0,0 +1,41 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+module Streamly.Examples.MergeSortedStreams where
+
+import Data.Word
+import System.Random (getStdGen, randoms)
+import Data.List (sort)
+import Streamly
+import qualified Streamly.Prelude as A
+
+getSorted :: MonadIO m => StreamT m Word16
+getSorted = do
+ g <- liftIO getStdGen
+ let ls = take 100000 (randoms g) :: [Word16]
+ foldMapWith (<>) return (sort ls)
+
+mergeAsync :: (Ord a, MonadAsync m)
+ => StreamT m a -> StreamT m a -> StreamT m a
+mergeAsync a b = do
+ x <- lift $ async a
+ y <- lift $ async b
+ merge x y
+
+merge :: (Ord a, MonadAsync m) => StreamT m a -> StreamT m a -> StreamT m a
+merge a b = do
+ a1 <- lift $ A.uncons a
+ case a1 of
+ Nothing -> b
+ Just (x, ma) -> do
+ b1 <- lift $ A.uncons b
+ case b1 of
+ Nothing -> return x <> ma
+ Just (y, mb) ->
+ if (y < x)
+ then (return y) <> merge (return x <> ma) mb
+ else (return x) <> merge ma (return y <> mb)
+
+mergeSortedStreams :: IO ()
+mergeSortedStreams = do
+ xs <- A.toList $ mergeAsync getSorted getSorted
+ putStrLn $ show $ length xs
diff --git a/src/Streamly/Examples/SearchEngineQuery.hs b/src/Streamly/Examples/SearchEngineQuery.hs
new file mode 100644
index 0000000..313ea5d
--- /dev/null
+++ b/src/Streamly/Examples/SearchEngineQuery.hs
@@ -0,0 +1,19 @@
+module Streamly.Examples.SearchEngineQuery where
+
+import Streamly
+import Network.HTTP.Simple
+
+-- Runs three search engine queries in parallel.
+searchEngineQuery :: IO ()
+searchEngineQuery = do
+ putStrLn "Using parallel alternative"
+ runStreamT $ google <|> bing <|> duckduckgo
+
+ putStrLn "\nUsing parallel applicative zip"
+ runZipAsync $ (,,) <$> pure google <*> pure bing <*> pure duckduckgo
+
+ where
+ get s = liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s))
+ google = get "https://www.google.com/search?q=haskell"
+ bing = get "https://www.bing.com/search?q=haskell"
+ duckduckgo = get "https://www.duckduckgo.com/?q=haskell"
diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs
new file mode 100644
index 0000000..0871526
--- /dev/null
+++ b/src/Streamly/Prelude.hs
@@ -0,0 +1,430 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving#-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE StandaloneDeriving #-}
+{-# LANGUAGE UndecidableInstances #-} -- XXX
+
+-- |
+-- Module : Streamly.Prelude
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+--
+--
+module Streamly.Prelude
+ (
+ -- * Construction
+ cons
+ , nil
+ , unfoldr
+ , unfoldrM
+ , each
+ , fromHandle
+
+ -- * Elimination
+ , foldr
+ , foldrM
+ , foldl
+ , foldlM
+ , uncons
+
+ -- * Elimination Special Folds
+ , toList
+ , toHandle
+ , all
+ , any
+ , sum
+ , product
+ , head
+ , last
+ , length
+ , elem
+ , notElem
+ , maximum
+ , minimum
+
+ -- * Filtering
+ , filter
+ , take
+ , takeWhile
+ , drop
+ , dropWhile
+
+ -- * Transformation
+ , mapM
+ , mapM_
+ , sequence
+
+ -- * Zipping
+ , zipWith
+ , zipWithM
+ , zipAsyncWith
+ , zipAsyncWithM
+ )
+where
+
+import Control.Monad (liftM)
+import Control.Monad.IO.Class (MonadIO(..))
+import Data.Semigroup (Semigroup(..))
+import Prelude hiding (filter, drop, dropWhile, take,
+ takeWhile, zipWith, foldr, foldl,
+ mapM, mapM_, sequence, all, any,
+ sum, product, elem, notElem,
+ maximum, minimum, head, last,
+ length)
+import qualified Prelude as Prelude
+import qualified System.IO as IO
+
+import Streamly.Core
+import Streamly.Streams
+
+------------------------------------------------------------------------------
+-- Construction
+------------------------------------------------------------------------------
+
+-- | Build a Stream by unfolding pure steps starting from a seed.
+unfoldr :: Streaming t => (b -> Maybe (a, b)) -> b -> t m a
+unfoldr step = fromStream . go
+ where
+ go s = Stream $ \_ stp yld -> do
+ case step s of
+ Nothing -> stp
+ Just (a, b) -> yld a (Just (go b))
+
+-- | Build a Stream by unfolding monadic steps starting from a seed.
+unfoldrM :: (Streaming t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM step = fromStream . go
+ where
+ go s = Stream $ \_ stp yld -> do
+ mayb <- step s
+ case mayb of
+ Nothing -> stp
+ Just (a, b) -> yld a (Just (go b))
+
+-- XXX need eachInterleaved, eachAsync, eachParallel
+-- | Same as @foldWith (<>)@ but more efficient.
+{-# INLINE each #-}
+each :: (Foldable f, Streaming t) => f a -> t m a
+each xs = Prelude.foldr cons nil xs
+
+-- | Read lines from an IO Handle into a stream of Strings.
+fromHandle :: (MonadIO m, Streaming t) => IO.Handle -> t m String
+fromHandle h = fromStream $ go
+ where
+ go = Stream $ \_ stp yld -> do
+ eof <- liftIO $ IO.hIsEOF h
+ if eof
+ then stp
+ else do
+ str <- liftIO $ IO.hGetLine h
+ yld str (Just go)
+
+------------------------------------------------------------------------------
+-- Elimination
+------------------------------------------------------------------------------
+
+-- Parallel variants of folds?
+
+-- | Right fold.
+foldr :: (Monad m, Streaming t) => (a -> b -> b) -> b -> t m a -> m b
+foldr step acc m = go (toStream m)
+ where
+ go m1 =
+ let stop = return acc
+ yield a Nothing = return (step a acc)
+ yield a (Just x) = go x >>= \b -> return (step a b)
+ in (runStream m1) Nothing stop yield
+
+-- | Right fold with a monadic step function. See 'toList' for an example use.
+{-# INLINE foldrM #-}
+foldrM :: Streaming t => (a -> m b -> m b) -> m b -> t m a -> m b
+foldrM step acc m = go (toStream m)
+ where
+ go m1 =
+ let stop = acc
+ yield a Nothing = step a acc
+ yield a (Just x) = step a (go x)
+ in (runStream m1) Nothing stop yield
+
+-- | Strict left fold. This is typed to work with the foldl package. To use
+-- directly pass 'id' as the third argument.
+foldl :: (Monad m, Streaming t)
+ => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
+foldl step begin done m = go begin (toStream m)
+ where
+ go !acc m1 =
+ let stop = return (done acc)
+ yield a Nothing = return (done (step acc a))
+ yield a (Just x) = go (step acc a) x
+ in (runStream m1) Nothing stop yield
+
+-- | Strict left fold, with monadic step function. This is typed to work
+-- with the foldl package. To use directly pass 'id' as the third argument.
+foldlM :: (Monad m, Streaming t)
+ => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
+foldlM step begin done m = go begin (toStream m)
+ where
+ go !acc m1 =
+ let stop = acc >>= done
+ yield a Nothing = acc >>= \b -> step b a >>= done
+ yield a (Just x) = acc >>= \b -> go (step b a) x
+ in (runStream m1) Nothing stop yield
+
+-- | Decompose a stream into its head and tail. If the stream is empty, returns
+-- 'Nothing'. If the stream is non-empty, returns 'Just (a, ma)', where 'a' is
+-- the head of the stream and 'ma' its tail.
+uncons :: (Streaming t, Monad m) => t m a -> m (Maybe (a, t m a))
+uncons m =
+ let stop = return Nothing
+ yield a Nothing = return (Just (a, nil))
+ yield a (Just x) = return (Just (a, (fromStream x)))
+ in (runStream (toStream m)) Nothing stop yield
+
+-- | Write a stream of Strings to an IO Handle.
+toHandle :: (Streaming t, MonadIO m) => IO.Handle -> t m String -> m ()
+toHandle h m = go (toStream m)
+ where
+ go m1 =
+ let stop = return ()
+ yield a Nothing = liftIO (IO.hPutStrLn h a)
+ yield a (Just x) = liftIO (IO.hPutStrLn h a) >> go x
+ in (runStream m1) Nothing stop yield
+
+------------------------------------------------------------------------------
+-- Special folds
+------------------------------------------------------------------------------
+
+-- | Convert a stream into a list in the underlying monad.
+{-# INLINABLE toList #-}
+toList :: (Monad m, Streaming t) => t m a -> m [a]
+toList = foldrM (\a xs -> liftM (a :) xs) (return [])
+
+-- | Take first 'n' elements from the stream and discard the rest.
+take :: Streaming t => Int -> t m a -> t m a
+take n m = fromStream $ go n (toStream m)
+ where
+ go n1 m1 = Stream $ \ctx stp yld -> do
+ let yield a Nothing = yld a Nothing
+ yield a (Just x) = yld a (Just (go (n1 - 1) x))
+ if (n1 <= 0)
+ then stp
+ else (runStream m1) ctx stp yield
+
+-- XXX This is not as efficient as it could be. We need a short circuiting at
+-- a lower level. Compare with simple-conduit, filtering there cuts down time
+-- due to short circuting whereas the time spent remains the same here.
+
+-- | Include only those elements that pass a predicate.
+{-# INLINE filter #-}
+filter :: (Streaming t, Monad (t m)) => (a -> Bool) -> t m a -> t m a
+filter p m = m >>= \x -> if p x then return x else nil
+
+-- | End the stream as soon as the predicate fails on an element.
+takeWhile :: Streaming t => (a -> Bool) -> t m a -> t m a
+takeWhile p m = fromStream $ go (toStream m)
+ where
+ go m1 = Stream $ \ctx stp yld -> do
+ let yield a Nothing | p a = yld a Nothing
+ | otherwise = stp
+ yield a (Just x) | p a = yld a (Just (go x))
+ | otherwise = stp
+ in (runStream m1) ctx stp yield
+
+-- | Discard first 'n' elements from the stream and take the rest.
+drop :: Streaming t => Int -> t m a -> t m a
+drop n m = fromStream $ go n (toStream m)
+ where
+ go n1 m1 = Stream $ \ctx stp yld -> do
+ let yield _ Nothing = stp
+ yield _ (Just x) = (runStream $ go (n1 - 1) x) ctx stp yld
+ if (n1 <= 0)
+ then (runStream m1) ctx stp yld
+ else (runStream m1) ctx stp yield
+
+-- | Drop elements in the stream as long as the predicate succeeds and then
+-- take the rest of the stream.
+dropWhile :: Streaming t => (a -> Bool) -> t m a -> t m a
+dropWhile p m = fromStream $ go (toStream m)
+ where
+ go m1 = Stream $ \ctx stp yld -> do
+ let yield a Nothing | p a = stp
+ | otherwise = yld a Nothing
+ yield a (Just x) | p a = (runStream (go x)) ctx stp yield
+ | otherwise = yld a (Just x)
+ in (runStream m1) ctx stp yield
+
+-- | Determine whether all elements of a stream satisfy a predicate.
+all :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool
+all p m = go (toStream m)
+ where
+ go m1 =
+ let yield a Nothing | p a = return True
+ | otherwise = return False
+ yield a (Just x) | p a = go x
+ | otherwise = return False
+ in (runStream m1) Nothing (return True) yield
+
+-- | Determine whether any of the elements of a stream satisfy a predicate.
+any :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool
+any p m = go (toStream m)
+ where
+ go m1 =
+ let yield a Nothing | p a = return True
+ | otherwise = return False
+ yield a (Just x) | p a = return True
+ | otherwise = go x
+ in (runStream m1) Nothing (return False) yield
+
+-- | Determine the sum of all elements of a stream of numbers
+sum :: (Streaming t, Monad m, Num a) => t m a -> m a
+sum = foldl (+) 0 id
+
+-- | Determine the product of all elements of a stream of numbers
+product :: (Streaming t, Monad m, Num a) => t m a -> m a
+product = foldl (*) 0 id
+
+-- | Extract the first element of the stream, if any.
+head :: (Streaming t, Monad m) => t m a -> m (Maybe a)
+head m =
+ let stop = return Nothing
+ yield a _ = return (Just a)
+ in (runStream (toStream m)) Nothing stop yield
+
+-- | Extract the last element of the stream, if any.
+last :: (Streaming t, Monad m) => t m a -> m (Maybe a)
+last m = go (toStream m)
+ where
+ go m1 =
+ let stop = return Nothing
+ yield a Nothing = return (Just a)
+ yield _ (Just x) = go x
+ in (runStream m1) Nothing stop yield
+
+-- | Determine whether an element is present in the stream.
+elem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool
+elem e m = go (toStream m)
+ where
+ go m1 =
+ let stop = return False
+ yield a Nothing = return (a == e)
+ yield a (Just x) = if (a == e) then return True else go x
+ in (runStream m1) Nothing stop yield
+
+-- | Determine whether an element is not present in the stream.
+notElem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool
+notElem e m = go (toStream m)
+ where
+ go m1 =
+ let stop = return True
+ yield a Nothing = return (a /= e)
+ yield a (Just x) = if (a == e) then return False else go x
+ in (runStream m1) Nothing stop yield
+
+-- | Determine the length of the stream.
+length :: (Streaming t, Monad m) => t m a -> m Int
+length = foldl (\n _ -> n + 1) 0 id
+
+-- | Determine the minimum element in a stream.
+minimum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a)
+minimum m = go Nothing (toStream m)
+ where
+ go r m1 =
+ let stop = return r
+ yield a Nothing = return $ min_ a r
+ yield a (Just x) = go (min_ a r) x
+ in (runStream m1) Nothing stop yield
+
+ min_ a r = case r of
+ Nothing -> Just a
+ Just e -> Just $ min a e
+
+-- | Determine the maximum element in a stream.
+maximum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a)
+maximum m = go Nothing (toStream m)
+ where
+ go r m1 =
+ let stop = return r
+ yield a Nothing = return $ max_ a r
+ yield a (Just x) = go (max_ a r) x
+ in (runStream m1) Nothing stop yield
+
+ max_ a r = case r of
+ Nothing -> Just a
+ Just e -> Just $ max a e
+
+------------------------------------------------------------------------------
+-- Transformation
+------------------------------------------------------------------------------
+
+-- XXX Parallel variants of these? mapMWith et al. sequenceWith.
+
+-- | Replace each element of the stream with the result of a monadic action
+-- applied on the element.
+mapM :: (Streaming t, Monad m) => (a -> m b) -> t m a -> t m b
+mapM f m = fromStream $ go (toStream m)
+ where
+ go m1 = Stream $ \_ stp yld -> do
+ let stop = stp
+ yield a Nothing = f a >>= \b -> yld b Nothing
+ yield a (Just x) = f a >>= \b -> yld b (Just (go x))
+ in (runStream m1) Nothing stop yield
+
+-- | Apply a monadic action to each element of the stream and discard the
+-- output of the action.
+mapM_ :: (Streaming t, Monad m) => (a -> m b) -> t m a -> m ()
+mapM_ f m = go (toStream m)
+ where
+ go m1 =
+ let stop = return ()
+ yield a Nothing = f a >> return ()
+ yield a (Just x) = f a >> go x
+ in (runStream m1) Nothing stop yield
+
+-- | Reduce a stream of monadic actions to a stream of the output of those
+-- actions.
+sequence :: (Streaming t, Monad m) => t m (m a) -> t m a
+sequence m = fromStream $ go (toStream m)
+ where
+ go m1 = Stream $ \_ stp yld -> do
+ let stop = stp
+ yield a Nothing = a >>= \b -> yld b Nothing
+ yield a (Just x) = a >>= \b -> yld b (Just (go x))
+ in (runStream m1) Nothing stop yield
+
+------------------------------------------------------------------------------
+-- Serially Zipping Streams
+------------------------------------------------------------------------------
+
+-- | Zip two streams serially using a monadic zipping function.
+zipWithM :: Streaming t => (a -> b -> t m c) -> t m a -> t m b -> t m c
+zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
+ where
+ go mx my = Stream $ \_ stp yld -> do
+ let merge a ra =
+ let yield2 b Nothing = (runStream (g a b)) Nothing stp yld
+ yield2 b (Just rb) =
+ (runStream ((g a b) <> (go ra rb))) Nothing stp yld
+ in (runStream my) Nothing stp yield2
+ let yield1 a Nothing = merge a snil
+ yield1 a (Just ra) = merge a ra
+ (runStream mx) Nothing stp yield1
+ g a b = toStream $ f a b
+
+------------------------------------------------------------------------------
+-- Parallely Zipping Streams
+------------------------------------------------------------------------------
+
+-- | Zip two streams asyncly (i.e. both the elements being zipped are generated
+-- concurrently) using a monadic zipping function.
+zipAsyncWithM :: (Streaming t, MonadAsync m)
+ => (a -> b -> t m c) -> t m a -> t m b -> t m c
+zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp yld -> do
+ ma <- async m1
+ mb <- async m2
+ (runStream (toStream (zipWithM f ma mb))) Nothing stp yld
diff --git a/src/Streamly/Streams.hs b/src/Streamly/Streams.hs
new file mode 100644
index 0000000..4a297d4
--- /dev/null
+++ b/src/Streamly/Streams.hs
@@ -0,0 +1,985 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving#-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE StandaloneDeriving #-}
+{-# LANGUAGE UndecidableInstances #-} -- XXX
+
+-- |
+-- Module : Streamly.Streams
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+--
+--
+module Streamly.Streams
+ (
+ Streaming (..)
+ , MonadAsync
+
+ -- * SVars
+ , SVarSched (..)
+ , SVarTag (..)
+ , SVarStyle (..)
+ , SVar
+ , newEmptySVar
+
+ -- * Construction
+ , streamBuild
+ , fromCallback
+ , fromSVar
+
+ -- * Elimination
+ , cons
+ , nil
+ , streamFold
+ , runStreaming
+ , toSVar
+
+ -- * Transformation
+ , async
+
+ -- * Stream Styles
+ , StreamT
+ , InterleavedT
+ , AsyncT
+ , ParallelT
+ , ZipStream
+ , ZipAsync
+
+ -- * Type Adapters
+ , serially
+ , interleaving
+ , asyncly
+ , parallely
+ , zipping
+ , zippingAsync
+ , adapt
+
+ -- * Running Streams
+ , runStreamT
+ , runInterleavedT
+ , runAsyncT
+ , runParallelT
+ , runZipStream
+ , runZipAsync
+
+ -- * Zipping
+ , zipWith
+ , zipAsyncWith
+
+ -- * Sum Style Composition
+ , (<=>)
+ , (<|)
+
+ -- * Fold Utilities
+ -- $foldutils
+ , foldWith
+ , foldMapWith
+ , forEachWith
+ )
+where
+
+import Control.Applicative (Alternative (..), liftA2)
+import Control.Monad (MonadPlus(..), ap)
+import Control.Monad.Base (MonadBase (..))
+import Control.Monad.Catch (MonadThrow)
+import Control.Monad.Error.Class (MonadError(..))
+import Control.Monad.IO.Class (MonadIO(..))
+import Control.Monad.Reader.Class (MonadReader(..))
+import Control.Monad.State.Class (MonadState(..))
+import Control.Monad.Trans.Class (MonadTrans)
+import Data.Semigroup (Semigroup(..))
+import Prelude hiding (drop, take, zipWith)
+import Streamly.Core
+
+------------------------------------------------------------------------------
+-- Types that can behave as a Stream
+------------------------------------------------------------------------------
+
+-- | Class of types that can represent a stream of elements of some type 'a' in
+-- some monad 'm'.
+class Streaming t where
+ toStream :: t m a -> Stream m a
+ fromStream :: Stream m a -> t m a
+
+------------------------------------------------------------------------------
+-- Constructing a stream
+------------------------------------------------------------------------------
+
+-- | Add an element a the head of a stream.
+cons :: (Streaming t) => a -> t m a -> t m a
+cons a r = fromStream $ scons a (Just (toStream r))
+
+-- | An empty stream.
+nil :: Streaming t => t m a
+nil = fromStream $ snil
+
+-- | Build a stream from its church encoding. The function passed maps
+-- directly to the underlying representation of the stream type. The second
+-- parameter to the function is the "yield" function yielding a value and the
+-- remaining stream if any otherwise 'Nothing'. The third parameter is to
+-- represent an "empty" stream.
+streamBuild :: Streaming t
+ => (forall r. Maybe (SVar m a)
+ -> (a -> Maybe (t m a) -> m r)
+ -> m r
+ -> m r)
+ -> t m a
+streamBuild k = fromStream $ Stream $ \sv stp yld ->
+ let yield a Nothing = yld a Nothing
+ yield a (Just r) = yld a (Just (toStream r))
+ in k sv yield stp
+
+-- | Build a singleton stream from a callback function.
+fromCallback :: (Streaming t) => (forall r. (a -> m r) -> m r) -> t m a
+fromCallback k = fromStream $ Stream $ \_ _ yld -> k (\a -> yld a Nothing)
+
+-- | Read an SVar to get a stream.
+fromSVar :: (MonadAsync m, Streaming t) => SVar m a -> t m a
+fromSVar sv = fromStream $ fromStreamVar sv
+
+------------------------------------------------------------------------------
+-- Destroying a stream
+------------------------------------------------------------------------------
+
+-- | Fold a stream using its church encoding. The second argument is the "step"
+-- function consuming an element and the remaining stream, if any. The third
+-- argument is for consuming an "empty" stream that yields nothing.
+streamFold :: Streaming t
+ => Maybe (SVar m a) -> (a -> Maybe (t m a) -> m r) -> m r -> t m a -> m r
+streamFold sv step blank m =
+ let yield a Nothing = step a Nothing
+ yield a (Just x) = step a (Just (fromStream x))
+ in (runStream (toStream m)) sv blank yield
+
+-- | Run a streaming composition, discard the results.
+runStreaming :: (Monad m, Streaming t) => t m a -> m ()
+runStreaming m = go (toStream m)
+ where
+ go m1 =
+ let stop = return ()
+ yield _ Nothing = stop
+ yield _ (Just x) = go x
+ in (runStream m1) Nothing stop yield
+
+-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
+-- be read back from the SVar using 'fromSVar'.
+toSVar :: (Streaming t, MonadAsync m) => SVar m a -> t m a -> m ()
+toSVar sv m = toStreamVar sv (toStream m)
+
+------------------------------------------------------------------------------
+-- Transformation
+------------------------------------------------------------------------------
+
+-- XXX Get rid of this?
+-- | Make a stream asynchronous, triggers the computation and returns a stream
+-- in the underlying monad representing the output generated by the original
+-- computation. The returned action is exhaustible and must be drained once. If
+-- not drained fully we may have a thread blocked forever and once exhausted it
+-- will always return 'empty'.
+
+async :: (Streaming t, MonadAsync m) => t m a -> m (t m a)
+async m = do
+ sv <- newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m)
+ return $ fromSVar sv
+
+------------------------------------------------------------------------------
+-- StreamT
+------------------------------------------------------------------------------
+
+-- | The 'Monad' instance of 'StreamT' runs the /monadic continuation/ for each
+-- element of the stream, serially.
+--
+-- @
+-- main = 'runStreamT' $ do
+-- x <- return 1 \<\> return 2
+-- liftIO $ print x
+-- @
+-- @
+-- 1
+-- 2
+-- @
+--
+-- 'StreamT' nests streams serially in a depth first manner.
+--
+-- @
+-- main = 'runStreamT' $ do
+-- x <- return 1 \<\> return 2
+-- y <- return 3 \<\> return 4
+-- liftIO $ print (x, y)
+-- @
+-- @
+-- (1,3)
+-- (1,4)
+-- (2,3)
+-- (2,4)
+-- @
+--
+-- This behavior is exactly like a list transformer. We call the monadic code
+-- being run for each element of the stream a monadic continuation. In
+-- imperative paradigm we can think of this composition as nested @for@ loops
+-- and the monadic continuation is the body of the loop. The loop iterates for
+-- all elements of the stream.
+--
+newtype StreamT m a = StreamT {getStreamT :: Stream m a}
+ deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow)
+
+deriving instance MonadAsync m => Alternative (StreamT m)
+deriving instance MonadAsync m => MonadPlus (StreamT m)
+deriving instance (MonadBase b m, Monad m) => MonadBase b (StreamT m)
+deriving instance MonadError e m => MonadError e (StreamT m)
+deriving instance MonadReader r m => MonadReader r (StreamT m)
+deriving instance MonadState s m => MonadState s (StreamT m)
+
+instance Streaming StreamT where
+ toStream = getStreamT
+ fromStream = StreamT
+
+-- XXX The Functor/Applicative/Num instances for all the types are exactly the
+-- same, how can we reduce this boilerplate (use TH)? We cannot derive them
+-- from a single base type because they depend on the Monad instance which is
+-- different for each type.
+
+------------------------------------------------------------------------------
+-- Monad
+------------------------------------------------------------------------------
+
+instance Monad m => Monad (StreamT m) where
+ return = pure
+ (StreamT (Stream m)) >>= f = StreamT $ Stream $ \_ stp yld ->
+ let run x = (runStream x) Nothing stp yld
+ yield a Nothing = run $ getStreamT (f a)
+ yield a (Just r) = run $ getStreamT (f a)
+ <> getStreamT (StreamT r >>= f)
+ in m Nothing stp yield
+
+------------------------------------------------------------------------------
+-- Applicative
+------------------------------------------------------------------------------
+
+instance Monad m => Applicative (StreamT m) where
+ pure a = StreamT $ scons a Nothing
+ (<*>) = ap
+
+------------------------------------------------------------------------------
+-- Functor
+------------------------------------------------------------------------------
+
+instance Monad m => Functor (StreamT m) where
+ fmap f (StreamT (Stream m)) = StreamT $ Stream $ \_ stp yld ->
+ let yield a Nothing = yld (f a) Nothing
+ yield a (Just r) = yld (f a)
+ (Just (getStreamT (fmap f (StreamT r))))
+ in m Nothing stp yield
+
+------------------------------------------------------------------------------
+-- Num
+------------------------------------------------------------------------------
+
+instance (Monad m, Num a) => Num (StreamT m a) where
+ fromInteger n = pure (fromInteger n)
+
+ negate = fmap negate
+ abs = fmap abs
+ signum = fmap signum
+
+ (+) = liftA2 (+)
+ (*) = liftA2 (*)
+ (-) = liftA2 (-)
+
+instance (Monad m, Fractional a) => Fractional (StreamT m a) where
+ fromRational n = pure (fromRational n)
+
+ recip = fmap recip
+
+ (/) = liftA2 (/)
+
+instance (Monad m, Floating a) => Floating (StreamT m a) where
+ pi = pure pi
+
+ exp = fmap exp
+ sqrt = fmap sqrt
+ log = fmap log
+ sin = fmap sin
+ tan = fmap tan
+ cos = fmap cos
+ asin = fmap asin
+ atan = fmap atan
+ acos = fmap acos
+ sinh = fmap sinh
+ tanh = fmap tanh
+ cosh = fmap cosh
+ asinh = fmap asinh
+ atanh = fmap atanh
+ acosh = fmap acosh
+
+ (**) = liftA2 (**)
+ logBase = liftA2 logBase
+
+------------------------------------------------------------------------------
+-- InterleavedT
+------------------------------------------------------------------------------
+
+-- | Like 'StreamT' but different in nesting behavior. It fairly interleaves
+-- the iterations of the inner and the outer loop, nesting loops in a breadth
+-- first manner.
+--
+--
+-- @
+-- main = 'runInterleavedT' $ do
+-- x <- return 1 \<\> return 2
+-- y <- return 3 \<\> return 4
+-- liftIO $ print (x, y)
+-- @
+-- @
+-- (1,3)
+-- (2,3)
+-- (1,4)
+-- (2,4)
+-- @
+--
+newtype InterleavedT m a = InterleavedT {getInterleavedT :: Stream m a}
+ deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow)
+
+deriving instance MonadAsync m => Alternative (InterleavedT m)
+deriving instance MonadAsync m => MonadPlus (InterleavedT m)
+deriving instance (MonadBase b m, Monad m) => MonadBase b (InterleavedT m)
+deriving instance MonadError e m => MonadError e (InterleavedT m)
+deriving instance MonadReader r m => MonadReader r (InterleavedT m)
+deriving instance MonadState s m => MonadState s (InterleavedT m)
+
+instance Streaming InterleavedT where
+ toStream = getInterleavedT
+ fromStream = InterleavedT
+
+instance Monad m => Monad (InterleavedT m) where
+ return = pure
+ (InterleavedT (Stream m)) >>= f = InterleavedT $ Stream $ \_ stp yld ->
+ let run x = (runStream x) Nothing stp yld
+ yield a Nothing = run $ getInterleavedT (f a)
+ yield a (Just r) = run $ getInterleavedT (f a)
+ `interleave`
+ getInterleavedT (InterleavedT r >>= f)
+ in m Nothing stp yield
+
+------------------------------------------------------------------------------
+-- Applicative
+------------------------------------------------------------------------------
+
+instance Monad m => Applicative (InterleavedT m) where
+ pure a = InterleavedT $ scons a Nothing
+ (<*>) = ap
+
+------------------------------------------------------------------------------
+-- Functor
+------------------------------------------------------------------------------
+
+instance Monad m => Functor (InterleavedT m) where
+ fmap f (InterleavedT (Stream m)) = InterleavedT $ Stream $ \_ stp yld ->
+ let yield a Nothing = yld (f a) Nothing
+ yield a (Just r) =
+ yld (f a) (Just (getInterleavedT (fmap f (InterleavedT r))))
+ in m Nothing stp yield
+
+------------------------------------------------------------------------------
+-- Num
+------------------------------------------------------------------------------
+
+instance (Monad m, Num a) => Num (InterleavedT m a) where
+ fromInteger n = pure (fromInteger n)
+
+ negate = fmap negate
+ abs = fmap abs
+ signum = fmap signum
+
+ (+) = liftA2 (+)
+ (*) = liftA2 (*)
+ (-) = liftA2 (-)
+
+instance (Monad m, Fractional a) => Fractional (InterleavedT m a) where
+ fromRational n = pure (fromRational n)
+
+ recip = fmap recip
+
+ (/) = liftA2 (/)
+
+instance (Monad m, Floating a) => Floating (InterleavedT m a) where
+ pi = pure pi
+
+ exp = fmap exp
+ sqrt = fmap sqrt
+ log = fmap log
+ sin = fmap sin
+ tan = fmap tan
+ cos = fmap cos
+ asin = fmap asin
+ atan = fmap atan
+ acos = fmap acos
+ sinh = fmap sinh
+ tanh = fmap tanh
+ cosh = fmap cosh
+ asinh = fmap asinh
+ atanh = fmap atanh
+ acosh = fmap acosh
+
+ (**) = liftA2 (**)
+ logBase = liftA2 logBase
+
+------------------------------------------------------------------------------
+-- AsyncT
+------------------------------------------------------------------------------
+
+-- | Like 'StreamT' but /may/ run each iteration concurrently using demand
+-- driven concurrency. More concurrent iterations are started only if the
+-- previous iterations are not able to produce enough output for the consumer.
+--
+-- @
+-- import "Streamly"
+-- import Control.Concurrent
+--
+-- main = 'runAsyncT' $ do
+-- n <- return 3 \<\> return 2 \<\> return 1
+-- liftIO $ do
+-- threadDelay (n * 1000000)
+-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
+-- @
+-- @
+-- ThreadId 40: Delay 1
+-- ThreadId 39: Delay 2
+-- ThreadId 38: Delay 3
+-- @
+--
+-- All iterations may run in the same thread if they do not block.
+newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
+ deriving (Semigroup, Monoid, MonadTrans)
+
+deriving instance MonadAsync m => Alternative (AsyncT m)
+deriving instance MonadAsync m => MonadPlus (AsyncT m)
+deriving instance MonadAsync m => MonadIO (AsyncT m)
+deriving instance MonadAsync m => MonadThrow (AsyncT m)
+deriving instance (MonadBase b m, MonadAsync m) => MonadBase b (AsyncT m)
+deriving instance (MonadError e m, MonadAsync m) => MonadError e (AsyncT m)
+deriving instance (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)
+deriving instance (MonadState s m, MonadAsync m) => MonadState s (AsyncT m)
+
+instance Streaming AsyncT where
+ toStream = getAsyncT
+ fromStream = AsyncT
+
+{-# INLINE parbind #-}
+parbind
+ :: (forall c. Stream m c -> Stream m c -> Stream m c)
+ -> Stream m a
+ -> (a -> Stream m b)
+ -> Stream m b
+parbind par m f = go m
+ where
+ go (Stream g) =
+ Stream $ \ctx stp yld ->
+ let run x = (runStream x) ctx stp yld
+ yield a Nothing = run $ f a
+ yield a (Just r) = run $ f a `par` (go r)
+ in g Nothing stp yield
+
+instance MonadAsync m => Monad (AsyncT m) where
+ return = pure
+ (AsyncT m) >>= f = AsyncT $ parbind par m g
+ where g x = getAsyncT (f x)
+ par = joinStreamVar2 (SVarStyle Conjunction LIFO)
+
+------------------------------------------------------------------------------
+-- Applicative
+------------------------------------------------------------------------------
+
+instance MonadAsync m => Applicative (AsyncT m) where
+ pure a = AsyncT $ scons a Nothing
+ (<*>) = ap
+
+------------------------------------------------------------------------------
+-- Functor
+------------------------------------------------------------------------------
+
+instance Monad m => Functor (AsyncT m) where
+ fmap f (AsyncT (Stream m)) = AsyncT $ Stream $ \_ stp yld ->
+ let yield a Nothing = yld (f a) Nothing
+ yield a (Just r) = yld (f a) (Just (getAsyncT (fmap f (AsyncT r))))
+ in m Nothing stp yield
+
+------------------------------------------------------------------------------
+-- Num
+------------------------------------------------------------------------------
+
+instance (MonadAsync m, Num a) => Num (AsyncT m a) where
+ fromInteger n = pure (fromInteger n)
+
+ negate = fmap negate
+ abs = fmap abs
+ signum = fmap signum
+
+ (+) = liftA2 (+)
+ (*) = liftA2 (*)
+ (-) = liftA2 (-)
+
+instance (MonadAsync m, Fractional a) => Fractional (AsyncT m a) where
+ fromRational n = pure (fromRational n)
+
+ recip = fmap recip
+
+ (/) = liftA2 (/)
+
+instance (MonadAsync m, Floating a) => Floating (AsyncT m a) where
+ pi = pure pi
+
+ exp = fmap exp
+ sqrt = fmap sqrt
+ log = fmap log
+ sin = fmap sin
+ tan = fmap tan
+ cos = fmap cos
+ asin = fmap asin
+ atan = fmap atan
+ acos = fmap acos
+ sinh = fmap sinh
+ tanh = fmap tanh
+ cosh = fmap cosh
+ asinh = fmap asinh
+ atanh = fmap atanh
+ acosh = fmap acosh
+
+ (**) = liftA2 (**)
+ logBase = liftA2 logBase
+
+------------------------------------------------------------------------------
+-- ParallelT
+------------------------------------------------------------------------------
+
+-- | Like 'StreamT' but runs /all/ iterations fairly concurrently using a round
+-- robin scheduling.
+--
+-- @
+-- import "Streamly"
+-- import Control.Concurrent
+--
+-- main = 'runParallelT' $ do
+-- n <- return 3 \<\> return 2 \<\> return 1
+-- liftIO $ do
+-- threadDelay (n * 1000000)
+-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
+-- @
+-- @
+-- ThreadId 40: Delay 1
+-- ThreadId 39: Delay 2
+-- ThreadId 38: Delay 3
+-- @
+--
+-- Unlike 'AsyncT' all iterations are guaranteed to run fairly concurrently,
+-- unconditionally.
+newtype ParallelT m a = ParallelT {getParallelT :: Stream m a}
+ deriving (Semigroup, Monoid, MonadTrans)
+
+deriving instance MonadAsync m => Alternative (ParallelT m)
+deriving instance MonadAsync m => MonadPlus (ParallelT m)
+deriving instance MonadAsync m => MonadIO (ParallelT m)
+deriving instance MonadAsync m => MonadThrow (ParallelT m)
+deriving instance (MonadBase b m, MonadAsync m) => MonadBase b (ParallelT m)
+deriving instance (MonadError e m, MonadAsync m) => MonadError e (ParallelT m)
+deriving instance (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m)
+deriving instance (MonadState s m, MonadAsync m) => MonadState s (ParallelT m)
+
+instance Streaming ParallelT where
+ toStream = getParallelT
+ fromStream = ParallelT
+
+instance MonadAsync m => Monad (ParallelT m) where
+ return = pure
+ (ParallelT m) >>= f = ParallelT $ parbind par m g
+ where g x = getParallelT (f x)
+ par = joinStreamVar2 (SVarStyle Conjunction FIFO)
+
+------------------------------------------------------------------------------
+-- Applicative
+------------------------------------------------------------------------------
+
+instance MonadAsync m => Applicative (ParallelT m) where
+ pure a = ParallelT $ scons a Nothing
+ (<*>) = ap
+
+------------------------------------------------------------------------------
+-- Functor
+------------------------------------------------------------------------------
+
+instance Monad m => Functor (ParallelT m) where
+ fmap f (ParallelT (Stream m)) = ParallelT $ Stream $ \_ stp yld ->
+ let yield a Nothing = yld (f a) Nothing
+ yield a (Just r) = yld (f a)
+ (Just (getParallelT (fmap f (ParallelT r))))
+ in m Nothing stp yield
+
+------------------------------------------------------------------------------
+-- Num
+------------------------------------------------------------------------------
+
+instance (MonadAsync m, Num a) => Num (ParallelT m a) where
+ fromInteger n = pure (fromInteger n)
+
+ negate = fmap negate
+ abs = fmap abs
+ signum = fmap signum
+
+ (+) = liftA2 (+)
+ (*) = liftA2 (*)
+ (-) = liftA2 (-)
+
+instance (MonadAsync m, Fractional a) => Fractional (ParallelT m a) where
+ fromRational n = pure (fromRational n)
+
+ recip = fmap recip
+
+ (/) = liftA2 (/)
+
+instance (MonadAsync m, Floating a) => Floating (ParallelT m a) where
+ pi = pure pi
+
+ exp = fmap exp
+ sqrt = fmap sqrt
+ log = fmap log
+ sin = fmap sin
+ tan = fmap tan
+ cos = fmap cos
+ asin = fmap asin
+ atan = fmap atan
+ acos = fmap acos
+ sinh = fmap sinh
+ tanh = fmap tanh
+ cosh = fmap cosh
+ asinh = fmap asinh
+ atanh = fmap atanh
+ acosh = fmap acosh
+
+ (**) = liftA2 (**)
+ logBase = liftA2 logBase
+
+------------------------------------------------------------------------------
+-- Serially Zipping Streams
+------------------------------------------------------------------------------
+
+-- | Zip two streams serially using a pure zipping function.
+zipWith :: Streaming t => (a -> b -> c) -> t m a -> t m b -> t m c
+zipWith f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
+ where
+ go mx my = Stream $ \_ stp yld -> do
+ let merge a ra =
+ let yield2 b Nothing = yld (f a b) Nothing
+ yield2 b (Just rb) = yld (f a b) (Just (go ra rb))
+ in (runStream my) Nothing stp yield2
+ let yield1 a Nothing = merge a snil
+ yield1 a (Just ra) = merge a ra
+ (runStream mx) Nothing stp yield1
+
+-- | 'ZipStream' zips serially i.e. it produces one element from each stream
+-- serially and then zips the two elements. Note, for convenience we have used
+-- the 'zipping' combinator in the following example instead of using a type
+-- annotation.
+--
+-- @
+-- main = (toList . 'zipping' $ (,) \<$\> s1 \<*\> s2) >>= print
+-- where s1 = pure 1 <> pure 2
+-- s2 = pure 3 <> pure 4
+-- @
+-- @
+-- [(1,3),(2,4)]
+-- @
+--
+-- This applicative operation can be seen as the zipping equivalent of
+-- interleaving with '<=>'.
+newtype ZipStream m a = ZipStream {getZipStream :: Stream m a}
+ deriving (Semigroup, Monoid)
+
+deriving instance MonadAsync m => Alternative (ZipStream m)
+
+instance Monad m => Functor (ZipStream m) where
+ fmap f (ZipStream (Stream m)) = ZipStream $ Stream $ \_ stp yld ->
+ let yield a Nothing = yld (f a) Nothing
+ yield a (Just r) = yld (f a)
+ (Just (getZipStream (fmap f (ZipStream r))))
+ in m Nothing stp yield
+
+instance Monad m => Applicative (ZipStream m) where
+ pure a = ZipStream $ scons a Nothing
+ (<*>) = zipWith id
+
+instance Streaming ZipStream where
+ toStream = getZipStream
+ fromStream = ZipStream
+
+instance (Monad m, Num a) => Num (ZipStream m a) where
+ fromInteger n = pure (fromInteger n)
+
+ negate = fmap negate
+ abs = fmap abs
+ signum = fmap signum
+
+ (+) = liftA2 (+)
+ (*) = liftA2 (*)
+ (-) = liftA2 (-)
+
+instance (Monad m, Fractional a) => Fractional (ZipStream m a) where
+ fromRational n = pure (fromRational n)
+
+ recip = fmap recip
+
+ (/) = liftA2 (/)
+
+instance (Monad m, Floating a) => Floating (ZipStream m a) where
+ pi = pure pi
+
+ exp = fmap exp
+ sqrt = fmap sqrt
+ log = fmap log
+ sin = fmap sin
+ tan = fmap tan
+ cos = fmap cos
+ asin = fmap asin
+ atan = fmap atan
+ acos = fmap acos
+ sinh = fmap sinh
+ tanh = fmap tanh
+ cosh = fmap cosh
+ asinh = fmap asinh
+ atanh = fmap atanh
+ acosh = fmap acosh
+
+ (**) = liftA2 (**)
+ logBase = liftA2 logBase
+
+------------------------------------------------------------------------------
+-- Parallely Zipping Streams
+------------------------------------------------------------------------------
+
+-- | Zip two streams asyncly (i.e. both the elements being zipped are generated
+-- concurrently) using a pure zipping function.
+zipAsyncWith :: (Streaming t, MonadAsync m)
+ => (a -> b -> c) -> t m a -> t m b -> t m c
+zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do
+ ma <- async m1
+ mb <- async m2
+ (runStream (toStream (zipWith f ma mb))) Nothing stp yld
+
+-- | Like 'ZipStream' but zips in parallel, it generates both the elements to
+-- be zipped concurrently.
+--
+-- @
+-- main = (toList . 'zippingAsync' $ (,) \<$\> s1 \<*\> s2) >>= print
+-- where s1 = pure 1 <> pure 2
+-- s2 = pure 3 <> pure 4
+-- @
+-- @
+-- [(1,3),(2,4)]
+-- @
+--
+-- This applicative operation can be seen as the zipping equivalent of
+-- parallel composition with '<|>'.
+newtype ZipAsync m a = ZipAsync {getZipAsync :: Stream m a}
+ deriving (Semigroup, Monoid)
+
+deriving instance MonadAsync m => Alternative (ZipAsync m)
+
+instance Monad m => Functor (ZipAsync m) where
+ fmap f (ZipAsync (Stream m)) = ZipAsync $ Stream $ \_ stp yld ->
+ let yield a Nothing = yld (f a) Nothing
+ yield a (Just r) = yld (f a)
+ (Just (getZipAsync (fmap f (ZipAsync r))))
+ in m Nothing stp yield
+
+instance MonadAsync m => Applicative (ZipAsync m) where
+ pure a = ZipAsync $ scons a Nothing
+ (<*>) = zipAsyncWith id
+
+instance Streaming ZipAsync where
+ toStream = getZipAsync
+ fromStream = ZipAsync
+
+instance (MonadAsync m, Num a) => Num (ZipAsync m a) where
+ fromInteger n = pure (fromInteger n)
+
+ negate = fmap negate
+ abs = fmap abs
+ signum = fmap signum
+
+ (+) = liftA2 (+)
+ (*) = liftA2 (*)
+ (-) = liftA2 (-)
+
+instance (MonadAsync m, Fractional a) => Fractional (ZipAsync m a) where
+ fromRational n = pure (fromRational n)
+
+ recip = fmap recip
+
+ (/) = liftA2 (/)
+
+instance (MonadAsync m, Floating a) => Floating (ZipAsync m a) where
+ pi = pure pi
+
+ exp = fmap exp
+ sqrt = fmap sqrt
+ log = fmap log
+ sin = fmap sin
+ tan = fmap tan
+ cos = fmap cos
+ asin = fmap asin
+ atan = fmap atan
+ acos = fmap acos
+ sinh = fmap sinh
+ tanh = fmap tanh
+ cosh = fmap cosh
+ asinh = fmap asinh
+ atanh = fmap atanh
+ acosh = fmap acosh
+
+ (**) = liftA2 (**)
+ logBase = liftA2 logBase
+
+-------------------------------------------------------------------------------
+-- Type adapting combinators
+-------------------------------------------------------------------------------
+
+-- | Adapt one streaming type to another.
+adapt :: (Streaming t1, Streaming t2) => t1 m a -> t2 m a
+adapt = fromStream . toStream
+
+-- | Interpret an ambiguously typed stream as 'StreamT'.
+serially :: StreamT m a -> StreamT m a
+serially x = x
+
+-- | Interpret an ambiguously typed stream as 'InterleavedT'.
+interleaving :: InterleavedT m a -> InterleavedT m a
+interleaving x = x
+
+-- | Interpret an ambiguously typed stream as 'AsyncT'.
+asyncly :: AsyncT m a -> AsyncT m a
+asyncly x = x
+
+-- | Interpret an ambiguously typed stream as 'ParallelT'.
+parallely :: ParallelT m a -> ParallelT m a
+parallely x = x
+
+-- | Interpret an ambiguously typed stream as 'ZipStream'.
+zipping :: ZipStream m a -> ZipStream m a
+zipping x = x
+
+-- | Interpret an ambiguously typed stream as 'ZipAsync'.
+zippingAsync :: ZipAsync m a -> ZipAsync m a
+zippingAsync x = x
+
+-------------------------------------------------------------------------------
+-- Running Streams, convenience functions specialized to types
+-------------------------------------------------------------------------------
+
+-- | Same as @runStreaming . serially@.
+runStreamT :: Monad m => StreamT m a -> m ()
+runStreamT = runStreaming
+
+-- | Same as @runStreaming . interleaving@.
+runInterleavedT :: Monad m => InterleavedT m a -> m ()
+runInterleavedT = runStreaming
+
+-- | Same as @runStreaming . asyncly@.
+runAsyncT :: Monad m => AsyncT m a -> m ()
+runAsyncT = runStreaming
+
+-- | Same as @runStreaming . parallely@.
+runParallelT :: Monad m => ParallelT m a -> m ()
+runParallelT = runStreaming
+
+-- | Same as @runStreaming . zipping@.
+runZipStream :: Monad m => ZipStream m a -> m ()
+runZipStream = runStreaming
+
+-- | Same as @runStreaming . zippingAsync@.
+runZipAsync :: Monad m => ZipAsync m a -> m ()
+runZipAsync = runStreaming
+
+------------------------------------------------------------------------------
+-- Sum Style Composition
+------------------------------------------------------------------------------
+
+infixr 5 <=>
+
+-- | Sequential interleaved composition, in contrast to '<>' this operator
+-- fairly interleaves two streams instead of appending them; yielding one
+-- element from each stream alternately.
+--
+-- @
+-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<=\> (return 3 <> return 4)) >>= print
+-- @
+-- @
+-- [1,3,2,4]
+-- @
+--
+-- This operator corresponds to the 'InterleavedT' style. Unlike '<>', this
+-- operator cannot be used to fold infinite containers since that might
+-- accumulate too many partially drained streams. To be clear, it can combine
+-- infinite streams but not infinite number of streams.
+{-# INLINE (<=>) #-}
+(<=>) :: Streaming t => t m a -> t m a -> t m a
+m1 <=> m2 = fromStream $ interleave (toStream m1) (toStream m2)
+
+-- | Demand driven concurrent composition. In contrast to '<|>' this operator
+-- concurrently "merges" streams in a left biased manner rather than fairly
+-- interleaving them. It keeps yielding from the stream on the left as long as
+-- it can. If the left stream blocks or cannot keep up with the pace of the
+-- consumer it can concurrently yield from the stream on the right in parallel.
+--
+-- @
+-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<| (return 3 <> return 4)) >>= print
+-- @
+-- @
+-- [1,2,3,4]
+-- @
+--
+-- Unlike '<|>' it can be used to fold infinite containers of streams. This
+-- operator corresponds to the 'AsyncT' type for product style composition.
+--
+{-# INLINE (<|) #-}
+(<|) :: (Streaming t, MonadAsync m) => t m a -> t m a -> t m a
+m1 <| m2 = fromStream $ parLeft (toStream m1) (toStream m2)
+
+------------------------------------------------------------------------------
+-- Fold Utilities
+------------------------------------------------------------------------------
+
+-- $foldutils
+-- These utilities are designed to pass the first argument as one of the sum
+-- style composition operators (i.e. '<>', '<=>', '<|', '<|>') to conveniently
+-- fold a container using any style of stream composition.
+
+-- | Like the 'Prelude' 'fold' but allows you to specify a binary sum style
+-- stream composition operator to fold a container of streams.
+--
+-- @foldWith (<>) $ map return [1..3]@
+{-# INLINABLE foldWith #-}
+foldWith :: (Streaming t, Foldable f)
+ => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
+foldWith f = foldr f nil
+
+-- | Like 'foldMap' but allows you to specify a binary sum style composition
+-- operator to fold a container of streams. Maps a monadic streaming action on
+-- the container before folding it.
+--
+-- @foldMapWith (<>) return [1..3]@
+{-# INLINABLE foldMapWith #-}
+foldMapWith :: (Streaming t, Foldable f)
+ => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
+foldMapWith f g = foldr (f . g) nil
+
+-- | Like 'foldMapWith' but with the last two arguments reversed i.e. the
+-- monadic streaming function is the last argument.
+{-# INLINABLE forEachWith #-}
+forEachWith :: (Streaming t, Foldable f)
+ => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
+forEachWith f xs g = foldr (f . g) nil xs
diff --git a/src/Streamly/Time.hs b/src/Streamly/Time.hs
new file mode 100644
index 0000000..8d17897
--- /dev/null
+++ b/src/Streamly/Time.hs
@@ -0,0 +1,65 @@
+-- |
+-- Module : Streamly.Time
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+-- Stability : experimental
+-- Portability : GHC
+--
+-- Time utilities for reactive programming.
+
+module Streamly.Time
+ ( periodic
+ , withClock
+ )
+where
+
+import Control.Monad (when)
+import Control.Concurrent (threadDelay)
+
+-- | Run an action forever periodically at the given frequency specified in per
+-- second (Hz).
+periodic :: Int -> IO () -> IO ()
+periodic freq action = do
+ action
+ threadDelay (1000000 `div` freq)
+ periodic freq action
+
+-- | Run a computation on every clock tick, the clock runs at the specified
+-- frequency. It allows running a computation at high frequency efficiently by
+-- maintaining a local clock and adjusting it with the provided base clock at
+-- longer intervals. The first argument is a base clock returning some notion
+-- of time in microseconds. The second argument is the frequency in per second
+-- (Hz). The third argument is the action to run, the action is provided the
+-- local time as an argument.
+withClock :: IO Int -> Int -> (Int -> IO ()) -> IO ()
+withClock clock freq action = do
+ t <- clock
+ go t period period t 0
+
+ where
+
+ period = 1000000 `div` freq
+
+ -- Note that localTime is roughly but not exactly equal to (lastAdj + tick
+ -- * n). That is because we do not abruptly adjust the clock skew instead
+ -- we adjust the tick size.
+ go lastAdj delay tick localTime n = do
+ action localTime
+ when (delay > 0) $ threadDelay delay
+
+ if (n == freq)
+ then do
+ (t, newTick, newDelay) <- adjustClock lastAdj localTime delay
+ go t newDelay newTick (localTime + newTick) 0
+ else go lastAdj delay tick (localTime + tick) (n + 1)
+
+ -- Adjust the tick size rather than the clock to avoid abrupt changes
+ -- resulting in jittery behavior at the end of every interval.
+ adjustClock lastAdj localTime delay = do
+ baseTime <- clock
+ let newTick = period + (baseTime - localTime) `div` freq
+ lastPeriod = (baseTime - lastAdj) `div` freq
+ newDelay = max 0 (delay + period - lastPeriod)
+ return (baseTime, newTick, newDelay)
diff --git a/src/Streamly/Tutorial.hs b/src/Streamly/Tutorial.hs
new file mode 100644
index 0000000..b357b3c
--- /dev/null
+++ b/src/Streamly/Tutorial.hs
@@ -0,0 +1,1042 @@
+{-# OPTIONS_GHC -fno-warn-unused-imports #-}
+-- |
+-- Module : Streamly.Tutorial
+-- Copyright : (c) 2017 Harendra Kumar
+--
+-- License : BSD3
+-- Maintainer : harendra.kumar@gmail.com
+--
+-- Streamly, short for stream concurrently, combines the essence of
+-- non-determinism, streaming and concurrency in functional programming.
+-- Concurrent and non-concurrent applications are almost indistinguisable,
+-- concurrency capability does not at all impact the performance of
+-- non-concurrent case.
+-- Streaming enables writing modular, composable and scalable applications with
+-- ease and concurrency allows you to make them scale and perform well.
+-- Streamly enables writing concurrent applications without being aware of
+-- threads or synchronization. No explicit thread control is needed, where
+-- applicable the concurrency rate is automatically controlled based on the
+-- demand by the consumer. However, combinators are provided to fine tune the
+-- concurrency control.
+-- Streaming and concurrency together enable expressing reactive applications
+-- conveniently. See "Streamly.Examples" for a simple SDL based FRP example.
+--
+-- Streamly streams are very much like the Haskell lists and most of the
+-- functions that work on lists have a counterpart that works on streams.
+-- However, streamly streams can be generated, consumed or combined
+-- concurrently. In this tutorial we will go over the basic concepts and how to
+-- use the library. The documentation of @Streamly@ module has more details on
+-- core APIs. For more APIs for constructing, folding, filtering, mapping and
+-- zipping etc. see the documentation of "Streamly.Prelude" module. For
+-- examples and other ways to use the library see the module
+-- "Streamly.Examples" as well.
+
+module Streamly.Tutorial
+ (
+ -- * Streams
+ -- $streams
+
+ -- ** Generating Streams
+ -- $generating
+
+ -- ** Eliminating Streams
+ -- $eliminating
+
+ -- * Combining Streams
+ -- $combining
+
+ -- ** Semigroup Style
+ -- $semigroup
+
+ -- *** Serial composition ('<>')
+ -- $serial
+
+ -- *** Async composition ('<|')
+ -- $parallel
+
+ -- *** Interleaved composition ('<=>')
+ -- $interleaved
+
+ -- *** Fair Concurrent composition ('<|>')
+ -- $fairParallel
+
+ -- *** Custom composition
+ -- $custom
+
+ -- ** Monoid Style
+ -- $monoid
+
+ -- * Transforming Streams
+ -- $transforming
+
+ -- ** Monad
+ -- $monad
+
+ -- *** Serial Composition ('StreamT')
+ -- $regularSerial
+
+ -- *** Async Composition ('AsyncT')
+ -- $concurrentNesting
+
+ -- *** Interleaved Composition ('InterleavedT')
+ -- $interleavedNesting
+
+ -- *** Fair Concurrent Composition ('ParallelT')
+ -- $fairlyConcurrentNesting
+
+ -- *** Exercise
+ -- $monadExercise
+
+ -- ** Applicative
+ -- $applicative
+
+ -- ** Functor
+ -- $functor
+
+ -- * Zipping Streams
+ -- $zipping
+
+ -- ** Serial Zipping
+ -- $serialzip
+
+ -- ** Parallel Zipping
+ -- $parallelzip
+
+ -- * Summary of Compositions
+ -- $compositionSummary
+
+ -- * Concurrent Programming
+ -- $concurrent
+
+ -- * Reactive Programming
+ -- $reactive
+
+ -- * Performance
+ -- $performance
+
+ -- * Interoperation with Streaming Libraries
+ -- $interop
+
+ -- * Comparison with Existing Packages
+ -- $comparison
+ )
+where
+
+import Streamly
+import Streamly.Prelude
+import Data.Semigroup
+import Control.Applicative
+import Control.Monad
+import Control.Monad.IO.Class (MonadIO(..))
+import Control.Monad.Trans.Class (MonadTrans (lift))
+
+-- $streams
+--
+-- Streamly provides many different stream types depending on the desired
+-- composition style. The simplest type is 'StreamT'. 'StreamT' is a monad
+-- transformer, the type @StreamT m a@ represents a stream of values of type
+-- 'a' in some underlying monad 'm'. For example, @StreamT IO Int@ is a stream
+-- of 'Int' in 'IO' monad.
+
+-- $generating
+--
+-- Pure values can be placed into the stream type using 'return' or 'pure'.
+-- Effects in the IO monad can be lifted to the stream type using the 'liftIO'
+-- combinator. In a transformer stack we can lift actions from the lower monad
+-- using the 'lift' combinator. Some examples of streams with a single element:
+--
+-- @
+-- return 1 :: 'StreamT' IO Int
+-- @
+-- @
+-- liftIO $ putStrLn "Hello world!" :: 'StreamT' IO ()
+-- @
+--
+-- We can combine streams using '<>' to create streams of many elements:
+--
+-- @
+-- return 1 <> return 2 <> return 3 :: 'StreamT' IO Int
+-- @
+--
+-- For more ways to construct or generate a stream see the module
+-- "Streamly.Prelude".
+
+-- $eliminating
+--
+-- 'runStreamT' runs a composed 'StreamT' computation, lowering the type into
+-- the underlying monad and discarding the result stream:
+--
+-- @
+-- import "Streamly"
+--
+-- main = 'runStreamT' $ liftIO $ putStrLn "Hello world!"
+-- @
+--
+-- 'toList' runs a stream computation and collects the result stream in a list
+-- in the underlying monad. 'toList' is a polymorphic function that works on
+-- multiple stream types belonging to the class 'Streaming'. Therefore, before
+-- you run a stream you need to tell how you want to interpret the stream by
+-- using one of the stream type combinators ('serially', 'asyncly', 'parallely'
+-- etc.). The combinator 'serially' is equivalent to annotating the type as @::
+-- StreamT@.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = do
+-- xs \<- 'toList' $ 'serially' $ return 1 <> return 2
+-- print xs
+-- @
+--
+-- For other ways to eliminate or fold a stream see the module
+-- "Streamly.Prelude".
+
+-- $semigroup
+-- Streams of the same type can be combined into a composite stream in many
+-- different ways using one of the semigroup style binary composition operators
+-- i.e. '<>', '<=>', '<|', '<|>', 'mplus'. These operators work on all stream
+-- types ('StreamT', 'AsyncT' etc.) uniformly.
+--
+-- To illustrate the concurrent aspects, we will use the following @delay@
+-- function to introduce a delay specified in seconds.
+--
+-- @
+-- import "Streamly"
+-- import Control.Concurrent
+--
+-- delay n = liftIO $ do
+-- threadDelay (n * 1000000)
+-- tid \<- myThreadId
+-- putStrLn (show tid ++ ": Delay " ++ show n)
+-- @
+
+-- $serial
+--
+-- We have already seen, the '<>' operator. It composes two streams in series
+-- i.e. the first stream is completely exhausted and then the second stream is
+-- processed. The following example prints the sequence 3, 2, 1 and takes a
+-- total of 6 seconds because everything is serial:
+--
+-- @
+-- main = 'runStreamT' $ delay 3 <> delay 2 <> delay 1
+-- @
+-- @
+-- ThreadId 36: Delay 3
+-- ThreadId 36: Delay 2
+-- ThreadId 36: Delay 1
+-- @
+
+-- $interleaved
+-- The '<=>' operator is serial like '<>' but it interleaves the two streams
+-- i.e. it yields one element from the first stream and then one element from
+-- the second stream, and so on. The following example prints the sequence 1,
+-- 3, 2, 4 and takes a total of 10 seconds because everything is serial:
+--
+-- @
+-- main = 'runStreamT' $ (delay 1 <> delay 2) '<=>' (delay 3 <> delay 4)
+-- @
+-- @
+-- ThreadId 36: Delay 1
+-- ThreadId 36: Delay 3
+-- ThreadId 36: Delay 2
+-- ThreadId 36: Delay 4
+-- @
+--
+-- Note that this operator cannot be used to fold infinite containers since it
+-- requires preserving the state until a stream is finished. To be clear, it
+-- can combine infinite streams but not infinite number of streams.
+
+-- $parallel
+--
+-- The '<|' operator can run both computations concurrently, /when needed/.
+-- In the following example since the first computation blocks we start the
+-- next one in a separate thread and so on:
+--
+-- @
+-- main = 'runStreamT' $ delay 3 '<|' delay 2 '<|' delay 1
+-- @
+-- @
+-- ThreadId 42: Delay 1
+-- ThreadId 41: Delay 2
+-- ThreadId 40: Delay 3
+-- @
+--
+-- This is the concurrent version of the '<>' operator. The computations are
+-- triggered in the same order as '<>' except that they are concurrent. When
+-- we have a tree of computations composed using this operator, the tree is
+-- traversed in DFS style just like '<>'.
+--
+-- @
+-- main = 'runStreamT' $ (p 1 '<|' p 2) '<|' (p 3 '<|' p 4)
+-- where p = liftIO . print
+-- @
+-- @
+-- 1
+-- 2
+-- 3
+-- 4
+-- @
+--
+-- Concurrency provided by this operator is demand driven. The second
+-- computation is run concurrently with the first only if the first computation
+-- is not producing enough output to keep the stream consumer busy otherwise
+-- the second computation is run serially after the previous one. The number of
+-- concurrent threads is adapted dynamically based on the pull rate of the
+-- consumer of the stream.
+-- As you can see, in the following example the computations are run in a
+-- single thread one after another, because none of them blocks. However, if
+-- the thread consuming the stream were faster than the producer then it would
+-- have started parallel threads for each computation to keep up even if none
+-- of them blocks:
+--
+-- @
+-- main = 'runStreamT' $ traced (sqrt 9) '<|' traced (sqrt 16) '<|' traced (sqrt 25)
+-- @
+-- @
+-- ThreadId 40
+-- ThreadId 40
+-- ThreadId 40
+-- @
+--
+-- Since the concurrency provided by this operator is demand driven it cannot
+-- be used when the composed computations have timers that are relative to each
+-- other because all computations may not be started at the same time and
+-- therefore timers in all of them may not start at the same time. When
+-- relative timing among all computations is important or when we need to start
+-- all computations at once for some reason '<|>' must be used instead.
+-- However, '<|' is useful in situations when we want to optimally utilize the
+-- resources and we know that the computations can run in parallel but we do
+-- not care if they actually run in parallel or not, that decision is left to
+-- the scheduler. Also, note that this operator can be used to fold infinite
+-- containers in contrast to '<|>', because it does not require us to run all
+-- of them at the same time.
+--
+-- The left bias (or the DFS style) of the operator '<|' is suggested by its
+-- shape. You can also think of this as an unbalanced version of the fairly
+-- parallel operator '<|>'.
+
+-- $fairParallel
+--
+-- The 'Alternative' composition operator '<|>', like '<|', runs the composed
+-- computations concurrently. However, unlike '<|' it runs all of the
+-- computations in fairly parallel manner using a round robin scheduling
+-- mechanism. This can be considered as the concurrent version of the fairly
+-- interleaved serial operation '<=>'. Note that this cannot be used on
+-- infinite containers, as it will lead to an infinite sized scheduling queue.
+--
+-- The following example sends a query to three search engines in parallel and
+-- prints the name of the search engine as a response arrives:
+--
+-- @
+-- import "Streamly"
+-- import Network.HTTP.Simple
+--
+-- main = 'runStreamT' $ google \<|> bing \<|> duckduckgo
+-- where
+-- google = get "https://www.google.com/search?q=haskell"
+-- bing = get "https://www.bing.com/search?q=haskell"
+-- duckduckgo = get "https://www.duckduckgo.com/?q=haskell"
+-- get s = liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s))
+-- @
+
+-- $custom
+--
+-- The 'async' API can be used to create references to asynchronously running
+-- stream computations. We can then use 'uncons' to explore the streams
+-- arbitrarily and then recompose individual elements to create a new stream.
+-- This way we can dynamically decide which stream to explore at any given
+-- time. Take an example of a merge sort of two sorted streams. We need to
+-- keep consuming items from the stream which has the lowest item in the sort
+-- order. This can be achieved using async references to streams. See
+-- "Streamly.Examples.MergeSortedStreams".
+
+-- $monoid
+--
+-- Each of the semigroup compositions described has an identity that can be
+-- used to fold a possibly empty container. An empty stream is represented by
+-- 'nil' which can be represented in various standard forms as 'mempty',
+-- 'empty' or 'mzero'.
+-- Some fold utilities are also provided by the library for convenience:
+--
+-- * 'foldWith' folds a 'Foldable' container of stream computations using the
+-- given composition operator.
+-- * 'foldMapWith' folds like foldWith but also maps a function before folding.
+-- * 'forEachWith' is like foldMapwith but the container argument comes before
+-- the function argument.
+-- * The 'each' primitive from "Streamly.Prelude" folds a 'Foldable' container
+-- using the '<>' operator:
+--
+-- All of the following are equivalent:
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = do
+-- 'toList' . 'serially' $ 'foldWith' (<>) (map return [1..10]) >>= print
+-- 'toList' . 'serially' $ 'foldMapWith' (<>) return [1..10] >>= print
+-- 'toList' . 'serially' $ 'forEachWith' (<>) [1..10] return >>= print
+-- 'toList' . 'serially' $ 'each' [1..10] >>= print
+-- @
+
+-- $transforming
+--
+-- The previous section discussed ways to merge the elements of two streams
+-- without doing any transformation on them. In this section we will explore
+-- how to transform streams using 'Functor', 'Applicative' or 'Monad' style
+-- compositions. The applicative and monad composition of all 'Streaming' types
+-- behave exactly the same way as a list transformer. For simplicity of
+-- illustration we are using streams of pure values in the following examples.
+-- However, the real application of streams arises when these streams are
+-- generated using monadic actions.
+
+-- $monad
+--
+-- In functional programmer's parlance the 'Monad' instance of 'Streaming'
+-- types implement non-determinism, exploring all possible combination of
+-- choices from both the streams. From an imperative programmer's point of view
+-- it behaves like nested loops i.e. for each element in the first stream and
+-- for each element in the second stream apply the body of the loop. If you are
+-- familiar with list transformer this behavior is exactly the same as that of
+-- a list transformer.
+--
+-- Just like we saw in sum style compositions earlier, monadic composition also
+-- has multiple variants each of which exactly corresponds to one of the sum
+-- style composition variant.
+
+-- $regularSerial
+--
+-- When we interpret the monadic composition as 'StreamT' we get a standard
+-- list transformer like serial composition.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = 'runStreamT' $ do
+-- x <- 'each' [3,2,1]
+-- delay x
+-- @
+-- @
+-- ThreadId 30: Delay 3
+-- ThreadId 30: Delay 2
+-- ThreadId 30: Delay 1
+-- @
+--
+-- As you can see the code after the @each@ statement is run three times, once
+-- for each value of @x@. All the three iterations are serial and run in the
+-- same thread one after another. When compared to imperative programming, this
+-- can also be viewed as a @for@ loop with three iterations.
+--
+-- A console echo loop copying standard input to standard output can simply be
+-- written like this:
+--
+-- @
+-- import "Streamly"
+-- import Data.Semigroup (cycle1)
+--
+-- main = 'runStreamT' $ cycle1 (liftIO getLine) >>= liftIO . putStrLn
+-- @
+--
+-- When multiple streams are composed using this style they nest in a DFS
+-- manner i.e. nested iterations of an iteration are executed before we proceed
+-- to the next iteration at higher level. This behaves just like nested @for@
+-- loops in imperative programming.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = 'runStreamT' $ do
+-- x <- 'each' [1,2]
+-- y <- 'each' [3,4]
+-- liftIO $ putStrLn $ show (x, y)
+-- @
+-- @
+-- (1,3)
+-- (1,4)
+-- (2,3)
+-- (2,4)
+-- @
+--
+-- You will also notice that this is the monadic equivalent of the sum style
+-- composition using '<>'.
+
+-- $concurrentNesting
+--
+-- When we interpret the monadic composition as 'AsyncT' we get a /concurrent/
+-- list-transformer like composition. Multiple monadic continuations (or loop
+-- iterations) may be started concurrently. Concurrency is demand driven
+-- i.e. more concurrent iterations are started only if the previous iterations
+-- are not able to produce enough output for the consumer of the output stream.
+-- This is the concurrent version of 'StreamT'.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = 'runAsyncT' $ do
+-- x <- 'each' [3,2,1]
+-- delay x
+-- @
+-- @
+-- ThreadId 40: Delay 1
+-- ThreadId 39: Delay 2
+-- ThreadId 38: Delay 3
+-- @
+--
+-- As you can see the code after the @each@ statement is run three times, once
+-- for each value of @x@. All the three iterations are concurrent and run in
+-- different threads. The iteration with least delay finishes first. When
+-- compared to imperative programming, this can be viewed as a @for@ loop
+-- with three concurrent iterations.
+--
+-- Concurrency is demand driven just as in the case of '<|'. When multiple
+-- streams are composed using this style the iterations are triggered in a DFS
+-- manner just like 'StreamT' i.e. nested iterations are executed before we
+-- proceed to the next iteration at higher level. However, unlike 'StreamT'
+-- more than one iterations may be started concurrently, and based on the
+-- demand from the consumer.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = 'runAsyncT' $ do
+-- x <- 'each' [1,2]
+-- y <- 'each' [3,4]
+-- liftIO $ putStrLn $ show (x, y)
+-- @
+-- @
+-- (1,3)
+-- (1,4)
+-- (2,3)
+-- (2,4)
+-- @
+--
+-- You will notice that this is the monadic equivalent of the '<|' style
+-- sum composition. The same caveats apply to this as the '<|' operation.
+
+-- $interleavedNesting
+--
+-- When we interpret the monadic composition as 'InterleavedT' we get a serial
+-- but fairly interleaved list-transformer like composition. The monadic
+-- continuations or iterations of the outer loop are fairly interleaved with
+-- the continuations or iterations of the inner loop.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = 'runInterleavedT' $ do
+-- x <- 'each' [1,2]
+-- y <- 'each' [3,4]
+-- liftIO $ putStrLn $ show (x, y)
+-- @
+-- @
+-- (1,3)
+-- (2,3)
+-- (1,4)
+-- (2,4)
+-- @
+--
+-- You will notice that this is the monadic equivalent of the '<=>' style
+-- sum composition. The same caveats apply to this as the '<=>' operation.
+
+-- $fairlyConcurrentNesting
+--
+-- When we interpret the monadic composition as 'ParallelT' we get a
+-- /concurrent/ list-transformer like composition just like 'AsyncT'. The
+-- difference is that this is fully parallel with all iterations starting
+-- concurrently instead of the demand driven concurrency of 'AsyncT'.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+-- main = 'runParallelT' $ do
+-- x <- 'each' [3,2,1]
+-- delay x
+-- @
+-- @
+-- ThreadId 40: Delay 1
+-- ThreadId 39: Delay 2
+-- ThreadId 38: Delay 3
+-- @
+--
+-- You will notice that this is the monadic equivalent of the '<|>' style
+-- sum composition. The same caveats apply to this as the '<|>' operation.
+
+-- $monadExercise
+--
+-- The streamly code is usually written in a way that is agnostic of the
+-- specific monadic composition type. We use a polymorphic type with a
+-- 'Streaming' type class constraint. When running the stream we can choose the
+-- specific mode of composition. For example look at the following code.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+--
+--
+-- composed :: 'Streaming' t => t m a
+-- composed = do
+-- sz <- sizes
+-- cl <- colors
+-- sh <- shapes
+-- liftIO $ putStrLn $ show (sz, cl, sh)
+--
+-- where
+--
+-- sizes = 'each' [1, 2, 3]
+-- colors = 'each' ["red", "green", "blue"]
+-- shapes = 'each' ["triangle", "square", "circle"]
+-- @
+--
+-- Now we can interpret this in whatever way we want:
+--
+-- @
+-- main = 'runStreamT' composed
+-- main = 'runAsyncT' composed
+-- main = 'runInterleavedT' composed
+-- main = 'runParallelT' composed
+-- @
+--
+-- Equivalently, we can also write it using the type adapter combinators, like
+-- this:
+--
+-- @
+-- main = 'runStreaming' $ 'serially' $ composed
+-- main = 'runStreaming' $ 'asyncly' $ composed
+-- main = 'runStreaming' $ 'interleaving' $ composed
+-- main = 'runStreaming' $ 'parallely' $ composed
+-- @
+--
+-- As an exercise try to figure out the output of this code for each mode of
+-- composition.
+
+-- $functor
+--
+-- 'fmap' transforms a stream by mapping a function on all elements of the
+-- stream. The functor instance of each stream type defines 'fmap' to be
+-- precisely the same as 'liftM', and therefore 'fmap' is always serial
+-- irrespective of the type. For concurrent mapping, alternative versions of
+-- 'fmap', namely, 'asyncMap' and 'parMap' are provided.
+--
+-- @
+-- import "Streamly"
+--
+-- main = ('toList' $ 'serially' $ fmap show $ 'each' [1..10]) >>= print
+-- @
+--
+-- Also see the 'mapM' and 'sequence' functions for mapping actions, in the
+-- "Streamly.Prelude" module.
+
+-- $applicative
+--
+-- Applicative is precisely the same as the 'ap' operation of 'Monad'. For
+-- zipping and parallel applicatives separate types 'ZipStream' and 'ZipAsync'
+-- are provided.
+--
+-- The following example runs all iterations serially and takes a total 17
+-- seconds (1 + 3 + 4 + 2 + 3 + 4):
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+-- import Control.Concurrent
+--
+-- s1 = d 1 <> d 2
+-- s2 = d 3 <> d 4
+-- d n = delay n >> return n
+--
+-- main = ('toList' . 'serially' $ (,) \<$> s1 \<*> s2) >>= print
+-- @
+-- @
+-- ThreadId 36: Delay 1
+-- ThreadId 36: Delay 3
+-- ThreadId 36: Delay 4
+-- ThreadId 36: Delay 2
+-- ThreadId 36: Delay 3
+-- ThreadId 36: Delay 4
+-- [(1,3),(1,4),(2,3),(2,4)]
+-- @
+--
+-- Similalrly interleaving runs the iterations in an interleaved order but
+-- since it is serial it takes a total of 17 seconds:
+--
+-- @
+-- main = ('toList' . 'interleaving' $ (,) \<$> s1 \<*> s2) >>= print
+-- @
+-- @
+-- ThreadId 36: Delay 1
+-- ThreadId 36: Delay 3
+-- ThreadId 36: Delay 2
+-- ThreadId 36: Delay 3
+-- ThreadId 36: Delay 4
+-- ThreadId 36: Delay 4
+-- [(1,3),(2,3),(1,4),(2,4)]
+-- @
+--
+-- 'AsyncT' can run the iterations concurrently and therefore takes a total
+-- of 10 seconds (1 + 2 + 3 + 4):
+--
+-- @
+-- main = ('toList' . 'asyncly' $ (,) \<$> s1 \<*> s2) >>= print
+-- @
+-- @
+-- ThreadId 34: Delay 1
+-- ThreadId 36: Delay 2
+-- ThreadId 35: Delay 3
+-- ThreadId 36: Delay 3
+-- ThreadId 35: Delay 4
+-- ThreadId 36: Delay 4
+-- [(1,3),(2,3),(1,4),(2,4)]
+-- @
+--
+-- Similalrly 'ParallelT' as well can run the iterations concurrently and
+-- therefore takes a total of 10 seconds (1 + 2 + 3 + 4):
+--
+-- @
+-- main = ('toList' . 'parallely' $ (,) \<$> s1 \<*> s2) >>= print
+-- @
+-- @
+-- ThreadId 34: Delay 1
+-- ThreadId 36: Delay 2
+-- ThreadId 35: Delay 3
+-- ThreadId 36: Delay 3
+-- ThreadId 35: Delay 4
+-- ThreadId 36: Delay 4
+-- [(1,3),(2,3),(1,4),(2,4)]
+-- @
+
+-- $compositionSummary
+--
+-- The following table summarizes the types for monadic compositions and the
+-- operators for sum style compositions. This table captures the essence of
+-- streamly.
+--
+-- @
+-- +-----+--------------+------------+
+-- | | Serial | Concurrent |
+-- +=====+==============+============+
+-- | DFS | 'StreamT' | 'AsyncT' |
+-- | +--------------+------------+
+-- | | '<>' | '<|' |
+-- +-----+--------------+------------+
+-- | BFS | 'InterleavedT' | 'ParallelT' |
+-- | +--------------+------------+
+-- | | '<=>' | '<|>' |
+-- +-----+--------------+------------+
+-- @
+
+-- $zipping
+--
+-- Zipping is a special transformation where the corresponding elements of two
+-- streams are combined together using a zip function producing a new stream of
+-- outputs. Two different types are provided for serial and concurrent zipping.
+-- These types provide an applicative instance that zips the argument streams.
+-- Also see the zipping function in the "Streamly.Prelude" module.
+
+-- $serialzip
+--
+-- 'ZipStream' zips streams serially:
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+-- import Control.Concurrent
+--
+-- d n = delay n >> return n
+-- s1 = 'adapt' . 'serially' $ d 1 <> d 2
+-- s2 = 'adapt' . 'serially' $ d 3 <> d 4
+--
+-- main = ('toList' . 'zipping' $ (,) \<$> s1 \<*> s2) >>= print
+-- @
+--
+-- This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since
+-- everything runs serially:
+--
+-- @
+-- ThreadId 29: Delay 1
+-- ThreadId 29: Delay 3
+-- ThreadId 29: Delay 2
+-- ThreadId 29: Delay 4
+-- [(1,3),(2,4)]
+-- @
+
+-- $parallelzip
+--
+-- 'ZipAsync' zips streams concurrently:
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+-- import Control.Concurrent
+-- import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
+--
+-- d n = delay n >> return n
+-- s1 = 'adapt' . 'serially' $ d 1 <> d 2
+-- s2 = 'adapt' . 'serially' $ d 3 <> d 4
+--
+-- main = do
+-- liftIO $ hSetBuffering stdout LineBuffering
+-- ('toList' . 'zippingAsync' $ (,) \<$> s1 \<*> s2) >>= print
+-- @
+--
+-- This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3
+-- are produced concurrently, and 2 and 4 are produced concurrently:
+--
+-- @
+-- ThreadId 32: Delay 1
+-- ThreadId 32: Delay 2
+-- ThreadId 33: Delay 3
+-- ThreadId 33: Delay 4
+-- [(1,3),(2,4)]
+-- @
+
+-- $concurrent
+--
+-- When writing concurrent programs there are two distinct places where the
+-- programmer chooses the type of concurrency. First, when /generating/ a
+-- stream by combining other streams we can use one of the sum style operators
+-- to combine them concurrently or serially. Second, when /processing/ a stream
+-- in a monadic composition we can choose one of the monad composition types to
+-- choose the desired type of concurrency.
+--
+-- In the following example the squares of @x@ and @y@ are computed
+-- concurrently using the '<|' operator and the square roots of their sum are
+-- also computed concurrently by using the 'asyncly' combinator. We can choose
+-- different combinators e.g. '<>' and 'serially', to control the concurrency.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude" (toList)
+-- import Data.List (sum)
+--
+-- main = do
+-- z \<- 'toList'
+-- $ 'asyncly' -- Concurrent monadic processing (sqrt below)
+-- $ do
+-- x2 \<- 'forEachWith' ('<|') [1..100] $ -- Concurrent @"for"@ loop
+-- \\x -> return $ x * x -- body of the loop
+-- y2 \<- 'forEachWith' ('<|') [1..100] $
+-- \\y -> return $ y * y
+-- return $ sqrt (x2 + y2)
+-- print $ sum z
+-- @
+--
+-- You can see how this directly maps to the imperative style
+-- <https://en.wikipedia.org/wiki/OpenMP OpenMP> model, we use combinators
+-- and operators instead of the ugly pragmas.
+--
+-- For more concurrent programming examples see,
+-- "Streamly.Examples.ListDirRecursive", "Streamly.Examples.MergeSortedStreams"
+-- and "Streamly.Examples.SearchEngineQuery".
+
+-- $reactive
+--
+-- Reactive programming is nothing but concurrent streaming which is what
+-- streamly is all about. With streamly we can generate streams of events,
+-- merge streams that are generated concurrently and process events
+-- concurrently. We can do all this without any knowledge about the specifics
+-- of the implementation of concurrency. In the following example you will see
+-- that the code is just regular Haskell code without much streamly APIs used
+-- (active hyperlinks are the streamly APIs) and yet it is a reactive
+-- application.
+--
+--
+-- This application has two independent and concurrent sources of event
+-- streams, @acidRain@ and @userAction@. @acidRain@ continuously generates
+-- events that deteriorate the health of the game character. @userAction@ can
+-- be "potion" or "quit". When the user types "potion" the health improves and
+-- the game continues.
+--
+-- @
+-- {-\# LANGUAGE FlexibleContexts #-}
+--
+-- import "Streamly"
+-- import Control.Concurrent (threadDelay)
+-- import Control.Monad (when)
+-- import Control.Monad.State
+-- import Data.Semigroup (cycle1)
+--
+-- data Event = Harm Int | Heal Int | Quit deriving (Show)
+--
+-- userAction :: MonadIO m => 'StreamT' m Event
+-- userAction = cycle1 $ liftIO askUser
+-- where
+-- askUser = do
+-- command <- getLine
+-- case command of
+-- "potion" -> return (Heal 10)
+-- "quit" -> return Quit
+-- _ -> putStrLn "What?" >> askUser
+--
+-- acidRain :: MonadIO m => 'StreamT' m Event
+-- acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1)
+--
+-- game :: ('MonadAsync' m, MonadState Int m) => 'StreamT' m ()
+-- game = do
+-- event \<- userAction \<|> acidRain
+-- case event of
+-- Harm n -> modify $ \\h -> h - n
+-- Heal n -> modify $ \\h -> h + n
+-- Quit -> fail "quit"
+--
+-- h <- get
+-- when (h <= 0) $ fail "You die!"
+-- liftIO $ putStrLn $ "Health = " ++ show h
+--
+-- main = do
+-- putStrLn "Your health is deteriorating due to acid rain,\\
+-- \\ type \\"potion\\" or \\"quit\\""
+-- _ <- runStateT ('runStreamT' game) 60
+-- return ()
+-- @
+--
+-- You can also find the source of this example in
+-- "Streamly.Examples.AcidRainGame". It has been adapted from Gabriel's
+-- <https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html pipes-concurrency>
+-- package.
+-- This is much simpler compared to the pipes version because of the builtin
+-- concurrency in streamly. You can also find a SDL based reactive programming
+-- example adapted from Yampa in "Streamly.Examples.CirclingSquare".
+
+-- $performance
+--
+-- Streamly is highly optimized for performance, it is designed for serious
+-- high performing, concurrent and scalable applications. We have created the
+-- <https://hackage.haskell.org/package/streaming-benchmarks streaming-benchmarks>
+-- package which is specifically and carefully designed to measure the
+-- performance of Haskell streaming libraries fairly and squarely in the right
+-- way. Streamly performs at par or even better than most streaming libraries
+-- for common operations even though it needs to deal with the concurrency
+-- capability.
+
+-- $interop
+--
+-- We can use @unfoldr@ and @uncons@ to convert one streaming type to another.
+-- We will assume the following common code to be available in the examples
+-- demonstrated below.
+--
+-- @
+-- import "Streamly"
+-- import "Streamly.Prelude"
+-- import System.IO (stdin)
+--
+-- -- Adapt uncons to return an Either instead of Maybe
+-- unconsE s = 'uncons' s >>= maybe (return $ Left ()) (return . Right)
+-- stdinLn = 'serially' $ 'fromHandle' stdin
+-- @
+--
+-- Interop with @pipes@:
+--
+-- @
+-- import qualified Pipes as P
+-- import qualified Pipes.Prelude as P
+--
+-- main = do
+-- -- streamly to pipe
+-- P.runEffect $ P.for (P.unfoldr unconsE stdinLn) (lift . putStrLn)
+--
+-- -- pipe to streamly
+-- -- Adapt P.next to return a Maybe instead of Either
+-- let nextM p = P.next p >>= either (\\_ -> return Nothing) (return . Just)
+-- 'runStreamT' $ 'unfoldrM' nextM P.stdinLn >>= lift . putStrLn
+-- @
+--
+-- Interop with @streaming@:
+--
+-- @
+-- import qualified Streaming as S
+-- import qualified Streaming.Prelude as S
+--
+-- main = do
+-- -- streamly to streaming
+-- S.stdoutLn $ S.unfoldr unconsE stdinLn
+--
+-- -- streaming to streamly
+-- 'runStreamT' $ unfoldrM S.uncons S.stdinLn >>= lift . putStrLn
+--
+-- @
+--
+-- Interop with @conduit@:
+--
+-- @
+-- import qualified Data.Conduit as C
+-- import qualified Data.Conduit.List as C
+-- import qualified Data.Conduit.Combinators as C
+--
+-- -- streamly to conduit
+-- main = (C.unfoldM 'uncons' stdinLn) C.$$ C.print
+-- @
+
+-- $comparison
+--
+-- Streamly unifies non-determinism, streaming, concurrency and FRP
+-- functionality that is otherwise covered by several disparate packages, and
+-- it does that with a surprisingly concise API. Here is a list of popular and
+-- well-known packages in all these areas:
+--
+-- @
+-- +-----------------+----------------+
+-- | Non-determinism | <https://hackage.haskell.org/package/list-t list-t> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/logict logict> |
+-- +-----------------+----------------+
+-- | Streaming | <https://hackage.haskell.org/package/streaming streaming> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/conduit conduit> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/pipes pipes> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/simple-conduit simple-conduit> |
+-- +-----------------+----------------+
+-- | Concurrency | <https://hackage.haskell.org/package/async async> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/transient transient> |
+-- +-----------------+----------------+
+-- | FRP | <https://hackage.haskell.org/package/Yampa Yampa> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/dunai dunai> |
+-- | +----------------+
+-- | | <https://hackage.haskell.org/package/reflex reflex> |
+-- +-----------------+----------------+
+-- @
+--
+-- Streamly covers all the functionality provided by both the non-determinism
+-- packages listed above and provides better performance in comparison to
+-- those. In fact, at the core streamly is a list transformer but it naturally
+-- integrates the concurrency dimension to the basic list transformer
+-- functionality.
+--
+-- When it comes to streaming, in terms of core concepts, @simple-conduit@ is
+-- the package that is closest to streamly if we set aside the concurrency
+-- dimension, both are streaming packages with list transformer like monad
+-- composition. However, in terms of API it is more like the @streaming@
+-- package. Streamly can be used to achieve more or less the functionality
+-- provided by any of the streaming packages listed above. The types and API of
+-- streamly are much simpler in comparison to conduit and pipes. It is more or
+-- less like the standard Haskell list APIs.
+--
+-- When it comes to concurrency, streamly can do everything that the @async@
+-- package can do and more. async provides applicative concurrency whereas
+-- streamly provides both applicative and monadic concurrency. The 'ZipAsync'
+-- type behaves like the applicative instance of async. This work was
+-- originally inspired by the concurrency implementation in @transient@ though
+-- it has no resemblence with that. Streamly provides concurrency as transient
+-- does but in a sort of dual manner, it can lazily stream the output. In
+-- comparison to transient streamly has a first class streaming interface and
+-- is a monad transformer that can be used universally in any Haskell monad
+-- transformer stack.
+--
+-- The non-determinism, concurrency and streaming combination make streamly a
+-- strong FRP capable library as well. FRP is fundamentally stream of events
+-- that can be processed concurrently. The example in this tutorial as well as
+-- the "Streamly.Examples.CirclingSquare" example from Yampa demonstrate the
+-- basic FRP capability of streamly. In core concepts streamly is strikingly
+-- similar to @dunai@. dunai was designed from a FRP perspective and streamly
+-- wa original designed from a concurrency perspective. However, both have
+-- similarity at the core.
diff --git a/stack-7.10.yaml b/stack-7.10.yaml
new file mode 100644
index 0000000..b8e2d6a
--- /dev/null
+++ b/stack-7.10.yaml
@@ -0,0 +1,16 @@
+resolver: lts-6.35
+packages:
+- '.'
+extra-deps:
+ - lockfree-queue-0.2.3.1
+ - simple-conduit-0.4.0
+ - transient-0.5.9.2
+ - http-conduit-2.2.2
+ - http-client-0.5.0
+ - http-client-tls-0.3.0
+ - SDL-0.6.5.1
+flags: {}
+extra-package-dbs: []
+# For mac ports installed SDL library on Mac OS X
+#extra-include-dirs:
+#- /opt/local/include
diff --git a/stack-8.0.yaml b/stack-8.0.yaml
new file mode 100644
index 0000000..df4e5e1
--- /dev/null
+++ b/stack-8.0.yaml
@@ -0,0 +1,17 @@
+resolver: lts-7.24
+packages:
+- '.'
+extra-deps:
+ - lockfree-queue-0.2.3.1
+ - simple-conduit-0.6.0
+ - transient-0.4.4
+ - monad-recorder-0.1.0
+ - http-conduit-2.2.2
+ - http-client-0.5.0
+ - http-client-tls-0.3.0
+ - SDL-0.6.5.1
+flags: {}
+extra-package-dbs: []
+# For mac ports installed SDL library on Mac OS X
+#extra-include-dirs:
+#- /opt/local/include
diff --git a/stack.yaml b/stack.yaml
new file mode 100644
index 0000000..c184a3e
--- /dev/null
+++ b/stack.yaml
@@ -0,0 +1,14 @@
+#resolver: lts-9.2
+resolver: nightly-2017-09-07
+packages:
+- '.'
+extra-deps:
+ - lockfree-queue-0.2.3.1
+ - simple-conduit-0.6.0
+ - SDL-0.6.5.1
+flags: {}
+extra-package-dbs: []
+rebuild-ghc-options: true
+# For mac ports installed SDL library on Mac OS X
+#extra-include-dirs:
+#- /opt/local/include
diff --git a/streamly.cabal b/streamly.cabal
new file mode 100644
index 0000000..70eda1a
--- /dev/null
+++ b/streamly.cabal
@@ -0,0 +1,234 @@
+name: streamly
+version: 0.1.0
+synopsis: Beautiful Streaming, Concurrent and Reactive Composition
+description:
+ Streamly is a monad transformer unifying non-determinism
+ (<https://hackage.haskell.org/package/list-t list-t>\/<https://hackage.haskell.org/package/logict logict>),
+ concurrency (<https://hackage.haskell.org/package/async async>),
+ streaming (<https://hackage.haskell.org/package/conduit conduit>\/<https://hackage.haskell.org/package/pipes pipes>),
+ and FRP (<https://hackage.haskell.org/package/Yampa Yampa>\/<https://hackage.haskell.org/package/reflex reflex>)
+ functionality in a concise and intuitive API.
+ High level concurrency makes concurrent applications almost indistinguishable
+ from non-concurrent ones. By changing a single combinator you can control
+ whether the code runs serially or concurrently. It naturally integrates
+ concurrency with streaming rather than adding it as an afterthought.
+ Moreover, it interworks with the popular streaming libraries.
+ .
+ See the README for an overview and the haddock documentation for full
+ reference. It is recommended to read the comprehensive tutorial module
+ "Streamly.Tutorial" first. Also see "Streamly.Examples" for some working
+ examples.
+
+homepage: http://github.com/harendra-kumar/streamly
+bug-reports: https://github.com/harendra-kumar/streamly/issues
+license: BSD3
+license-file: LICENSE
+tested-with: GHC==7.10.3, GHC==8.0.2, GHC==8.2.1
+author: Harendra Kumar
+maintainer: harendra.kumar@gmail.com
+copyright: 2017 Harendra Kumar
+category: Control, Concurrency, Streaming, Reactivity
+stability: Experimental
+build-type: Simple
+cabal-version: >= 1.10
+
+extra-source-files:
+ Changelog.md
+ README.md
+ stack-7.10.yaml
+ stack-8.0.yaml
+ stack.yaml
+
+source-repository head
+ type: git
+ location: https://github.com/harendra-kumar/streamly
+
+flag dev
+ description: Build development version
+ manual: True
+ default: False
+
+flag extra-benchmarks
+ description: Include comparative benchmarks
+ manual: True
+ default: False
+
+flag examples
+ description: Build examples
+ manual: True
+ default: False
+
+flag examples-sdl
+ description: Include examples that use SDL dependency
+ manual: True
+ default: False
+
+library
+ hs-source-dirs: src
+ other-modules: Streamly.Core
+ , Streamly.Streams
+
+ exposed-modules: Streamly.Prelude
+ , Streamly.Time
+ , Streamly.Tutorial
+ , Streamly
+
+ if flag(examples) || flag(examples-sdl)
+ exposed-modules: Streamly.Examples
+ , Streamly.Examples.SearchEngineQuery
+ , Streamly.Examples.ListDirRecursive
+ , Streamly.Examples.MergeSortedStreams
+ , Streamly.Examples.AcidRainGame
+
+ if flag(examples-sdl)
+ exposed-modules: Streamly.Examples.CirclingSquare
+
+ default-language: Haskell2010
+ ghc-options: -Wall
+
+ if flag(dev)
+ ghc-options: -Wmissed-specialisations
+ -Wall-missed-specialisations
+ -fno-ignore-asserts
+ if impl(ghc >= 8.0)
+ ghc-options: -Wcompat
+ -Wunrecognised-warning-flags
+ -Widentities
+ -Wincomplete-record-updates
+ -Wincomplete-uni-patterns
+ -Wredundant-constraints
+ -Wnoncanonical-monad-instances
+ -Wnoncanonical-monadfail-instances
+ if flag(examples-sdl)
+ cpp-options: -DEXAMPLES_SDL
+
+ build-depends: base >= 4.8 && < 5
+ , atomic-primops >= 0.8 && < 0.9
+ , containers >= 0.5 && < 0.6
+ , exceptions >= 0.8 && < 0.9
+ , lifted-base >= 0.2 && < 0.3
+ , lockfree-queue >= 0.2.3 && < 0.3
+ , monad-control >= 1.0 && < 2
+ , mtl >= 2.2 && < 3
+ , stm >= 2.4.3 && < 2.5
+ , transformers >= 0.4 && < 0.6
+ , transformers-base >= 0.4 && < 0.5
+
+ if impl(ghc < 8.0)
+ build-depends:
+ semigroups >= 0.18 && < 0.19
+
+ if flag(examples) || flag(examples-sdl)
+ build-Depends:
+ http-conduit >= 2.2.2 && < 2.3
+ , path-io >= 0.1.0 && < 1.4
+ , random >= 1.0.0 && < 1.2
+
+ if flag(examples-sdl)
+ build-Depends:
+ SDL >= 0.6.5 && < 0.7
+
+test-suite test
+ type: exitcode-stdio-1.0
+ main-is: Main.hs
+ hs-source-dirs: test
+ ghc-options: -O0 -Wall
+ if flag(dev)
+ ghc-options: -Wmissed-specialisations
+ -Wall-missed-specialisations
+ if impl(ghc >= 8.0)
+ ghc-options: -Wcompat
+ -Wunrecognised-warning-flags
+ -Widentities
+ -Wincomplete-record-updates
+ -Wincomplete-uni-patterns
+ -Wredundant-constraints
+ -Wnoncanonical-monad-instances
+ -Wnoncanonical-monadfail-instances
+ build-depends:
+ streamly
+ , base >= 4.8 && < 5
+ , hspec >= 2.0 && < 3
+ , containers >= 0.5 && < 0.6
+ if impl(ghc < 8.0)
+ build-depends:
+ transformers >= 0.4 && < 0.6
+ default-language: Haskell2010
+
+benchmark bench
+ type: exitcode-stdio-1.0
+ main-is: Main.hs
+ hs-source-dirs: benchmark
+ ghc-options: -O2 -Wall
+ if flag(dev)
+ ghc-options: -Wmissed-specialisations
+ -Wall-missed-specialisations
+ -fno-ignore-asserts
+ if impl(ghc >= 8.0)
+ ghc-options: -Wcompat
+ -Wunrecognised-warning-flags
+ -Widentities
+ -Wincomplete-record-updates
+ -Wincomplete-uni-patterns
+ -Wredundant-constraints
+ -Wnoncanonical-monad-instances
+ -Wnoncanonical-monadfail-instances
+ build-depends:
+ streamly
+ , atomic-primops >= 0.8 && < 0.9
+ , base >= 4.8 && < 5
+ , criterion >= 1 && < 2
+ , mtl >= 2.2 && < 3
+
+ if impl(ghc < 8.0)
+ build-depends:
+ transformers >= 0.4 && < 0.6
+
+ if flag(extra-benchmarks)
+ cpp-options: -DEXTRA_BENCHMARKS
+ build-depends:
+ list-t >= 0.4 && < 2
+ , logict >= 0.6 && < 0.7
+ , machines >= 0.5 && < 0.7
+ , simple-conduit >= 0.6 && < 0.7
+ , transient >= 0.4 && < 0.6
+ default-language: Haskell2010
+
+-------------------------------------------------------------------------------
+-- Examples
+-------------------------------------------------------------------------------
+
+executable loops
+ main-is: loops.hs
+ hs-source-dirs: examples
+ if flag(examples)
+ buildable: True
+ build-Depends:
+ streamly
+ , base >= 4.8 && < 5
+ else
+ buildable: False
+
+executable nested-loops
+ main-is: nested-loops.hs
+ hs-source-dirs: examples
+ if flag(examples)
+ buildable: True
+ build-Depends:
+ streamly
+ , base >= 4.8 && < 5
+ , random >= 1.0.0 && < 1.2
+ else
+ buildable: False
+
+executable parallel-loops
+ main-is: parallel-loops.hs
+ hs-source-dirs: examples
+ if flag(examples)
+ buildable: True
+ build-Depends:
+ streamly
+ , base >= 4.8 && < 5
+ , random >= 1.0.0 && < 1.2
+ else
+ buildable: False
diff --git a/test/Main.hs b/test/Main.hs
new file mode 100644
index 0000000..198e6d5
--- /dev/null
+++ b/test/Main.hs
@@ -0,0 +1,618 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE RankNTypes #-}
+
+module Main (main) where
+
+import Control.Concurrent (threadDelay)
+import Data.Foldable (forM_)
+import Data.List (sort)
+import Test.Hspec
+
+import Streamly
+import qualified Streamly.Prelude as A
+
+toListSerial :: StreamT IO a -> IO [a]
+toListSerial = A.toList . serially
+
+toListInterleaved :: InterleavedT IO a -> IO [a]
+toListInterleaved = A.toList . interleaving
+
+toListAsync :: AsyncT IO a -> IO [a]
+toListAsync = A.toList . asyncly
+
+toListParallel :: ParallelT IO a -> IO [a]
+toListParallel = A.toList . parallely
+
+main :: IO ()
+main = hspec $ do
+ describe "Runners" $ do
+ it "simple serially" $
+ (runStreaming . serially) (return (0 :: Int)) `shouldReturn` ()
+ it "simple serially with IO" $
+ (runStreaming . serially) (liftIO $ putStrLn "hello") `shouldReturn` ()
+ it "Captures a return value using toList" $
+ toListSerial (return 0) `shouldReturn` ([0] :: [Int])
+
+ describe "Empty" $ do
+ it "Monoid - mempty" $
+ (toListSerial mempty) `shouldReturn` ([] :: [Int])
+ it "Alternative - empty" $
+ (toListSerial empty) `shouldReturn` ([] :: [Int])
+ it "MonadPlus - mzero" $
+ (toListSerial mzero) `shouldReturn` ([] :: [Int])
+
+ ---------------------------------------------------------------------------
+ -- Functor
+ ---------------------------------------------------------------------------
+
+ describe "Functor (fmap)" $ do
+ it "Simple fmap" $
+ (toListSerial $ fmap (+1) (return 1)) `shouldReturn` ([2] :: [Int])
+ it "fmap on composed (<>)" $
+ (toListSerial $ fmap (+1) (return 1 <> return 2))
+ `shouldReturn` ([2,3] :: [Int])
+ it "fmap on composed (<|>)" $
+ (toListSerial $ fmap (+1) (return 1 <|> return 2))
+ `shouldReturn` ([2,3] :: [Int])
+
+ ---------------------------------------------------------------------------
+ -- Applicative
+ ---------------------------------------------------------------------------
+
+ describe "Applicative" $ do
+ it "Simple apply" $
+ (toListSerial $ (,) <$> (return 1) <*> (return 2))
+ `shouldReturn` ([(1,2)] :: [(Int, Int)])
+
+ it "Apply - serial composed first argument" $
+ (toListSerial $ (,) <$> (return 1 <> return 2) <*> (return 3))
+ `shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)])
+
+ it "Apply - serial composed second argument" $
+ (toListSerial $ (,) <$> (return 1) <*> (return 2 <> return 3))
+ `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
+
+ it "Apply - parallel composed first argument" $
+ (toListSerial $ (,) <$> (return 1 <|> return 2) <*> (return 3))
+ `shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)])
+
+ it "Apply - parallel composed second argument" $
+ (toListSerial $ (,) <$> (return 1) <*> (return 2 <|> return 3))
+ `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
+
+ ---------------------------------------------------------------------------
+ -- Binds
+ ---------------------------------------------------------------------------
+
+ describe "Bind then" thenBind
+ describe "Pure bind serial" $ pureBind toListSerial
+ describe "Pure bind serial interleaved" $ pureBind toListInterleaved
+ describe "Pure bind parallel DFS" $ pureBind toListAsync
+ describe "Pure bind parallel BFS" $ pureBind toListParallel
+
+ describe "Bind (>>=) with empty" $ bindEmpty toListSerial
+ describe "Bind (>->) with empty" $ bindEmpty toListInterleaved
+ describe "Bind (>|>) with empty" $ bindEmpty toListAsync
+ describe "Bind (>>|) with empty" $ bindEmpty toListParallel
+
+ ---------------------------------------------------------------------------
+ -- Monoidal Compositions
+ ---------------------------------------------------------------------------
+
+ describe "Serial Composition (<>)" $ compose (<>) id
+ describe "Serial Composition (mappend)" $ compose mappend id
+ describe "Interleaved Composition (<>)" $ compose (<=>) sort
+ describe "Left biased parallel Composition (<|)" $ compose (<|) sort
+ describe "Fair parallel Composition (<|>)" $ compose (<|>) sort
+ describe "Fair parallel Composition (mplus)" $ compose mplus sort
+
+ ---------------------------------------------------------------------------
+ -- Monoidal Composition ordering checks
+ ---------------------------------------------------------------------------
+
+ describe "Serial interleaved ordering check (<=>)" $ interleaveCheck (<=>)
+ describe "Parallel interleaved ordering check (<|>)" $ interleaveCheck (<|>)
+ describe "Left biased parallel time order check" $ parallelCheck (<|)
+ describe "Fair parallel time order check" $ parallelCheck (<|>)
+
+ ---------------------------------------------------------------------------
+ -- TBD Monoidal composition combinations
+ ---------------------------------------------------------------------------
+
+ -- TBD need more such combinations to be tested.
+ describe "<> and <>" $ composeAndComposeSimple (<>) (<>) (cycle [[1 .. 9]])
+
+ describe "<> and <=>" $ composeAndComposeSimple
+ (<>)
+ (<=>)
+ ([ [1 .. 9]
+ , [1 .. 9]
+ , [1, 3, 2, 4, 6, 5, 7, 9, 8]
+ , [1, 3, 2, 4, 6, 5, 7, 9, 8]
+ ])
+
+ describe "<=> and <=>" $ composeAndComposeSimple
+ (<=>)
+ (<=>)
+ ([ [1, 4, 2, 7, 3, 5, 8, 6, 9]
+ , [1, 7, 4, 8, 2, 9, 5, 3, 6]
+ , [1, 4, 3, 7, 2, 6, 9, 5, 8]
+ , [1, 7, 4, 9, 3, 8, 6, 2, 5]
+ ])
+
+ describe "<=> and <>" $ composeAndComposeSimple
+ (<=>)
+ (<>)
+ ([ [1, 4, 2, 7, 3, 5, 8, 6, 9]
+ , [1, 7, 4, 8, 2, 9, 5, 3, 6]
+ , [1, 4, 2, 7, 3, 5, 8, 6, 9]
+ , [1, 7, 4, 8, 2, 9, 5, 3, 6]
+ ])
+
+ describe "Nested parallel and serial compositions" $ do
+ {-
+ -- This is not correct, the result can also be [4,4,8,0,8,0,2,2]
+ -- because of parallelism of [8,0] and [8,0].
+ it "Nest <|>, <>, <|> (1)" $
+ let t = timed
+ in toListSerial (
+ ((t 8 <|> t 4) <> (t 2 <|> t 0))
+ <|> ((t 8 <|> t 4) <> (t 2 <|> t 0)))
+ `shouldReturn` ([4,4,8,8,0,0,2,2])
+ -}
+ it "Nest <|>, <>, <|> (2)" $
+ let t = timed
+ in toListSerial (
+ ((t 4 <|> t 8) <> (t 1 <|> t 2))
+ <|> ((t 4 <|> t 8) <> (t 1 <|> t 2)))
+ `shouldReturn` ([4,4,8,8,1,1,2,2])
+ -- FIXME: These two keep failing intermittently on Mac OS X
+ -- Need to examine and fix the tests.
+ {-
+ it "Nest <|>, <=>, <|> (1)" $
+ let t = timed
+ in toListSerial (
+ ((t 8 <|> t 4) <=> (t 2 <|> t 0))
+ <|> ((t 9 <|> t 4) <=> (t 2 <|> t 0)))
+ `shouldReturn` ([4,4,0,0,8,2,9,2])
+ it "Nest <|>, <=>, <|> (2)" $
+ let t = timed
+ in toListSerial (
+ ((t 4 <|> t 8) <=> (t 1 <|> t 2))
+ <|> ((t 4 <|> t 9) <=> (t 1 <|> t 2)))
+ `shouldReturn` ([4,4,1,1,8,2,9,2])
+ -}
+ it "Nest <|>, <|>, <|>" $
+ let t = timed
+ in toListSerial (
+ ((t 4 <|> t 8) <|> (t 0 <|> t 2))
+ <|> ((t 4 <|> t 8) <|> (t 0 <|> t 2)))
+ `shouldReturn` ([0,0,2,2,4,4,8,8])
+
+ ---------------------------------------------------------------------------
+ -- Monoidal composition recursion loops
+ ---------------------------------------------------------------------------
+
+ describe "Serial loops (<>)" $ loops (<>) id reverse
+ describe "Left biased parallel loops (<|)" $ loops (<|) sort sort
+ describe "Fair parallel loops (<|>)" $ loops (<|>) sort sort
+
+ ---------------------------------------------------------------------------
+ -- Bind and monoidal composition combinations
+ ---------------------------------------------------------------------------
+
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ describe "Bind and compose" $ bindAndComposeSimple toListSerial g
+
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ describe "Bind and compose" $ bindAndComposeSimple toListInterleaved g
+
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ describe "Bind and compose" $ bindAndComposeSimple toListAsync g
+
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ describe "Bind and compose" $ bindAndComposeSimple toListParallel g
+
+ let fldr f = foldr f empty
+ fldl f = foldl f empty
+ in do
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose" $
+ bindAndComposeHierarchy toListSerial (k g)
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose" $
+ bindAndComposeHierarchy toListInterleaved (k g)
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose" $
+ bindAndComposeHierarchy toListAsync (k g)
+ forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
+ forM_ [fldr, fldl] $ \k ->
+ describe "Bind and compose" $
+ bindAndComposeHierarchy toListParallel (k g)
+
+ -- Nest two lists using different styles of product compositions
+ it "Nests two streams using monadic serial composition" nestTwoSerial
+ it "Nests two streams using monadic interleaved composition" nestTwoInterleaved
+ it "Nests two streams using monadic async composition" nestTwoAsync
+ it "Nests two streams using monadic parallel composition" nestTwoParallel
+
+ it "Nests two streams using applicative serial composition" nestTwoSerialApp
+ it "Nests two streams using applicative interleaved composition" nestTwoInterleavedApp
+ it "Nests two streams using applicative async composition" nestTwoAsyncApp
+ it "Nests two streams using applicative parallel composition" nestTwoParallelApp
+
+ it "Nests two streams using Num serial composition" nestTwoSerialNum
+ it "Nests two streams using Num interleaved composition" nestTwoInterleavedNum
+ it "Nests two streams using Num async composition" nestTwoAsyncNum
+ it "Nests two streams using Num parallel composition" nestTwoParallelNum
+
+ ---------------------------------------------------------------------------
+ -- TBD Bind and Bind combinations
+ ---------------------------------------------------------------------------
+
+ -- TBD combine all binds and all compose in one example
+ describe "Miscellaneous combined examples" mixedOps
+
+ describe "Transformation" $ transformOps (<>)
+ describe "Serial zipping" $
+ zipOps A.zipWith A.zipWithM zipping
+ describe "Async zipping" $
+ zipOps A.zipAsyncWith A.zipAsyncWithM zippingAsync
+
+nestTwoSerial :: Expectation
+nestTwoSerial =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListSerial (do
+ x <- s1
+ y <- s2
+ return (x + y)
+ ) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
+nestTwoSerialApp :: Expectation
+nestTwoSerialApp =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListSerial ((+) <$> s1 <*> s2)
+ `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
+nestTwoSerialNum :: Expectation
+nestTwoSerialNum =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListSerial (s1 + s2)
+ `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
+nestTwoInterleaved :: Expectation
+nestTwoInterleaved =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListInterleaved (do
+ x <- s1
+ y <- s2
+ return (x + y)
+ ) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
+
+nestTwoInterleavedApp :: Expectation
+nestTwoInterleavedApp =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListInterleaved ((+) <$> s1 <*> s2)
+ `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
+
+nestTwoInterleavedNum :: Expectation
+nestTwoInterleavedNum =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListInterleaved (s1 + s2)
+ `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
+
+nestTwoAsync :: Expectation
+nestTwoAsync =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListAsync (do
+ x <- s1
+ y <- s2
+ return (x + y)
+ ) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
+nestTwoAsyncApp :: Expectation
+nestTwoAsyncApp =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListAsync ((+) <$> s1 <*> s2)
+ `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
+nestTwoAsyncNum :: Expectation
+nestTwoAsyncNum =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListAsync (s1 + s2)
+ `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
+
+nestTwoParallel :: Expectation
+nestTwoParallel =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListParallel (do
+ x <- s1
+ y <- s2
+ return (x + y)
+ ) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
+
+nestTwoParallelApp :: Expectation
+nestTwoParallelApp =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListParallel ((+) <$> s1 <*> s2)
+ `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
+
+nestTwoParallelNum :: Expectation
+nestTwoParallelNum =
+ let s1 = foldMapWith (<>) return [1..4]
+ s2 = foldMapWith (<>) return [5..8]
+ in toListParallel (s1 + s2)
+ `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
+
+zipOps :: (Streaming t, Applicative (t IO))
+ => (forall a b c. (a -> b -> c)
+ -> StreamT IO a -> StreamT IO b -> StreamT IO c)
+ -> (forall a b c. (a -> b -> StreamT IO c)
+ -> StreamT IO a -> StreamT IO b -> StreamT IO c)
+ -> (forall a. t IO a -> t IO a)
+ -> Spec
+zipOps z zM app = do
+ it "zipWith" $
+ let s1 = foldMapWith (<>) return [1..10]
+ s2 = foldMapWith (<>) return [1..]
+ in toListSerial (z (+) s1 s2)
+ `shouldReturn` ([2,4..20] :: [Int])
+
+ it "zipWithM" $
+ let s1 = foldMapWith (<>) return [1..10]
+ s2 = foldMapWith (<>) return [1..]
+ in toListSerial (zM (\a b -> return (a + b)) s1 s2)
+ `shouldReturn` ([2,4..20] :: [Int])
+
+ it "Applicative zip" $
+ let s1 = adapt $ serially $ foldMapWith (<>) return [1..10]
+ s2 = adapt $ serially $ foldMapWith (<>) return [1..]
+ in (A.toList . app) ((+) <$> s1 <*> s2)
+ `shouldReturn` ([2,4..20] :: [Int])
+
+timed :: Int -> StreamT IO Int
+timed x = liftIO (threadDelay (x * 100000)) >> return x
+
+thenBind :: Spec
+thenBind = do
+ it "Simple runStreaming and 'then' with IO" $
+ (runStreaming . serially) (liftIO (putStrLn "hello") >> liftIO (putStrLn "world"))
+ `shouldReturn` ()
+ it "Then and toList" $
+ toListSerial (return (1 :: Int) >> return 2) `shouldReturn` ([2] :: [Int])
+
+type ToListType s = (forall a. s IO a -> IO [a])
+pureBind :: Monad (s IO) => ToListType s -> Spec
+pureBind l = do
+ it "Bind and toList" $
+ l (return 1 `f` \x -> return 2 `f` \y -> return (x + y))
+ `shouldReturn` ([3] :: [Int])
+ where f = (>>=)
+
+bindEmpty :: (Monad (s IO), Alternative (s IO)) => ToListType s -> Spec
+bindEmpty l = it "Binds with empty" $
+ (l (return (1 :: Int) `f` \_ -> empty `f` \_ -> return 2))
+ `shouldReturn` ([] :: [Int])
+ where f = (>>=)
+
+interleaveCheck
+ :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int)
+ -> Spec
+interleaveCheck f =
+ it "Interleave four" $
+ toListSerial ((return 0 <> return 1) `f` (return 100 <> return 101))
+ `shouldReturn` ([0, 100, 1, 101])
+
+parallelCheck :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) -> Spec
+parallelCheck f = do
+ it "Parallel ordering left associated" $
+ toListSerial (((event 4 `f` event 3) `f` event 2) `f` event 1)
+ `shouldReturn` ([1..4])
+
+ it "Parallel ordering right associated" $
+ toListSerial (event 4 `f` (event 3 `f` (event 2 `f` event 1)))
+ `shouldReturn` ([1..4])
+
+ where event n = (liftIO $ threadDelay (n * 100000)) >> (return n)
+
+compose
+ :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int)
+ -> ([Int] -> [Int])
+ -> Spec
+compose f srt = do
+ it "Compose mempty, mempty" $
+ (tl (mempty `f` mempty)) `shouldReturn` []
+ it "Compose empty, empty" $
+ (tl (empty `f` empty)) `shouldReturn` []
+ it "Compose empty at the beginning" $
+ (tl $ (empty `f` return 1)) `shouldReturn` [1]
+ it "Compose empty at the end" $
+ (tl $ (return 1 `f` empty)) `shouldReturn` [1]
+ it "Compose two" $
+ (tl (return 0 `f` return 1) >>= return . srt)
+ `shouldReturn` [0, 1]
+ it "Compose three - empty in the middle" $
+ ((tl $ (return 0 `f` empty `f` return 1)) >>= return . srt)
+ `shouldReturn` [0, 1]
+ it "Compose left associated" $
+ ((tl $ (((return 0 `f` return 1) `f` return 2) `f` return 3))
+ >>= return . srt) `shouldReturn` [0, 1, 2, 3]
+ it "Compose right associated" $
+ ((tl $ (return 0 `f` (return 1 `f` (return 2 `f` return 3))))
+ >>= return . srt) `shouldReturn` [0, 1, 2, 3]
+ it "Compose many" $
+ ((tl $ forEachWith f [1..100] return) >>= return . srt)
+ `shouldReturn` [1..100]
+ it "Compose hierarchical (multiple levels)" $
+ ((tl $ (((return 0 `f` return 1) `f` (return 2 `f` return 3))
+ `f` ((return 4 `f` return 5) `f` (return 6 `f` return 7)))
+ ) >>= return . srt) `shouldReturn` [0..7]
+ where tl = toListSerial
+
+composeAndComposeSimple
+ :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int)
+ -> (StreamT IO Int -> StreamT IO Int -> StreamT IO Int)
+ -> [[Int]]
+ -> Spec
+composeAndComposeSimple f g answer = do
+ it "Compose right associated outer expr, right folded inner" $
+ let fold = foldMapWith g return
+ in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9])))
+ `shouldReturn` (answer !! 0)
+
+ it "Compose left associated outer expr, right folded inner" $
+ let fold = foldMapWith g return
+ in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9]))
+ `shouldReturn` (answer !! 1)
+
+ it "Compose right associated outer expr, left folded inner" $
+ let fold xs = foldl g empty $ map return xs
+ in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9])))
+ `shouldReturn` (answer !! 2)
+
+ it "Compose left associated outer expr, left folded inner" $
+ let fold xs = foldl g empty $ map return xs
+ in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9]))
+ `shouldReturn` (answer !! 3)
+
+
+loops
+ :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int)
+ -> ([Int] -> [Int])
+ -> ([Int] -> [Int])
+ -> Spec
+loops f tsrt hsrt = do
+ it "Tail recursive loop" $ (toListSerial (loopTail 0) >>= return . tsrt)
+ `shouldReturn` [0..3]
+
+ it "Head recursive loop" $ (toListSerial (loopHead 0) >>= return . hsrt)
+ `shouldReturn` [0..3]
+
+ where
+ loopHead x = do
+ -- this print line is important for the test (causes a bind)
+ liftIO $ putStrLn "LoopHead..."
+ (if x < 3 then loopHead (x + 1) else empty) `f` return x
+
+ loopTail x = do
+ -- this print line is important for the test (causes a bind)
+ liftIO $ putStrLn "LoopTail..."
+ return x `f` (if x < 3 then loopTail (x + 1) else empty)
+
+bindAndComposeSimple
+ :: (Streaming t, Alternative (t IO), Monad (t IO))
+ => (forall a. t IO a -> IO [a])
+ -> (t IO Int -> t IO Int -> t IO Int)
+ -> Spec
+bindAndComposeSimple tl g = do
+ it "Compose many (right fold) with bind" $
+ (tl (forEachWith g [1..10 :: Int] $ \x -> return x `f` (return . id))
+ >>= return . sort) `shouldReturn` [1..10]
+
+ it "Compose many (left fold) with bind" $
+ let forL xs k = foldl g empty $ map k xs
+ in (tl (forL [1..10 :: Int] $ \x -> return x `f` (return . id))
+ >>= return . sort) `shouldReturn` [1..10]
+ where f = (>>=)
+
+bindAndComposeHierarchy
+ :: Monad (s IO) => (forall a. s IO a -> IO [a])
+ -> ([s IO Int] -> s IO Int)
+ -> Spec
+bindAndComposeHierarchy tl g = do
+ it "Bind and compose nested" $
+ (tl bindComposeNested >>= return . sort)
+ `shouldReturn` (sort (
+ [12, 18]
+ ++ replicate 3 13
+ ++ replicate 3 17
+ ++ replicate 6 14
+ ++ replicate 6 16
+ ++ replicate 7 15) :: [Int])
+
+ where
+
+ -- bindComposeNested :: AsyncT IO Int
+ bindComposeNested =
+ let c1 = tripleCompose (return 1) (return 2) (return 3)
+ c2 = tripleCompose (return 4) (return 5) (return 6)
+ c3 = tripleCompose (return 7) (return 8) (return 9)
+ b = tripleBind c1 c2 c3
+-- it seems to be causing a huge space leak in hspec so disabling this for now
+-- c = tripleCompose b b b
+-- m = tripleBind c c c
+-- in m
+ in b
+
+ tripleCompose a b c = g [a, b, c]
+ tripleBind mx my mz =
+ mx `f` \x -> my
+ `f` \y -> mz
+ `f` \z -> return (x + y + z)
+ f = (>>=)
+
+mixedOps :: Spec
+mixedOps = do
+ it "Compose many ops" $
+ (toListSerial composeMixed >>= return . sort)
+ `shouldReturn` ([8,9,9,9,9,9,10,10,10,10,10,10,10,10,10,10,11,11
+ ,11,11,11,11,11,11,11,11,12,12,12,12,12,13
+ ] :: [Int])
+ where
+
+ composeMixed :: StreamT IO Int
+ composeMixed = do
+ liftIO $ return ()
+ liftIO $ putStr ""
+ x <- return 1
+ y <- return 2
+ z <- do
+ x1 <- return 1 <|> return 2
+ liftIO $ return ()
+ liftIO $ putStr ""
+ y1 <- return 1 <| return 2
+ z1 <- do
+ x11 <- return 1 <> return 2
+ y11 <- return 1 <| return 2
+ z11 <- return 1 <=> return 2
+ liftIO $ return ()
+ liftIO $ putStr ""
+ return (x11 + y11 + z11)
+ return (x1 + y1 + z1)
+ return (x + y + z)
+
+transformOps :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) -> Spec
+transformOps f = do
+ it "take all" $
+ (toListSerial $ A.take 10 $ foldMapWith f return [1..10])
+ `shouldReturn` [1..10]
+ it "take none" $
+ (toListSerial $ A.take 0 $ foldMapWith f return [1..10])
+ `shouldReturn` []
+ it "take 5" $
+ (toListSerial $ A.take 5 $ foldMapWith f return [1..10])
+ `shouldReturn` [1..5]
+
+ it "drop all" $
+ (toListSerial $ A.drop 10 $ foldMapWith f return [1..10])
+ `shouldReturn` []
+ it "drop none" $
+ (toListSerial $ A.drop 0 $ foldMapWith f return [1..10])
+ `shouldReturn` [1..10]
+ it "drop 5" $
+ (toListSerial $ A.drop 5 $ foldMapWith f return [1..10])
+ `shouldReturn` [6..10]