**diff options**

author | harendra <> | 2017-12-05 15:16:00 (GMT) |
---|---|---|

committer | hdiff <hdiff@hdiff.luite.com> | 2017-12-05 15:16:00 (GMT) |

commit | 9c09b5a4974f1be31231c759ba70e00ae4d8446a (patch) | |

tree | df62f8d9817d9cf5f67d66d6a4abae48e4fde94c |

version 0.1.00.1.0

-rw-r--r-- | Changelog.md | 3 | ||||

-rw-r--r-- | LICENSE | 27 | ||||

-rw-r--r-- | README.md | 172 | ||||

-rw-r--r-- | benchmark/Main.hs | 301 | ||||

-rw-r--r-- | examples/loops.hs | 88 | ||||

-rw-r--r-- | examples/nested-loops.hs | 22 | ||||

-rw-r--r-- | examples/parallel-loops.hs | 20 | ||||

-rw-r--r-- | src/Streamly.hs | 249 | ||||

-rw-r--r-- | src/Streamly/Core.hs | 651 | ||||

-rw-r--r-- | src/Streamly/Examples.hs | 60 | ||||

-rw-r--r-- | src/Streamly/Examples/AcidRainGame.hs | 46 | ||||

-rw-r--r-- | src/Streamly/Examples/CirclingSquare.hs | 90 | ||||

-rw-r--r-- | src/Streamly/Examples/ListDirRecursive.hs | 19 | ||||

-rw-r--r-- | src/Streamly/Examples/MergeSortedStreams.hs | 41 | ||||

-rw-r--r-- | src/Streamly/Examples/SearchEngineQuery.hs | 19 | ||||

-rw-r--r-- | src/Streamly/Prelude.hs | 430 | ||||

-rw-r--r-- | src/Streamly/Streams.hs | 985 | ||||

-rw-r--r-- | src/Streamly/Time.hs | 65 | ||||

-rw-r--r-- | src/Streamly/Tutorial.hs | 1042 | ||||

-rw-r--r-- | stack-7.10.yaml | 16 | ||||

-rw-r--r-- | stack-8.0.yaml | 17 | ||||

-rw-r--r-- | stack.yaml | 14 | ||||

-rw-r--r-- | streamly.cabal | 234 | ||||

-rw-r--r-- | test/Main.hs | 618 |

24 files changed, 5229 insertions, 0 deletions

diff --git a/Changelog.md b/Changelog.md new file mode 100644 index 0000000..1318780 --- /dev/null +++ b/Changelog.md @@ -0,0 +1,3 @@ +## 0.1.0 + +* Initial release @@ -0,0 +1,27 @@ +Copyright (c) 2017, Harendra Kumar +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..1d8d0ff --- /dev/null +++ b/README.md @@ -0,0 +1,172 @@ +# Streamly + +[![Gitter chat](https://badges.gitter.im/composewell/gitter.svg)](https://gitter.im/composewell/streamly) +[![Build Status](https://travis-ci.org/composewell/streamly.svg?branch=master)](https://travis-ci.org/composewell/streamly) +[![Windows Build status](https://ci.appveyor.com/api/projects/status/ajxg0c79raou9ned?svg=true)](https://ci.appveyor.com/project/harendra-kumar/streamly) +[![Coverage Status](https://coveralls.io/repos/composewell/streamly/badge.svg?branch=master&service=github)](https://coveralls.io/github/composewell/streamly?branch=master) + +## Stream`ing` `Concurrent`ly + +Streamly is a monad transformer unifying non-determinism +([list-t](https://hackage.haskell.org/package/list-t)/[logict](https://hackage.haskell.org/package/logict)), +concurrency ([async](https://hackage.haskell.org/package/async)), +streaming ([conduit](https://hackage.haskell.org/package/conduit)\/[pipes](https://hackage.haskell.org/package/pipes)), +and FRP ([Yampa](https://hackage.haskell.org/package/Yampa)\/[reflex](https://hackage.haskell.org/package/reflex)) +functionality in a concise and intuitive API. +High level concurrency makes concurrent applications almost indistinguishable +from non-concurrent ones. By changing a single combinator you can control +whether the code runs serially or concurrently. It naturally integrates +concurrency with streaming rather than adding it as an afterthought. +Moreover, it interworks with the popular streaming libraries. + +See the haddock documentation for full reference. It is recommended to read +the comprehensive tutorial module `Streamly.Tutorial` first. Also see +`Streamly.Examples` for some working examples. + +## Non-determinism + +The monad instance composes like a list monad. + +``` haskell +loops = $ do + x <- each [1,2] + y <- each [3,4] + liftIO $ putStrLn $ show (x, y) + +main = runStreaming $ serially $ loops +``` +``` +(1,3) +(1,4) +(2,3) +(2,4) +``` + +## Magical Concurrency + +To run the above code with demand-driven concurrency i.e. each iteration in the +loops can run concurrently depending on the consumer rate: + +``` haskell +main = runStreaming $ asyncly $ loops +``` + +To run it with full parallelism irrespective of demand: + +``` haskell +main = runStreaming $ parallely $ loops +``` + +To run it serially but interleaving the outer and inner loop iterations: + +``` haskell +main = runStreaming $ interleaving $ loops +``` + +You can fold multiple streams or IO actions using parallel combinators like +`<|`, `<|>`. For example, to concurrently generate the squares and then +concurrently sum the square roots of all combinations: + +``` haskell +main = do + print $ sum $ asyncly $ do + -- Squaring is concurrent (<|) + x2 <- forEachWith (<|) [1..100] $ \x -> return $ x * x + y2 <- forEachWith (<|) [1..100] $ \y -> return $ y * y + -- sqrt is concurrent (asyncly) + return $ sqrt (x2 + y2) +``` + +Of course, the actions running in parallel could be arbitrary IO actions. To +concurrently list the contents of a directory tree recursively: + +``` haskell +import Path.IO (listDir, getCurrentDir) +import Streamly + +main = runStreaming $ serially $ getCurrentDir >>= readdir + where readdir d = do + (dirs, files) <- lift $ listDir d + liftIO $ mapM_ putStrLn $ map show files + -- read the subdirs concurrently + foldMapWith (<|>) readdir dirs +``` + +In the above examples we do not think in terms of threads, locking or +synchronization, rather we think in terms of what can run in parallel, the rest +is taken care of automatically. With `asyncly` and `<|` the programmer does not +have to worry about how many threads are to be created they are automatically +adjusted based on the demand of the consumer. + +The concurrency facilities provided by streamly can be compared with +[OpenMP](https://en.wikipedia.org/wiki/OpenMP) and +[Cilk](https://en.wikipedia.org/wiki/Cilk) but with a more declarative +expression. Concurrency support does not compromise performance in +non-concurrent cases, the performance of the library is at par or better than +most of the existing streaming libraries. + +## Streaming + +Streaming is effortless, simple and straightforward. Streamly data type behaves +just like a list and combinators are provided in `Streamly.Prelude` to +transform or fold streamly streams. Unlike other libraries and like `streaming` +library the combinators explicitly consume a stream and produce a stream, +therefore, no special operator is needed to join stream stages, just a forward +(`$`) or reverse (`&`) function application operator is enough. + +```haskell +import Streamly +import Streamly.Prelude as S +import Data.Function ((&)) + +main = S.each [1..10] + & fmap (+ 1) + & S.drop 2 + & S.filter even + & fmap (* 3) + & S.takeWhile (< 25) + & S.mapM (\x -> putStrLn ("saw " ++ show x) >> return x) + & S.toList . serially + >>= print +``` + +Fold style combinators can be used to fold purely or monadically. You can also +use the beautiful `foldl` library for folding. + +```haskell +main = S.each [1..10] + & serially + & S.foldl (+) 0 id + >>= print +``` + +Streams can be combined together in multiple ways: + +```haskell +return 1 <> return 2 -- serial, combine atoms +S.each [1..10] <> S.each [11..20] -- serial +S.each [1..10] <| S.each [11..20] -- demand driven parallel +S.each [1..10] <=> S.each [11..20] -- serial but interleaved +S.each [1..10] <|> S.each [11..20] -- fully parallel +``` + +As we have already seen streams can be combined using monadic composition in a +non-deterministic manner. This allows arbitrary manipulation and combining of +streams. See `Streamly.Examples.MergeSortedStreams` for a more complicated +example. + +## Reactive Programming (FRP) + +Streamly is a foundation for first class reactive programming as well by virtue +of integrating concurrency and streaming. See `Streamly.Examples.AcidRainGame` +and `Streamly.Examples.CirclingSquare` for an SDL based animation example. + +## Contributing + +The code is available under BSD-3 license [on +github](https://github.com/composewell/streamly). Join the [gitter +chat](https://gitter.im/composewell/streamly) channel for discussions. All +contributions are welcome! + +This library was originally inspired by the `transient` package authored by +Alberto G. Corona. diff --git a/benchmark/Main.hs b/benchmark/Main.hs new file mode 100644 index 0000000..0e64889 --- /dev/null +++ b/benchmark/Main.hs @@ -0,0 +1,301 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} + +module Main where + +import Control.Applicative (Alternative(..)) +import Control.Exception (assert) +import Control.Monad (guard) +import Criterion.Main (defaultMain, bgroup, bench, nfIO) +import Data.Function ((&)) + +import qualified Streamly as A +import qualified Streamly.Prelude as A + +#ifdef EXTRA_BENCHMARKS +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Data.Atomics (atomicModifyIORefCAS) +import Data.IORef (IORef, newIORef, writeIORef) +import System.IO.Unsafe (unsafePerformIO) + +import qualified Conduit.Simple as S +import qualified Control.Monad.Logic as LG +import qualified Data.Machine as M +#if MIN_VERSION_transient(0,5,1) +import qualified Transient.Internals as T +import qualified Transient.Indeterminism as T +#endif +import qualified ListT as LT +#endif + +main :: IO () +main = do + -- XXX due to a GHC bug passing bind as an argument causes perf + -- degradation, so we should keep that in account when comparing. + let as = streamly_serial + ai = streamly_interleaved + aa = streamly_async + ap = streamly_parallel + defaultMain [ + bgroup "streamly" + [ bench "function style all serial" $ nfIO streamly_function_style + + , bgroup "serial bind" + [ bench "serial" $ nfIO (as (A.<>)) + , bench "fair serial" $ nfIO (as (A.<=>)) + , bench "left parallel" $ nfIO (as (A.<|)) + , bench "fair parallel" $ nfIO (as (A.<|>)) + ] + + , bgroup "interleaved bind" + [ bench "serial" $ nfIO (ai (A.<>)) + , bench "fair serial" $ nfIO (ai (A.<=>)) + , bench "left parallel" $ nfIO (ai (A.<|)) + , bench "fair parallel" $ nfIO (ai (A.<|>)) + ] + + , bgroup "async bind" + [ bench "serial" $ nfIO (aa (A.<>)) + , bench "fair serial" $ nfIO (aa (A.<=>)) + , bench "left parallel" $ nfIO (aa (A.<|)) + , bench "fair parallel" $ nfIO (aa (A.<|>)) + ] + + , bgroup "parallel bind" + [ bench "serial" $ nfIO (ap (A.<>)) + , bench "fair serial" $ nfIO (ap (A.<=>)) + , bench "left parallel" $ nfIO (ap (A.<|)) + , bench "fair parallel" $ nfIO (ap (A.<|>)) + ] + + -- Benchmark smallest possible actions composed together + , bgroup "serial bind nil" + [ bench "serial" $ nfIO (streamly_nil (A.<>)) + , bench "fair serial" $ nfIO (streamly_nil (A.<=>)) + , bench "left parallel" $ nfIO (streamly_nil (A.<|)) + , bench "fair parallel" $ nfIO (streamly_nil (A.<|>)) + ] + ] +#ifdef EXTRA_BENCHMARKS +#if MIN_VERSION_transient(0,5,1) + , bgroup "others" + [ bench "transient" $ nfIO transient_basic + , bench "transient-nil" $ nfIO transient_nil +#endif + , bench "logict" $ nfIO logict_basic + , bench "list-t" $ nfIO list_t_basic + , bench "simple-conduit" $ nfIO simple_conduit_basic + , bench "simple-conduit-bind" $ nfIO simple_conduit_bind + , bench "machines" $ nfIO machines_basic + ] +#endif + ] + +{-# INLINABLE map #-} +map :: Monad m => (a -> Int) -> a -> m Int +map f x = return $ f x + +{-# INLINABLE filter #-} +filter :: (Monad m, Alternative m) => (a -> Bool) -> a -> m a +filter cond x = guard (not $ cond x) >> return x + +amap :: Monad (s IO) => (Int -> Int) -> Int -> s IO Int +amap = Main.map + +afilter :: (Alternative (s IO), Monad (s IO)) => (Int -> Bool) -> Int -> s IO Int +afilter = Main.filter + +{-# INLINE streamly_basic #-} +streamly_basic + :: (Alternative (t IO), Monad (t IO), A.Streaming t) + => (forall a. t IO a -> IO [a]) + -> (t IO Int -> t IO Int -> t IO Int) + -> IO Int +streamly_basic tl g = do + xs <- tl $ do + A.drop 100 (A.forEachWith g [1..100000 :: Int] $ \x -> + afilter even x >>= amap (+1)) + >>= amap (+1) + >>= afilter (\y -> y `mod` 2 == 0) + assert (Prelude.length xs == 49900) $ + return (Prelude.length xs) + +{-# INLINE streamly_function_style #-} +streamly_function_style :: IO Int +streamly_function_style = do + xs <- A.toList $ A.serially $ + A.each [1..100000 :: Int] + & A.filter even + & fmap (+1) + & A.drop 100 + & fmap (+1) + & A.filter (\y -> y `mod` 2 == 0) + assert (Prelude.length xs == 49900) $ + return (Prelude.length xs) + +{-# INLINE streamly_serial #-} +streamly_serial + :: (A.StreamT IO Int -> A.StreamT IO Int -> A.StreamT IO Int) + -> IO Int +streamly_serial = streamly_basic (A.toList . A.serially) + +{-# INLINE streamly_interleaved #-} +streamly_interleaved + :: (A.InterleavedT IO Int -> A.InterleavedT IO Int -> A.InterleavedT IO Int) + -> IO Int +streamly_interleaved = streamly_basic (A.toList . A.interleaving) + +{-# INLINE streamly_async #-} +streamly_async + :: (A.AsyncT IO Int -> A.AsyncT IO Int -> A.AsyncT IO Int) + -> IO Int +streamly_async = streamly_basic (A.toList . A.asyncly) + +{-# INLINE streamly_parallel #-} +streamly_parallel + :: (A.ParallelT IO Int -> A.ParallelT IO Int -> A.ParallelT IO Int) + -> IO Int +streamly_parallel = streamly_basic (A.toList . A.parallely) + +{-# INLINE streamly_nil #-} +streamly_nil :: (A.StreamT IO Int -> A.StreamT IO Int -> A.StreamT IO Int) + -> IO Int +streamly_nil f = do + xs <- (A.toList . A.serially) $ do + (A.forEachWith f [1..100000:: Int] $ + \x -> return x >>= return . id) + assert (Prelude.length xs == 100000) $ + return (Prelude.length xs) + +#ifdef EXTRA_BENCHMARKS +#if MIN_VERSION_transient(0,5,1) + +{-# NOINLINE count #-} +count :: IORef Int +count = unsafePerformIO $ newIORef 0 + +drop :: (MonadIO m, Alternative m) => Int -> Int -> m Int +drop num x = do + + mn <- liftIO $ atomicModifyIORefCAS count $ \n -> + if n < num then (n + 1, False) else (n, True) + guard mn + return x + +tmap :: (a -> Int) -> a -> T.TransIO Int +tmap = Main.map + +tfilter :: (a -> Bool) -> a -> T.TransIO a +tfilter = Main.filter + +tdrop :: Int -> Int -> T.TransIO Int +tdrop = Main.drop + +transient_basic :: IO (Maybe Int) + +transient_basic = T.keep' $ T.threads 0 $ do + liftIO $ writeIORef count 0 + xs <- T.group 49900 $ do + T.choose [1..100000 :: Int] + >>= tfilter even + >>= tmap (+1) + >>= tdrop 100 + >>= tmap (+1) + >>= tfilter (\x -> x `mod` 2 == 0) + + assert (Prelude.length xs == 49900) $ + T.exit (Prelude.length xs) + +transient_nil :: IO (Maybe Int) +transient_nil = T.keep' $ T.threads 0 $ do + xs <- T.group 49900 $ do + T.choose [1..100000 :: Int] + assert (Prelude.length xs == 49900) $ + T.exit (Prelude.length xs) +#endif + +lfilter :: (Int -> Bool) -> Int -> LT.ListT IO Int +lfilter = Main.filter + +lmap :: (Int -> Int) -> Int -> LT.ListT IO Int +lmap = Main.map + +ldrop :: Int -> Int -> LT.ListT IO Int +ldrop = Main.drop + +list_t_basic :: IO Int +list_t_basic = do + writeIORef count 0 + xs <- LT.toList $ do + LT.fromFoldable [1..100000 :: Int] + >>= lfilter even + >>= lmap (+1) + >>= ldrop 100 + >>= lmap (+1) + >>= lfilter (\x -> x `mod` 2 == 0) + assert (Prelude.length xs == 49900) $ + return (Prelude.length xs) + +lgfilter :: (Int -> Bool) -> Int -> LG.LogicT IO Int +lgfilter = Main.filter + +lgmap :: (Int -> Int) -> Int -> LG.LogicT IO Int +lgmap = Main.map + +lgdrop :: Int -> Int -> LG.LogicT IO Int +lgdrop = Main.drop + +logict_basic :: IO Int +logict_basic = do + writeIORef count 0 + --xs <- LG.observeManyT 2900 $ do + xs <- LG.observeAllT $ do + LG.msum $ Prelude.map return [1..100000] + >>= lgfilter even + >>= lgmap (+1) + >>= lgdrop 100 + >>= lgmap (+1) + >>= lgfilter (\x -> x `mod` 2 == 0) + assert (Prelude.length xs == 49900) $ + return (Prelude.length xs) + +simple_conduit_basic :: IO Int +simple_conduit_basic = do + xs <- S.sourceList [1..100000] + S.$= S.filterC even + S.$= S.mapC ((+1) :: Int -> Int) + S.$= S.dropC 100 + S.$= S.mapC ((+1) :: Int -> Int) + S.$= S.filterC (\x -> x `mod` 2 == 0) + S.$$ S.sinkList + assert (Prelude.length xs == 49900) $ + return (Prelude.length (xs :: [Int])) + +smap :: Monad (s IO) => (Int -> Int) -> Int -> s IO Int +smap = Main.map + +sfilter :: (Alternative (s IO), Monad (s IO)) => (Int -> Bool) -> Int -> s IO Int +sfilter = Main.filter + +{-# INLINE simple_conduit_bind #-} +simple_conduit_bind :: IO Int +simple_conduit_bind = do + xs <- S.sinkList $ do + S.dropC 100 (S.sourceList [1..100000 :: Int] >>= \x -> + sfilter even x >>= smap (+1)) + >>= smap (+1) + >>= sfilter (\y -> y `mod` 2 == 0) + assert (Prelude.length xs == 49900) $ + return (Prelude.length xs) + +machines_basic :: IO Int +machines_basic = do + xs <- M.runT $ M.source [1..100000] + M.~> M.filtered even + M.~> M.mapping (+1) + M.~> M.dropping 100 + M.~> M.mapping (+1) + M.~> M.filtered (\x -> x `mod` 2 == 0) + assert (Prelude.length xs == 49900) $ + return (Prelude.length (xs ::[Int])) +#endif diff --git a/examples/loops.hs b/examples/loops.hs new file mode 100644 index 0000000..d0f149b --- /dev/null +++ b/examples/loops.hs @@ -0,0 +1,88 @@ +import Streamly +import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) + +main = do + liftIO $ hSetBuffering stdout LineBuffering + + putStrLn $ "\nloopTail:\n" + runStreamT $ do + x <- loopTail 0 + liftIO $ print (x :: Int) + + putStrLn $ "\nloopHead:\n" + runStreamT $ do + x <- loopHead 0 + liftIO $ print (x :: Int) + + putStrLn $ "\nloopTailA:\n" + runStreamT $ do + x <- loopTailA 0 + liftIO $ print (x :: Int) + + putStrLn $ "\nloopHeadA:\n" + runStreamT $ do + x <- loopHeadA 0 + liftIO $ print (x :: Int) + + putStrLn $ "\ninterleave:\n" + runStreamT $ do + x <- return 0 <> return 1 <=> return 100 <> return 101 + liftIO $ print (x :: Int) + + putStrLn $ "\nParallel interleave:\n" + runStreamT $ do + x <- return 0 <> return 1 <|> return 100 <> return 101 + liftIO $ print (x :: Int) + + where + +------------------------------------------------------------------------------- +-- Serial (single-threaded) stream generator loops +------------------------------------------------------------------------------- + + -- In a <> composition the action on the left is executed and only after it + -- finished then the action on the right is executed. In other words the + -- actions are run serially. + + -- Generates a value and then loops. Can be used to generate an infinite + -- stream. Interleaves the generator and the consumer. + loopTail :: Int -> StreamT IO Int + loopTail x = do + liftIO $ putStrLn "LoopTail..." + return x <> (if x < 3 then loopTail (x + 1) else empty) + + -- Loops and then generates a value. The consumer can run only after the + -- loop has finished. An infinite generator will not let the consumer run + -- at all. + loopHead :: Int -> StreamT IO Int + loopHead x = do + liftIO $ putStrLn "LoopHead..." + (if x < 3 then loopHead (x + 1) else empty) <> return x + +------------------------------------------------------------------------------- +-- Concurrent (multi-threaded) adaptive demand-based stream generator loops +------------------------------------------------------------------------------- + + -- In a <| composition the action on the left is executed first. However, + -- if it is not fast enough to generate results at the consumer's speed + -- then the action on the right is also spawned concurrently. In other + -- words, both actions may run concurrently based on the need. + + loopTailA :: Int -> StreamT IO Int + loopTailA x = do + liftIO $ putStrLn "LoopTailA..." + return x <| (if x < 3 then loopTailA (x + 1) else empty) + + loopHeadA :: Int -> StreamT IO Int + loopHeadA x = do + liftIO $ putStrLn "LoopHeadA..." + (if x < 3 then loopHeadA (x + 1) else empty) <| return x + +------------------------------------------------------------------------------- +-- Parallel (fairly scheduled, multi-threaded) stream generator loops +------------------------------------------------------------------------------- + + -- In a <|> composition both actions are run concurrently in a fair + -- manner, no one action is preferred over another. Both actions are + -- spawned right away in their own independent threads. In other words, the + -- actions will run concurrently. diff --git a/examples/nested-loops.hs b/examples/nested-loops.hs new file mode 100644 index 0000000..ea886c5 --- /dev/null +++ b/examples/nested-loops.hs @@ -0,0 +1,22 @@ +import Control.Applicative ((<|>), empty) +import Control.Concurrent (myThreadId) +import Control.Monad.IO.Class (liftIO) +import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) +import System.Random (randomIO) +import Streamly + +main = runStreamT $ do + liftIO $ hSetBuffering stdout LineBuffering + x <- loop "A " 2 + y <- loop "B " 2 + liftIO $ myThreadId >>= putStr . show + >> putStr " " + >> print (x, y) + + where + + loop name n = do + rnd <- liftIO (randomIO :: IO Int) + let result = (name ++ show rnd) + repeat = if n > 1 then loop name (n - 1) else empty + in (return result) <|> repeat diff --git a/examples/parallel-loops.hs b/examples/parallel-loops.hs new file mode 100644 index 0000000..61e1794 --- /dev/null +++ b/examples/parallel-loops.hs @@ -0,0 +1,20 @@ +import Control.Applicative ((<|>)) +import Control.Concurrent (myThreadId, threadDelay) +import Control.Monad.IO.Class (liftIO) +import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) +import System.Random (randomIO) +import Streamly + +main = runStreamT $ do + liftIO $ hSetBuffering stdout LineBuffering + x <- loop "A" <|> loop "B" + liftIO $ myThreadId >>= putStr . show + >> putStr " " + >> print x + + where + + loop name = do + liftIO $ threadDelay 1000000 + rnd <- liftIO (randomIO :: IO Int) + return (name, rnd) <|> loop name diff --git a/src/Streamly.hs b/src/Streamly.hs new file mode 100644 index 0000000..68efd82 --- /dev/null +++ b/src/Streamly.hs @@ -0,0 +1,249 @@ +-- | +-- Module : Streamly +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC + +module Streamly + ( + -- * Background + -- $background + + -- * Overview + -- $overview + + MonadAsync + , Streaming + + -- * Product Style Composition + -- $product + , StreamT + , InterleavedT + , AsyncT + , ParallelT + + -- * Zip Style Composition + -- $zipping + , ZipStream + , ZipAsync + + -- * Sum Style Composition + -- $sum + , (<=>) + , (<|) + + -- * Transformation + , async + + -- * Stream Type Adapters + -- $adapters + , serially + , interleaving + , asyncly + , parallely + , zipping + , zippingAsync + , adapt + + -- * Running Streams + , runStreaming + , runStreamT + , runInterleavedT + , runAsyncT + , runParallelT + , runZipStream + , runZipAsync + + -- * Fold Utilities + -- $foldutils + , foldWith + , foldMapWith + , forEachWith + + -- * Re-exports + , Monoid (..) + , Semigroup (..) + , Alternative (..) + , MonadPlus (..) + , MonadIO (..) + , MonadTrans (..) + ) +where + +import Streamly.Streams +import Data.Semigroup (Semigroup(..)) +import Control.Applicative (Alternative(..)) +import Control.Monad (MonadPlus(..)) +import Control.Monad.IO.Class (MonadIO (..)) +import Control.Monad.Trans.Class (MonadTrans (..)) + +-- $background +-- +-- Streamly provides a monad transformer that extends the product style +-- composition of monads to streams of many elements of the same type; it is a +-- functional programming equivalent of nested loops from imperative +-- programming. Composing each element in one stream with each element in the +-- other stream generalizes the monadic product of single elements. You can +-- think of the IO monad as a special case of the more general @StreamT IO@ +-- monad; with single element streams. List transformers and logic programming +-- monads also provide a similar product style composition of streams, however +-- streamly generalizes it with the time dimension; allowing streams to be +-- composed in an asynchronous and concurrent fashion in many different ways. +-- It also provides multiple alternative ways of composing streams e.g. +-- serial, interleaved or concurrent. +-- +-- The seemingly simple addition of asynchronicity and concurrency to product +-- style streaming composition unifies a number of disparate abstractions into +-- one powerful and elegant abstraction. A wide variety of programming +-- problems can be solved elegantly with this abstraction. In particular, it +-- unifies three major programming domains namely non-deterministic (logic) +-- programming, concurrent programming and functional reactive programming. In +-- other words, you can do everything with this one abstraction that you could +-- with list transformers (e.g. +-- <https://hackage.haskell.org/package/list-t list-t>), logic programming +-- monads (e.g. <https://hackage.haskell.org/package/logict logict>), +-- streaming libraries (a lot of what +-- <https://hackage.haskell.org/package/conduit conduit> or +-- <https://hackage.haskell.org/package/pipes pipes> can do), concurrency +-- libraries (e.g. <https://hackage.haskell.org/package/async async>) and FRP +-- libraries (e.g. <https://hackage.haskell.org/package/Yampa Yampa> or +-- <https://hackage.haskell.org/package/reflex reflex>). + +-- $overview +-- +-- Streamly provides six distinct stream types i.e. 'StreamT', 'InterleavedT', +-- 'AsyncT' and 'ParallelT', 'ZipStream' and 'ZipAsync', each representing a +-- stream of elements. All these types have the same underlying representation +-- and can be adapted from one to another using type adaptor combinators +-- described later. Each of these types belongs to the 'Streaming' type class +-- which helps converting the specific type to and from the underlying generic +-- stream type. +-- +-- The types 'StreamT', 'InterleavedT', 'AsyncT' and 'ParallelT' are 'Monad' +-- transformers with the monadic bind operation combining streams in a product +-- style in much the same way as a list monad or a list transformer i.e. each +-- element from one stream is combined with every element of the other stream. +-- However, the applicative and monadic composition of these types differ in +-- terms of the ordering and time sequence in which the elements from two +-- streams are combined. 'StreamT' and 'InterleavedT' compose streams serially +-- whereas 'AsyncT' and 'ParallelT' are their concurrent counterparts. See the +-- documentation of the respective types for more details. +-- +-- The types 'ZipStream' and 'ZipAsync' provide 'Applicative' instances to zip +-- two streams together i.e. each element in one stream is combined with the +-- corresponding element in the other stream. 'ZipStream' generates the streams +-- being zipped serially whereas 'ZipAsync' produces both the elements being +-- zipped concurrently. +-- +-- Two streams of the same type can be combined using a sum style composition +-- to generate a stream of the same type where the output stream would contain +-- all elements of both the streams. However, the sequence in which the +-- elements in the resulting stream are produced depends on the combining +-- operator. Four distinct sum style operators, '<>', '<=>', '<|' and '<|>' +-- combine two streams in different ways, each corresponding to the one of the +-- four ways of combining monadically. See the respective section below for +-- more details. +-- +-- Concurrent composition types 'AsyncT', 'ParallelT', 'ZipAsync' and +-- concurrent composition operators '<|' and '<|>' require the underlying monad +-- of the streaming monad transformer to be 'MonadAsync'. +-- +-- For more details please see the "Streamly.Tutorial" and "Streamly.Examples" +-- (the latter is available only when built with the 'examples' build flag). + +-- A simple inline example here illustrating applicative, monad and alternative +-- compositions. + +-- $product +-- +-- Streams that compose serially or non-concurrently come in two flavors i.e. +-- 'StreamT' and 'InterleavedT'. Both of these serial flavors have +-- corresponding concurrent equivalents, those are 'AsyncT' and 'ParallelT' +-- respectively. + +-- $zipping +-- +-- 'ZipStream' and 'ZipAsync', provide 'Applicative' instances for zipping the +-- corresponding elements of two streams together. Note that these types are +-- not monads. + +-- $sum +-- +-- Just like product style composition there are four distinct ways to combine +-- streams in sum style each directly corresponding to one of the product style +-- composition. +-- +-- The standard semigroup append '<>' operator appends two streams serially, +-- this style corresponds to the 'StreamT' style of monadic composition. +-- +-- @ +-- main = ('toList' . 'serially' $ (return 1 <> return 2) <> (return 3 <> return 4)) >>= print +-- @ +-- @ +-- [1,2,3,4] +-- @ +-- +-- The standard 'Alternative' operator '<|>' fairly interleaves two streams in +-- parallel, this operator corresponds to the 'ParallelT' style. +-- +-- @ +-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<|\> (return 3 <> return 4)) >>= print +-- @ +-- @ +-- [1,3,2,4] +-- @ +-- +-- Unlike '<|', this operator cannot be used to fold infinite containers since +-- that might accumulate too many partially drained streams. To be clear, it +-- can combine infinite streams but not infinite number of streams. +-- +-- Two additional sum style composition operators that streamly introduces are +-- described below. + +-- $adapters +-- +-- Code using streamly is usually written such that it is agnostic of any +-- specific streaming type. We use a type variable (polymorphic type) with the +-- 'Streaming' class constraint. Finally, when running the monad we can specify +-- the actual type that we want to use to interpret the code. However, in +-- certain cases we may want to use a specific type to force a certain type of +-- composition. These combinators can be used to convert the stream types from +-- one to another at no cost as all the types have the same underlying +-- representation. +-- +-- If you see an @ambiguous type variable@ error then most likely it is because +-- you have not specified the stream type. You either need a type annotation or +-- one of the following combinators to specify what type of stream you mean. +-- +-- This code: +-- +-- @ +-- main = ('toList' $ (return 1 <> return 2)) >>= print +-- @ +-- +-- will result in a type error like this: +-- +-- @ +-- Ambiguous type variable â€˜t0â€™ arising from a use of ... +-- @ +-- +-- To fix the error just tell 'toList' what kind of stream are we feeding it: +-- +-- @ +-- main = ('toList' $ 'serially' $ (return 1 <> return 2)) >>= print +-- @ +-- @ +-- main = ('toList' $ (return 1 <> return 2 :: StreamT IO Int)) >>= print +-- @ +-- +-- Note that using the combinators is easier as you do not have to think about +-- the specific types, they are just inferred. +-- + +-- $foldutils +-- +-- These are some convenience functions to fold any 'Foldable' container using +-- one of the sum composition operators to convert it into a streamly stream. diff --git a/src/Streamly/Core.hs b/src/Streamly/Core.hs new file mode 100644 index 0000000..5663ec5 --- /dev/null +++ b/src/Streamly/Core.hs @@ -0,0 +1,651 @@ +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE UndecidableInstances #-} -- XXX + +-- | +-- Module : Streamly.Core +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- +module Streamly.Core + ( + MonadAsync + + -- * Streams + , Stream (..) + + -- * Construction + , scons + , snil + + -- * Composition + , interleave + + -- * Concurrent Stream Vars (SVars) + , SVar + , SVarSched (..) + , SVarTag (..) + , SVarStyle (..) + , newEmptySVar + , newStreamVar1 + , newStreamVar2 + , joinStreamVar2 + , fromStreamVar + , toStreamVar + + -- * Concurrent Streams + , parAlt + , parLeft + ) +where + +import Control.Applicative (Alternative (..)) +import Control.Concurrent (ThreadId, forkIO, + myThreadId, threadDelay) +import Control.Concurrent.MVar (MVar, newEmptyMVar, tryTakeMVar, + tryPutMVar, takeMVar) +import Control.Exception (SomeException (..)) +import qualified Control.Exception.Lifted as EL +import Control.Monad (MonadPlus(..), mzero, when) +import Control.Monad.Base (MonadBase (..), liftBaseDefault) +import Control.Monad.Catch (MonadThrow, throwM) +import Control.Monad.Error.Class (MonadError(..)) +import Control.Monad.IO.Class (MonadIO(..)) +import Control.Monad.Reader.Class (MonadReader(..)) +import Control.Monad.State.Class (MonadState(..)) +import Control.Monad.Trans.Class (MonadTrans (lift)) +import Control.Monad.Trans.Control (MonadBaseControl, liftBaseWith) +import Data.Atomics (atomicModifyIORefCAS, + atomicModifyIORefCAS_) +import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, pushL, + tryPopR, nullQ) +import Data.Functor (void) +import Data.IORef (IORef, modifyIORef, newIORef, + readIORef) +import Data.Maybe (isNothing) +import Data.Semigroup (Semigroup(..)) +import Data.Set (Set) +import qualified Data.Set as S + +------------------------------------------------------------------------------ +-- Parent child thread communication type +------------------------------------------------------------------------------ + +-- | Events that a child thread may send to a parent thread. +data ChildEvent a = + ChildYield a + | ChildStop ThreadId (Maybe SomeException) + +------------------------------------------------------------------------------ +-- State threaded around the monad for thread management +------------------------------------------------------------------------------ + +-- | Conjunction is used for monadic/product style composition. Disjunction is +-- used for fold/sum style composition. We need to distiguish the two types of +-- SVars so that the scheduling of the two is independent. +data SVarTag = Conjunction | Disjunction deriving Eq + +-- | For fairly interleaved parallel composition the sched policy is FIFO +-- whereas for left biased parallel composition it is LIFO. +data SVarSched = LIFO | FIFO deriving Eq + +-- | Identify the type of the SVar. Two computations using the same style can +-- be scheduled on the same SVar. +data SVarStyle = SVarStyle SVarTag SVarSched deriving Eq + +-- | An SVar or a Stream Var is a conduit to the output from multiple streams +-- running concurrently and asynchronously. An SVar can be thought of as an +-- asynchronous IO handle. We can write any number of streams to an SVar in a +-- non-blocking manner and then read them back at any time at any pace. The +-- SVar would run the streams asynchronously and accumulate results. An SVar +-- may not really execute the stream completely and accumulate all the results. +-- However, it ensures that the reader can read the results at whatever paces +-- it wants to read. The SVar monitors and adapts to the consumer's pace. +-- +-- An SVar is a mini scheduler, it has an associated runqueue that holds the +-- stream tasks to be picked and run by a pool of worker threads. It has an +-- associated output queue where the output stream elements are placed by the +-- worker threads. A doorBell is used by the worker threads to intimate the +-- consumer thread about availability of new results in the output queue. More +-- workers are added to the SVar by 'fromStreamVar' on demand if the output +-- produced is not keeping pace with the consumer. On bounded SVars, workers +-- block on the output queue to provide throttling of the producer when the +-- consumer is not pulling fast enough. The number of workers may even get +-- reduced depending on the consuming pace. +-- +-- New work is enqueued either at the time of creation of the SVar or as a +-- result of executing the parallel combinators i.e. '<|' and '<|>' when the +-- already enqueued computations get evaluated. See 'joinStreamVar2'. +-- +data SVar m a = + SVar { outputQueue :: IORef [ChildEvent a] + , doorBell :: MVar Bool -- wakeup mechanism for outQ + , enqueue :: Stream m a -> IO () + , runqueue :: m () + , runningThreads :: IORef (Set ThreadId) + , queueEmpty :: m Bool + , svarStyle :: SVarStyle + } + +------------------------------------------------------------------------------ +-- The stream type +------------------------------------------------------------------------------ + +-- TBD use a functor instead of the bare type a? +-- XXX remove the Maybe, use "empty" as the base case + +-- | Represents a monadic stream of values of type 'a' constructed using +-- actions in monad 'm'. Streams can be composed sequentially or in parallel; +-- in product style compositions (monadic bind multiplies streams in a ListT +-- fashion) or in sum style compositions like 'Semigroup', 'Monoid', +-- 'Alternative' or variants of these. +newtype Stream m a = + Stream { + runStream :: forall r. + Maybe (SVar m a) -- local state + -> m r -- stop + -> (a -> Maybe (Stream m a) -> m r) -- yield + -> m r + } + +-- | A monad that can perform asynchronous/concurrent IO operations. Streams +-- that can be composed concurrently require the underlying monad to be +-- 'MonadAsync'. +type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) + +scons :: a -> Maybe (Stream m a) -> Stream m a +scons a r = Stream $ \_ _ yld -> yld a r + +snil :: Stream m a +snil = Stream $ \_ stp _ -> stp + +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +-- | '<>' concatenates two streams sequentially i.e. the first stream is +-- exhausted completely before yielding any element from the second stream. +instance Semigroup (Stream m a) where + m1 <> m2 = go m1 + where + go (Stream m) = Stream $ \_ stp yld -> + let stop = (runStream m2) Nothing stp yld + yield a Nothing = yld a (Just m2) + yield a (Just r) = yld a (Just (go r)) + in m Nothing stop yield + +------------------------------------------------------------------------------ +-- Monoid +------------------------------------------------------------------------------ + +instance Monoid (Stream m a) where + mempty = Stream $ \_ stp _ -> stp + mappend = (<>) + +------------------------------------------------------------------------------ +-- Interleave +------------------------------------------------------------------------------ + +-- | Same as '<=>'. +interleave :: Stream m a -> Stream m a -> Stream m a +interleave m1 m2 = Stream $ \_ stp yld -> do + let stop = (runStream m2) Nothing stp yld + yield a Nothing = yld a (Just m2) + yield a (Just r) = yld a (Just (interleave m2 r)) + (runStream m1) Nothing stop yield + +------------------------------------------------------------------------------ +-- Spawning threads and collecting result in streamed fashion +------------------------------------------------------------------------------ + +{-# INLINE doFork #-} +doFork :: MonadBaseControl IO m + => m () + -> (SomeException -> m ()) + -> m ThreadId +doFork action exHandler = + EL.mask $ \restore -> + liftBaseWith $ \runInIO -> forkIO $ do + -- XXX test the exception handling + _ <- runInIO $ EL.catch (restore action) exHandler + -- XXX restore state here? + return () + +-- XXX exception safety of all atomic/MVar operations + +{-# INLINE send #-} +send :: MonadIO m => SVar m a -> ChildEvent a -> m () +send sv msg = liftIO $ do + atomicModifyIORefCAS_ (outputQueue sv) $ \es -> msg : es + -- XXX need a memory barrier? The wake up must happen only after the + -- store has finished otherwise we can have lost wakeup problems. + void $ tryPutMVar (doorBell sv) True + +{-# INLINE sendStop #-} +sendStop :: MonadIO m => SVar m a -> m () +sendStop sv = liftIO myThreadId >>= \tid -> send sv (ChildStop tid Nothing) + +-- Note: Left associated compositions can grow this queue to a large size +{-# INLINE enqueueLIFO #-} +enqueueLIFO :: IORef [Stream m a] -> Stream m a -> IO () +enqueueLIFO q m = atomicModifyIORefCAS_ q $ \ ms -> m : ms + +runqueueLIFO :: MonadIO m => SVar m a -> IORef [Stream m a] -> m () +runqueueLIFO sv q = run + + where + + run = do + work <- dequeue + case work of + Nothing -> sendStop sv + Just m -> (runStream m) (Just sv) run yield + + sendit a = send sv (ChildYield a) + yield a Nothing = sendit a >> run + yield a (Just r) = sendit a >> (runStream r) (Just sv) run yield + + dequeue = liftIO $ atomicModifyIORefCAS q $ \ ms -> + case ms of + [] -> ([], Nothing) + x : xs -> (xs, Just x) + +{-# INLINE enqueueFIFO #-} +enqueueFIFO :: LinkedQueue (Stream m a) -> Stream m a -> IO () +enqueueFIFO = pushL + +runqueueFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m () +runqueueFIFO sv q = run + + where + + run = do + work <- dequeue + case work of + Nothing -> sendStop sv + Just m -> (runStream m) (Just sv) run yield + + dequeue = liftIO $ tryPopR q + sendit a = send sv (ChildYield a) + yield a Nothing = sendit a >> run + yield a (Just r) = sendit a >> liftIO (enqueueFIFO q r) >> run + +-- Thread tracking is needed for two reasons: +-- +-- 1) Killing threads on exceptions. Threads may not be allowed to go away by +-- themselves because they may run for significant times before going away or +-- worse they may be stuck in IO and never go away. +-- +-- 2) To know when all threads are done. + +{-# NOINLINE addThread #-} +addThread :: MonadIO m => SVar m a -> ThreadId -> m () +addThread sv tid = + liftIO $ modifyIORef (runningThreads sv) $ (\s -> S.insert tid s) + +{-# INLINE delThread #-} +delThread :: MonadIO m => SVar m a -> ThreadId -> m () +delThread sv tid = + liftIO $ modifyIORef (runningThreads sv) $ (\s -> S.delete tid s) + +{-# INLINE allThreadsDone #-} +allThreadsDone :: MonadIO m => SVar m a -> m Bool +allThreadsDone sv = liftIO $ do + readIORef (runningThreads sv) >>= return . S.null + +{-# NOINLINE handleChildException #-} +handleChildException :: MonadIO m => SVar m a -> SomeException -> m () +handleChildException sv e = do + tid <- liftIO myThreadId + send sv (ChildStop tid (Just e)) + +{-# NOINLINE pushWorker #-} +pushWorker :: MonadAsync m => SVar m a -> m () +pushWorker sv = + doFork (runqueue sv) (handleChildException sv) >>= addThread sv + +-- XXX When the queue is LIFO we can put a limit on the number of dispatches. +-- Also, if a worker blocks on the output queue we can decide if we want to +-- block or make it go away entirely, depending on the number of workers and +-- the type of the queue. +{-# INLINE sendWorkerWait #-} +sendWorkerWait :: MonadAsync m => SVar m a -> m () +sendWorkerWait sv = do + case svarStyle sv of + SVarStyle _ LIFO -> liftIO $ threadDelay 200 + SVarStyle _ FIFO -> liftIO $ threadDelay 0 + + output <- liftIO $ readIORef (outputQueue sv) + when (null output) $ do + done <- queueEmpty sv + if (not done) + then (pushWorker sv) >> sendWorkerWait sv + else void (liftIO $ takeMVar (doorBell sv)) + +-- | Pull a stream from an SVar. +{-# NOINLINE fromStreamVar #-} +fromStreamVar :: MonadAsync m => SVar m a -> Stream m a +fromStreamVar sv = Stream $ \_ stp yld -> do + -- XXX if reading the IORef is costly we can use a flag in the SVar to + -- indicate we are done. + done <- allThreadsDone sv + if done + then stp + else do + res <- liftIO $ tryTakeMVar (doorBell sv) + when (isNothing res) $ sendWorkerWait sv + list <- liftIO $ atomicModifyIORefCAS (outputQueue sv) $ \x -> ([], x) + -- To avoid lock overhead we read all events at once instead of reading + -- one at a time. We just reverse the list to process the events in the + -- order they arrived. Maybe we can use a queue instead? + (runStream $ processEvents (reverse list)) Nothing stp yld + + where + + handleException e tid = do + delThread sv tid + -- XXX implement kill async exception handling + -- liftIO $ readIORef (runningThreads sv) >>= mapM_ killThread + throwM e + + {-# INLINE processEvents #-} + processEvents [] = Stream $ \_ stp yld -> do + done <- allThreadsDone sv + if not done + then (runStream (fromStreamVar sv)) Nothing stp yld + else stp + + processEvents (ev : es) = Stream $ \_ stp yld -> do + let continue = (runStream (processEvents es)) Nothing stp yld + yield a = yld a (Just (processEvents es)) + + case ev of + ChildYield a -> yield a + ChildStop tid e -> + case e of + Nothing -> delThread sv tid >> continue + Just ex -> handleException ex tid + +getFifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a) +getFifoSVar ctype = do + outQ <- newIORef [] + outQMv <- newEmptyMVar + running <- newIORef S.empty + q <- newQ + let sv = + SVar { outputQueue = outQ + , doorBell = outQMv + , runningThreads = running + , runqueue = runqueueFIFO sv q + , enqueue = pushL q + , queueEmpty = liftIO $ nullQ q + , svarStyle = ctype + } + in return sv + +getLifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a) +getLifoSVar ctype = do + outQ <- newIORef [] + outQMv <- newEmptyMVar + running <- newIORef S.empty + q <- newIORef [] + let checkEmpty = liftIO (readIORef q) >>= return . null + let sv = + SVar { outputQueue = outQ + , doorBell = outQMv + , runningThreads = running + , runqueue = runqueueLIFO sv q + , enqueue = enqueueLIFO q + , queueEmpty = checkEmpty + , svarStyle = ctype + } + in return sv + +-- | Create a new empty SVar. +newEmptySVar :: MonadAsync m => SVarStyle -> m (SVar m a) +newEmptySVar style = do + sv <- liftIO $ + case style of + SVarStyle _ FIFO -> do + c <- getFifoSVar style + return c + SVarStyle _ LIFO -> do + c <- getLifoSVar style + return c + return sv + +-- | Create a new SVar and enqueue one stream computation on it. +newStreamVar1 :: MonadAsync m => SVarStyle -> Stream m a -> m (SVar m a) +newStreamVar1 style m = do + sv <- newEmptySVar style + -- Note: We must have all the work on the queue before sending the + -- pushworker, otherwise the pushworker may exit before we even get a + -- chance to push. + liftIO $ (enqueue sv) m + pushWorker sv + return sv + +-- | Create a new SVar and enqueue two stream computations on it. +newStreamVar2 :: MonadAsync m + => SVarStyle -> Stream m a -> Stream m a -> m (SVar m a) +newStreamVar2 style m1 m2 = do + -- Note: We must have all the work on the queue before sending the + -- pushworker, otherwise the pushworker may exit before we even get a + -- chance to push. + sv <- liftIO $ + case style of + SVarStyle _ FIFO -> do + c <- getFifoSVar style + (enqueue c) m1 >> (enqueue c) m2 + return c + SVarStyle _ LIFO -> do + c <- getLifoSVar style + (enqueue c) m2 >> (enqueue c) m1 + return c + pushWorker sv + return sv + +-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then +-- be read back from the SVar using 'fromSVar'. +toStreamVar :: MonadAsync m => SVar m a -> Stream m a -> m () +toStreamVar sv m = do + liftIO $ (enqueue sv) m + done <- allThreadsDone sv + -- XXX there may be a race here unless we are running in the consumer + -- thread. This is safe only when called from the consumer thread or when + -- no consumer is present. + when done $ pushWorker sv + +------------------------------------------------------------------------------ +-- Running streams concurrently +------------------------------------------------------------------------------ + +-- Concurrency rate control. Our objective is to create more threads on demand +-- if the consumer is running faster than us. As soon as we encounter an +-- Alternative composition we create a push pull pair of threads. We use a +-- channel for communication between the consumer pulling from the channel and +-- the producer who pushing to the channel. The producer creates more threads +-- if no output is seen on the channel, that is the consumer is running faster. +-- However this mechanism can be problematic if the initial production latency +-- is high, we may end up creating too many threads. So we need some way to +-- monitor and use the latency as well. +-- +-- TBD We may run computations at the lower level of the composition tree +-- serially even if they are composed using a parallel combinator. We can use +-- <> in place of <| and <=> in place of <|>. If we find that a parallel +-- channel immediately above a computation becomes empty we can switch to +-- parallelizing the computation. For that we can use a state flag to fork the +-- rest of the computation at any point of time inside the Monad bind operation +-- if the consumer is running at a faster speed. +-- +-- TBD the alternative composition allows us to dispatch a chunkSize of only 1. +-- If we have to dispatch in arbitrary chunksizes we will need to compose the +-- parallel actions using a data constructor (Free Alternative) instead so that +-- we can divide it in chunks of arbitrary size before dispatch. If the stream +-- is composed of hierarchically composed grains of different sizes then we can +-- always switch to a desired granularity depending on the consumer speed. +-- +-- TBD for pure work (when we are not in the IO monad) we can divide it into +-- just the number of CPUs. + +{-# NOINLINE withNewSVar2 #-} +withNewSVar2 :: MonadAsync m + => SVarStyle -> Stream m a -> Stream m a -> Stream m a +withNewSVar2 style m1 m2 = Stream $ \_ stp yld -> do + sv <- newStreamVar2 style m1 m2 + (runStream (fromStreamVar sv)) Nothing stp yld + +-- | Join two computations on the currently running 'SVar' queue for concurrent +-- execution. The 'SVarStyle' required by the current composition context is +-- passed as one of the parameters. If the style does not match with the style +-- of the current 'SVar' we create a new 'SVar' and schedule the computations +-- on that. The newly created SVar joins as one of the computations on the +-- current SVar queue. +-- +-- When we are using parallel composition, an SVar is passed around as a state +-- variable. We try to schedule a new parallel computation on the SVar passed +-- to us. The first time, when no SVar exists, a new SVar is created. +-- Subsequently, 'joinStreamVar2' may get called when a computation already +-- scheduled on the SVar is further evaluated. For example, when (a \<|> b) is +-- evaluated it calls a 'joinStreamVar2' to put 'a' and 'b' on the current scheduler +-- queue. However, if the scheduling and composition style of the new +-- computation being scheduled is different than the style of the current SVar, +-- then we create a new SVar and schedule it on that. +-- +-- For example: +-- +-- * (x \<|> y) \<|> (t \<|> u) -- all of them get scheduled on the same SVar +-- * (x \<|> y) \<|> (t \<| u) -- @t@ and @u@ get scheduled on a new child SVar +-- because of the scheduling policy change. +-- * if we 'adapt' a stream of type 'AsyncT' to a stream of type +-- 'ParallelT', we create a new SVar at the transitioning bind. +-- * When the stream is switching from disjunctive composition to conjunctive +-- composition and vice-versa we create a new SVar to isolate the scheduling +-- of the two. +-- +{-# INLINE joinStreamVar2 #-} +joinStreamVar2 :: MonadAsync m + => SVarStyle -> Stream m a -> Stream m a -> Stream m a +joinStreamVar2 style m1 m2 = Stream $ \st stp yld -> do + case st of + Just sv | svarStyle sv == style -> + liftIO ((enqueue sv) m2) >> (runStream m1) st stp yld + _ -> (runStream (withNewSVar2 style m1 m2)) Nothing stp yld + +------------------------------------------------------------------------------ +-- Semigroup and Monoid style compositions for parallel actions +------------------------------------------------------------------------------ + +{- +-- | Same as '<>|'. +parAhead :: Stream m a -> Stream m a -> Stream m a +parAhead = undefined + +-- | Sequential composition similar to '<>' except that it can execute the +-- action on the right in parallel ahead of time. Returns the results in +-- sequential order like '<>' from left to right. +(<>|) :: Stream m a -> Stream m a -> Stream m a +(<>|) = parAhead +-} + +-- | Same as '<|>'. Since this schedules all the composed streams fairly you +-- cannot fold infinite number of streams using this operation. +{-# INLINE parAlt #-} +parAlt :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +parAlt = joinStreamVar2 (SVarStyle Disjunction FIFO) + +-- | Same as '<|'. Since this schedules the left side computation first you can +-- right fold an infinite container using this operator. However a left fold +-- will not work well as it first unpeels the whole structure before scheduling +-- a computation requiring an amount of memory proportional to the size of the +-- structure. +{-# INLINE parLeft #-} +parLeft :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +parLeft = joinStreamVar2 (SVarStyle Disjunction LIFO) + +------------------------------------------------------------------------------- +-- Instances (only used for deriving newtype instances) +------------------------------------------------------------------------------- + +-- Stream type is not exposed, these instances are only for deriving instances +-- for the newtype wrappers based on Stream. + +-- Dummy Instances, defined to enable the definition of other instances that +-- require a Monad constraint. Must be defined by the newtypes. + +instance Monad m => Functor (Stream m) where + fmap = undefined + +instance Monad m => Applicative (Stream m) where + pure = undefined + (<*>) = undefined + +instance Monad m => Monad (Stream m) where + return = pure + (>>=) = undefined + +------------------------------------------------------------------------------ +-- Alternative & MonadPlus +------------------------------------------------------------------------------ + +-- | `empty` represents an action that takes non-zero time to complete. Since +-- all actions take non-zero time, an `Alternative` composition ('<|>') is a +-- monoidal composition executing all actions in parallel, it is similar to +-- '<>' except that it runs all the actions in parallel and interleaves their +-- results fairly. +instance MonadAsync m => Alternative (Stream m) where + empty = mempty + (<|>) = parAlt + +instance MonadAsync m => MonadPlus (Stream m) where + mzero = empty + mplus = (<|>) + +------------------------------------------------------------------------------- +-- Transformer +------------------------------------------------------------------------------- + +instance MonadTrans Stream where + lift mx = Stream $ \_ _ yld -> mx >>= (\a -> (yld a Nothing)) + +instance (MonadBase b m, Monad m) => MonadBase b (Stream m) where + liftBase = liftBaseDefault + +------------------------------------------------------------------------------ +-- Standard transformer instances +------------------------------------------------------------------------------ + +instance MonadIO m => MonadIO (Stream m) where + liftIO = lift . liftIO + +instance MonadThrow m => MonadThrow (Stream m) where + throwM = lift . throwM + +-- XXX handle and test cross thread state transfer +instance MonadError e m => MonadError e (Stream m) where + throwError = lift . throwError + catchError m h = Stream $ \st stp yld -> + let handle r = r `catchError` \e -> (runStream (h e)) st stp yld + yield a Nothing = yld a Nothing + yield a (Just r) = yld a (Just (catchError r h)) + in handle $ (runStream m) st stp yield + +instance MonadReader r m => MonadReader r (Stream m) where + ask = lift ask + local f m = Stream $ \st stp yld -> + let yield a Nothing = local f $ yld a Nothing + yield a (Just r) = local f $ yld a (Just (local f r)) + in (runStream m) st (local f stp) yield + +instance MonadState s m => MonadState s (Stream m) where + get = lift get + put x = lift (put x) + state k = lift (state k) diff --git a/src/Streamly/Examples.hs b/src/Streamly/Examples.hs new file mode 100644 index 0000000..e0e3890 --- /dev/null +++ b/src/Streamly/Examples.hs @@ -0,0 +1,60 @@ +{-# LANGUAGE CPP #-} +-- | +-- Module : Streamly.Examples +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- To run these examples: +-- +-- You need to build the library with the "examples" flag on e.g. +-- @stack build --flag streamly:examples@. To include the SDL examples as well +-- use @stack build --flag streamly:examples-sdl@. You will have to make sure +-- that you have the SDL OS package installed on your system and the headers +-- are visible to Haskell build tool. +-- +-- You can directly evaluate the respective file and its main function using +-- ghc, like this (this may not work when built with @examples-sdl@ flag): +-- +-- @ +-- \$ stack ghc -- -e acidRainGame src\/Streamly\/Examples\/AcidRainGame.hs +-- @ +-- +-- Alternatively, you can create a file calling the main function and compile +-- it: +-- +-- @ +-- \$ cat ex.hs +-- import Streamly.Examples +-- main = acidRainGame +-- \$ stack ghc ex.hs +-- @ +-- +-- Alternatively, you can just import "Streamly.Examples" and evaluate the +-- respective function in GHCi. +-- +module Streamly.Examples + ( + -- Reactive Programming + acidRainGame +#ifdef EXAMPLES_SDL + , circlingSquare +#endif + + -- Concurrent Programming + , listDirRecursive + , mergeSortedStreams + , searchEngineQuery + ) +where + +import Streamly.Examples.AcidRainGame +#ifdef EXAMPLES_SDL +import Streamly.Examples.CirclingSquare +#endif +import Streamly.Examples.ListDirRecursive +import Streamly.Examples.MergeSortedStreams +import Streamly.Examples.SearchEngineQuery diff --git a/src/Streamly/Examples/AcidRainGame.hs b/src/Streamly/Examples/AcidRainGame.hs new file mode 100644 index 0000000..8f4ce02 --- /dev/null +++ b/src/Streamly/Examples/AcidRainGame.hs @@ -0,0 +1,46 @@ +{-# LANGUAGE FlexibleContexts #-} + +-- This example is adapted from Gabriel Gonzalez's pipes-concurrency package. +-- https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html + +module Streamly.Examples.AcidRainGame where + +import Streamly +import Control.Concurrent (threadDelay) +import Control.Monad (when) +import Control.Monad.State (MonadState, get, modify, runStateT) +import Data.Semigroup (cycle1) + +data Event = Harm Int | Heal Int | Quit deriving (Show) + +userAction :: MonadIO m => StreamT m Event +userAction = cycle1 $ liftIO askUser + where + askUser = do + command <- getLine + case command of + "potion" -> return (Heal 10) + "quit" -> return Quit + _ -> putStrLn "What?" >> askUser + +acidRain :: MonadIO m => StreamT m Event +acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1) + +game :: (MonadAsync m, MonadState Int m) => StreamT m () +game = do + event <- userAction <|> acidRain + case event of + Harm n -> modify $ \h -> h - n + Heal n -> modify $ \h -> h + n + Quit -> fail "quit" + + h <- get + when (h <= 0) $ fail "You die!" + liftIO $ putStrLn $ "Health = " ++ show h + +acidRainGame :: IO () +acidRainGame = do + putStrLn "Your health is deteriorating due to acid rain,\ + \ type \"potion\" or \"quit\"" + _ <- runStateT (runStreamT game) 60 + return () diff --git a/src/Streamly/Examples/CirclingSquare.hs b/src/Streamly/Examples/CirclingSquare.hs new file mode 100644 index 0000000..08151d6 --- /dev/null +++ b/src/Streamly/Examples/CirclingSquare.hs @@ -0,0 +1,90 @@ +-- Adapted from the Yampa package. +-- Displays a square moving in a circle. To move the position drag the mouse. +-- +-- Requires the SDL package, assuming streamly has already been built, you can +-- compile it like this: +-- stack ghc --package SDL circle-mouse.hs + +module Streamly.Examples.CirclingSquare where + +import Data.IORef +import Graphics.UI.SDL as SDL +import Streamly +import Streamly.Time + +------------------------------------------------------------------------------ +-- SDL Graphics Init +------------------------------------------------------------------------------ + +sdlInit :: IO () +sdlInit = do + SDL.init [InitVideo] + + let width = 640 + height = 480 + _ <- SDL.setVideoMode width height 16 [SWSurface] + SDL.setCaption "Test" "" + +------------------------------------------------------------------------------ +-- Display a box at a given coordinates +------------------------------------------------------------------------------ + +display :: (Double, Double) -> IO () +display (playerX, playerY) = do + screen <- getVideoSurface + + -- Paint screen green + let format = surfaceGetPixelFormat screen + bgColor <- mapRGB format 55 60 64 + _ <- fillRect screen Nothing bgColor + + -- Paint small red square, at an angle 'angle' with respect to the center + foreC <- mapRGB format 212 108 73 + let side = 10 + x = round playerX + y = round playerY + _ <- fillRect screen (Just (Rect x y side side)) foreC + + -- Double buffering + SDL.flip screen + +------------------------------------------------------------------------------ +-- Wait and update Controller Position if it changes +------------------------------------------------------------------------------ + +refreshRate :: Int +refreshRate = 40 + +updateController :: IORef (Double, Double) -> IO () +updateController ref = periodic refreshRate $ do + e <- pollEvent + case e of + MouseMotion x y _ _ -> do + writeIORef ref (fromIntegral x, fromIntegral y) + _ -> return () + +------------------------------------------------------------------------------ +-- Periodically refresh the output display +------------------------------------------------------------------------------ + +updateDisplay :: IORef (Double, Double) -> IO () +updateDisplay cref = withClock clock refreshRate displaySquare + + where + + clock = do + t <- SDL.getTicks + return ((fromIntegral t) * 1000) + + speed = 8 + radius = 30 + displaySquare time = do + (x, y) <- readIORef cref + let t = (fromIntegral time) * speed / 1000000 + in display (x + cos t * radius, y + sin t * radius) + +circlingSquare :: IO () +circlingSquare = do + sdlInit + cref <- newIORef (0,0) + runStreamT $ liftIO (updateController cref) <|> liftIO (updateDisplay cref) diff --git a/src/Streamly/Examples/ListDirRecursive.hs b/src/Streamly/Examples/ListDirRecursive.hs new file mode 100644 index 0000000..e52ac7e --- /dev/null +++ b/src/Streamly/Examples/ListDirRecursive.hs @@ -0,0 +1,19 @@ +{-# LANGUAGE FlexibleContexts #-} + +module Streamly.Examples.ListDirRecursive where + +import Path.IO (listDir, getCurrentDir) +import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) +import Streamly + +listDirRecursive :: IO () +listDirRecursive = do + liftIO $ hSetBuffering stdout LineBuffering + runStreamT $ getCurrentDir >>= readdir + where readdir d = do + (ds, fs) <- lift $ listDir d + liftIO $ mapM_ putStrLn $ map show fs ++ map show ds + --foldWith (<>) $ map readdir ds -- serial + --foldWith (<=>) $ map readdir ds -- serial interleaved + foldWith (<|) $ map readdir ds -- concurrent left biased + --foldWith (<|>) $ map readdir ds -- concurrent interleaved diff --git a/src/Streamly/Examples/MergeSortedStreams.hs b/src/Streamly/Examples/MergeSortedStreams.hs new file mode 100644 index 0000000..d39a8a7 --- /dev/null +++ b/src/Streamly/Examples/MergeSortedStreams.hs @@ -0,0 +1,41 @@ +{-# LANGUAGE FlexibleContexts #-} + +module Streamly.Examples.MergeSortedStreams where + +import Data.Word +import System.Random (getStdGen, randoms) +import Data.List (sort) +import Streamly +import qualified Streamly.Prelude as A + +getSorted :: MonadIO m => StreamT m Word16 +getSorted = do + g <- liftIO getStdGen + let ls = take 100000 (randoms g) :: [Word16] + foldMapWith (<>) return (sort ls) + +mergeAsync :: (Ord a, MonadAsync m) + => StreamT m a -> StreamT m a -> StreamT m a +mergeAsync a b = do + x <- lift $ async a + y <- lift $ async b + merge x y + +merge :: (Ord a, MonadAsync m) => StreamT m a -> StreamT m a -> StreamT m a +merge a b = do + a1 <- lift $ A.uncons a + case a1 of + Nothing -> b + Just (x, ma) -> do + b1 <- lift $ A.uncons b + case b1 of + Nothing -> return x <> ma + Just (y, mb) -> + if (y < x) + then (return y) <> merge (return x <> ma) mb + else (return x) <> merge ma (return y <> mb) + +mergeSortedStreams :: IO () +mergeSortedStreams = do + xs <- A.toList $ mergeAsync getSorted getSorted + putStrLn $ show $ length xs diff --git a/src/Streamly/Examples/SearchEngineQuery.hs b/src/Streamly/Examples/SearchEngineQuery.hs new file mode 100644 index 0000000..313ea5d --- /dev/null +++ b/src/Streamly/Examples/SearchEngineQuery.hs @@ -0,0 +1,19 @@ +module Streamly.Examples.SearchEngineQuery where + +import Streamly +import Network.HTTP.Simple + +-- Runs three search engine queries in parallel. +searchEngineQuery :: IO () +searchEngineQuery = do + putStrLn "Using parallel alternative" + runStreamT $ google <|> bing <|> duckduckgo + + putStrLn "\nUsing parallel applicative zip" + runZipAsync $ (,,) <$> pure google <*> pure bing <*> pure duckduckgo + + where + get s = liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s)) + google = get "https://www.google.com/search?q=haskell" + bing = get "https://www.bing.com/search?q=haskell" + duckduckgo = get "https://www.duckduckgo.com/?q=haskell" diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs new file mode 100644 index 0000000..0871526 --- /dev/null +++ b/src/Streamly/Prelude.hs @@ -0,0 +1,430 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving#-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE UndecidableInstances #-} -- XXX + +-- | +-- Module : Streamly.Prelude +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- +module Streamly.Prelude + ( + -- * Construction + cons + , nil + , unfoldr + , unfoldrM + , each + , fromHandle + + -- * Elimination + , foldr + , foldrM + , foldl + , foldlM + , uncons + + -- * Elimination Special Folds + , toList + , toHandle + , all + , any + , sum + , product + , head + , last + , length + , elem + , notElem + , maximum + , minimum + + -- * Filtering + , filter + , take + , takeWhile + , drop + , dropWhile + + -- * Transformation + , mapM + , mapM_ + , sequence + + -- * Zipping + , zipWith + , zipWithM + , zipAsyncWith + , zipAsyncWithM + ) +where + +import Control.Monad (liftM) +import Control.Monad.IO.Class (MonadIO(..)) +import Data.Semigroup (Semigroup(..)) +import Prelude hiding (filter, drop, dropWhile, take, + takeWhile, zipWith, foldr, foldl, + mapM, mapM_, sequence, all, any, + sum, product, elem, notElem, + maximum, minimum, head, last, + length) +import qualified Prelude as Prelude +import qualified System.IO as IO + +import Streamly.Core +import Streamly.Streams + +------------------------------------------------------------------------------ +-- Construction +------------------------------------------------------------------------------ + +-- | Build a Stream by unfolding pure steps starting from a seed. +unfoldr :: Streaming t => (b -> Maybe (a, b)) -> b -> t m a +unfoldr step = fromStream . go + where + go s = Stream $ \_ stp yld -> do + case step s of + Nothing -> stp + Just (a, b) -> yld a (Just (go b)) + +-- | Build a Stream by unfolding monadic steps starting from a seed. +unfoldrM :: (Streaming t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a +unfoldrM step = fromStream . go + where + go s = Stream $ \_ stp yld -> do + mayb <- step s + case mayb of + Nothing -> stp + Just (a, b) -> yld a (Just (go b)) + +-- XXX need eachInterleaved, eachAsync, eachParallel +-- | Same as @foldWith (<>)@ but more efficient. +{-# INLINE each #-} +each :: (Foldable f, Streaming t) => f a -> t m a +each xs = Prelude.foldr cons nil xs + +-- | Read lines from an IO Handle into a stream of Strings. +fromHandle :: (MonadIO m, Streaming t) => IO.Handle -> t m String +fromHandle h = fromStream $ go + where + go = Stream $ \_ stp yld -> do + eof <- liftIO $ IO.hIsEOF h + if eof + then stp + else do + str <- liftIO $ IO.hGetLine h + yld str (Just go) + +------------------------------------------------------------------------------ +-- Elimination +------------------------------------------------------------------------------ + +-- Parallel variants of folds? + +-- | Right fold. +foldr :: (Monad m, Streaming t) => (a -> b -> b) -> b -> t m a -> m b +foldr step acc m = go (toStream m) + where + go m1 = + let stop = return acc + yield a Nothing = return (step a acc) + yield a (Just x) = go x >>= \b -> return (step a b) + in (runStream m1) Nothing stop yield + +-- | Right fold with a monadic step function. See 'toList' for an example use. +{-# INLINE foldrM #-} +foldrM :: Streaming t => (a -> m b -> m b) -> m b -> t m a -> m b +foldrM step acc m = go (toStream m) + where + go m1 = + let stop = acc + yield a Nothing = step a acc + yield a (Just x) = step a (go x) + in (runStream m1) Nothing stop yield + +-- | Strict left fold. This is typed to work with the foldl package. To use +-- directly pass 'id' as the third argument. +foldl :: (Monad m, Streaming t) + => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b +foldl step begin done m = go begin (toStream m) + where + go !acc m1 = + let stop = return (done acc) + yield a Nothing = return (done (step acc a)) + yield a (Just x) = go (step acc a) x + in (runStream m1) Nothing stop yield + +-- | Strict left fold, with monadic step function. This is typed to work +-- with the foldl package. To use directly pass 'id' as the third argument. +foldlM :: (Monad m, Streaming t) + => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b +foldlM step begin done m = go begin (toStream m) + where + go !acc m1 = + let stop = acc >>= done + yield a Nothing = acc >>= \b -> step b a >>= done + yield a (Just x) = acc >>= \b -> go (step b a) x + in (runStream m1) Nothing stop yield + +-- | Decompose a stream into its head and tail. If the stream is empty, returns +-- 'Nothing'. If the stream is non-empty, returns 'Just (a, ma)', where 'a' is +-- the head of the stream and 'ma' its tail. +uncons :: (Streaming t, Monad m) => t m a -> m (Maybe (a, t m a)) +uncons m = + let stop = return Nothing + yield a Nothing = return (Just (a, nil)) + yield a (Just x) = return (Just (a, (fromStream x))) + in (runStream (toStream m)) Nothing stop yield + +-- | Write a stream of Strings to an IO Handle. +toHandle :: (Streaming t, MonadIO m) => IO.Handle -> t m String -> m () +toHandle h m = go (toStream m) + where + go m1 = + let stop = return () + yield a Nothing = liftIO (IO.hPutStrLn h a) + yield a (Just x) = liftIO (IO.hPutStrLn h a) >> go x + in (runStream m1) Nothing stop yield + +------------------------------------------------------------------------------ +-- Special folds +------------------------------------------------------------------------------ + +-- | Convert a stream into a list in the underlying monad. +{-# INLINABLE toList #-} +toList :: (Monad m, Streaming t) => t m a -> m [a] +toList = foldrM (\a xs -> liftM (a :) xs) (return []) + +-- | Take first 'n' elements from the stream and discard the rest. +take :: Streaming t => Int -> t m a -> t m a +take n m = fromStream $ go n (toStream m) + where + go n1 m1 = Stream $ \ctx stp yld -> do + let yield a Nothing = yld a Nothing + yield a (Just x) = yld a (Just (go (n1 - 1) x)) + if (n1 <= 0) + then stp + else (runStream m1) ctx stp yield + +-- XXX This is not as efficient as it could be. We need a short circuiting at +-- a lower level. Compare with simple-conduit, filtering there cuts down time +-- due to short circuting whereas the time spent remains the same here. + +-- | Include only those elements that pass a predicate. +{-# INLINE filter #-} +filter :: (Streaming t, Monad (t m)) => (a -> Bool) -> t m a -> t m a +filter p m = m >>= \x -> if p x then return x else nil + +-- | End the stream as soon as the predicate fails on an element. +takeWhile :: Streaming t => (a -> Bool) -> t m a -> t m a +takeWhile p m = fromStream $ go (toStream m) + where + go m1 = Stream $ \ctx stp yld -> do + let yield a Nothing | p a = yld a Nothing + | otherwise = stp + yield a (Just x) | p a = yld a (Just (go x)) + | otherwise = stp + in (runStream m1) ctx stp yield + +-- | Discard first 'n' elements from the stream and take the rest. +drop :: Streaming t => Int -> t m a -> t m a +drop n m = fromStream $ go n (toStream m) + where + go n1 m1 = Stream $ \ctx stp yld -> do + let yield _ Nothing = stp + yield _ (Just x) = (runStream $ go (n1 - 1) x) ctx stp yld + if (n1 <= 0) + then (runStream m1) ctx stp yld + else (runStream m1) ctx stp yield + +-- | Drop elements in the stream as long as the predicate succeeds and then +-- take the rest of the stream. +dropWhile :: Streaming t => (a -> Bool) -> t m a -> t m a +dropWhile p m = fromStream $ go (toStream m) + where + go m1 = Stream $ \ctx stp yld -> do + let yield a Nothing | p a = stp + | otherwise = yld a Nothing + yield a (Just x) | p a = (runStream (go x)) ctx stp yield + | otherwise = yld a (Just x) + in (runStream m1) ctx stp yield + +-- | Determine whether all elements of a stream satisfy a predicate. +all :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool +all p m = go (toStream m) + where + go m1 = + let yield a Nothing | p a = return True + | otherwise = return False + yield a (Just x) | p a = go x + | otherwise = return False + in (runStream m1) Nothing (return True) yield + +-- | Determine whether any of the elements of a stream satisfy a predicate. +any :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool +any p m = go (toStream m) + where + go m1 = + let yield a Nothing | p a = return True + | otherwise = return False + yield a (Just x) | p a = return True + | otherwise = go x + in (runStream m1) Nothing (return False) yield + +-- | Determine the sum of all elements of a stream of numbers +sum :: (Streaming t, Monad m, Num a) => t m a -> m a +sum = foldl (+) 0 id + +-- | Determine the product of all elements of a stream of numbers +product :: (Streaming t, Monad m, Num a) => t m a -> m a +product = foldl (*) 0 id + +-- | Extract the first element of the stream, if any. +head :: (Streaming t, Monad m) => t m a -> m (Maybe a) +head m = + let stop = return Nothing + yield a _ = return (Just a) + in (runStream (toStream m)) Nothing stop yield + +-- | Extract the last element of the stream, if any. +last :: (Streaming t, Monad m) => t m a -> m (Maybe a) +last m = go (toStream m) + where + go m1 = + let stop = return Nothing + yield a Nothing = return (Just a) + yield _ (Just x) = go x + in (runStream m1) Nothing stop yield + +-- | Determine whether an element is present in the stream. +elem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool +elem e m = go (toStream m) + where + go m1 = + let stop = return False + yield a Nothing = return (a == e) + yield a (Just x) = if (a == e) then return True else go x + in (runStream m1) Nothing stop yield + +-- | Determine whether an element is not present in the stream. +notElem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool +notElem e m = go (toStream m) + where + go m1 = + let stop = return True + yield a Nothing = return (a /= e) + yield a (Just x) = if (a == e) then return False else go x + in (runStream m1) Nothing stop yield + +-- | Determine the length of the stream. +length :: (Streaming t, Monad m) => t m a -> m Int +length = foldl (\n _ -> n + 1) 0 id + +-- | Determine the minimum element in a stream. +minimum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a) +minimum m = go Nothing (toStream m) + where + go r m1 = + let stop = return r + yield a Nothing = return $ min_ a r + yield a (Just x) = go (min_ a r) x + in (runStream m1) Nothing stop yield + + min_ a r = case r of + Nothing -> Just a + Just e -> Just $ min a e + +-- | Determine the maximum element in a stream. +maximum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a) +maximum m = go Nothing (toStream m) + where + go r m1 = + let stop = return r + yield a Nothing = return $ max_ a r + yield a (Just x) = go (max_ a r) x + in (runStream m1) Nothing stop yield + + max_ a r = case r of + Nothing -> Just a + Just e -> Just $ max a e + +------------------------------------------------------------------------------ +-- Transformation +------------------------------------------------------------------------------ + +-- XXX Parallel variants of these? mapMWith et al. sequenceWith. + +-- | Replace each element of the stream with the result of a monadic action +-- applied on the element. +mapM :: (Streaming t, Monad m) => (a -> m b) -> t m a -> t m b +mapM f m = fromStream $ go (toStream m) + where + go m1 = Stream $ \_ stp yld -> do + let stop = stp + yield a Nothing = f a >>= \b -> yld b Nothing + yield a (Just x) = f a >>= \b -> yld b (Just (go x)) + in (runStream m1) Nothing stop yield + +-- | Apply a monadic action to each element of the stream and discard the +-- output of the action. +mapM_ :: (Streaming t, Monad m) => (a -> m b) -> t m a -> m () +mapM_ f m = go (toStream m) + where + go m1 = + let stop = return () + yield a Nothing = f a >> return () + yield a (Just x) = f a >> go x + in (runStream m1) Nothing stop yield + +-- | Reduce a stream of monadic actions to a stream of the output of those +-- actions. +sequence :: (Streaming t, Monad m) => t m (m a) -> t m a +sequence m = fromStream $ go (toStream m) + where + go m1 = Stream $ \_ stp yld -> do + let stop = stp + yield a Nothing = a >>= \b -> yld b Nothing + yield a (Just x) = a >>= \b -> yld b (Just (go x)) + in (runStream m1) Nothing stop yield + +------------------------------------------------------------------------------ +-- Serially Zipping Streams +------------------------------------------------------------------------------ + +-- | Zip two streams serially using a monadic zipping function. +zipWithM :: Streaming t => (a -> b -> t m c) -> t m a -> t m b -> t m c +zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2) + where + go mx my = Stream $ \_ stp yld -> do + let merge a ra = + let yield2 b Nothing = (runStream (g a b)) Nothing stp yld + yield2 b (Just rb) = + (runStream ((g a b) <> (go ra rb))) Nothing stp yld + in (runStream my) Nothing stp yield2 + let yield1 a Nothing = merge a snil + yield1 a (Just ra) = merge a ra + (runStream mx) Nothing stp yield1 + g a b = toStream $ f a b + +------------------------------------------------------------------------------ +-- Parallely Zipping Streams +------------------------------------------------------------------------------ + +-- | Zip two streams asyncly (i.e. both the elements being zipped are generated +-- concurrently) using a monadic zipping function. +zipAsyncWithM :: (Streaming t, MonadAsync m) + => (a -> b -> t m c) -> t m a -> t m b -> t m c +zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp yld -> do + ma <- async m1 + mb <- async m2 + (runStream (toStream (zipWithM f ma mb))) Nothing stp yld diff --git a/src/Streamly/Streams.hs b/src/Streamly/Streams.hs new file mode 100644 index 0000000..4a297d4 --- /dev/null +++ b/src/Streamly/Streams.hs @@ -0,0 +1,985 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving#-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE UndecidableInstances #-} -- XXX + +-- | +-- Module : Streamly.Streams +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- +module Streamly.Streams + ( + Streaming (..) + , MonadAsync + + -- * SVars + , SVarSched (..) + , SVarTag (..) + , SVarStyle (..) + , SVar + , newEmptySVar + + -- * Construction + , streamBuild + , fromCallback + , fromSVar + + -- * Elimination + , cons + , nil + , streamFold + , runStreaming + , toSVar + + -- * Transformation + , async + + -- * Stream Styles + , StreamT + , InterleavedT + , AsyncT + , ParallelT + , ZipStream + , ZipAsync + + -- * Type Adapters + , serially + , interleaving + , asyncly + , parallely + , zipping + , zippingAsync + , adapt + + -- * Running Streams + , runStreamT + , runInterleavedT + , runAsyncT + , runParallelT + , runZipStream + , runZipAsync + + -- * Zipping + , zipWith + , zipAsyncWith + + -- * Sum Style Composition + , (<=>) + , (<|) + + -- * Fold Utilities + -- $foldutils + , foldWith + , foldMapWith + , forEachWith + ) +where + +import Control.Applicative (Alternative (..), liftA2) +import Control.Monad (MonadPlus(..), ap) +import Control.Monad.Base (MonadBase (..)) +import Control.Monad.Catch (MonadThrow) +import Control.Monad.Error.Class (MonadError(..)) +import Control.Monad.IO.Class (MonadIO(..)) +import Control.Monad.Reader.Class (MonadReader(..)) +import Control.Monad.State.Class (MonadState(..)) +import Control.Monad.Trans.Class (MonadTrans) +import Data.Semigroup (Semigroup(..)) +import Prelude hiding (drop, take, zipWith) +import Streamly.Core + +------------------------------------------------------------------------------ +-- Types that can behave as a Stream +------------------------------------------------------------------------------ + +-- | Class of types that can represent a stream of elements of some type 'a' in +-- some monad 'm'. +class Streaming t where + toStream :: t m a -> Stream m a + fromStream :: Stream m a -> t m a + +------------------------------------------------------------------------------ +-- Constructing a stream +------------------------------------------------------------------------------ + +-- | Add an element a the head of a stream. +cons :: (Streaming t) => a -> t m a -> t m a +cons a r = fromStream $ scons a (Just (toStream r)) + +-- | An empty stream. +nil :: Streaming t => t m a +nil = fromStream $ snil + +-- | Build a stream from its church encoding. The function passed maps +-- directly to the underlying representation of the stream type. The second +-- parameter to the function is the "yield" function yielding a value and the +-- remaining stream if any otherwise 'Nothing'. The third parameter is to +-- represent an "empty" stream. +streamBuild :: Streaming t + => (forall r. Maybe (SVar m a) + -> (a -> Maybe (t m a) -> m r) + -> m r + -> m r) + -> t m a +streamBuild k = fromStream $ Stream $ \sv stp yld -> + let yield a Nothing = yld a Nothing + yield a (Just r) = yld a (Just (toStream r)) + in k sv yield stp + +-- | Build a singleton stream from a callback function. +fromCallback :: (Streaming t) => (forall r. (a -> m r) -> m r) -> t m a +fromCallback k = fromStream $ Stream $ \_ _ yld -> k (\a -> yld a Nothing) + +-- | Read an SVar to get a stream. +fromSVar :: (MonadAsync m, Streaming t) => SVar m a -> t m a +fromSVar sv = fromStream $ fromStreamVar sv + +------------------------------------------------------------------------------ +-- Destroying a stream +------------------------------------------------------------------------------ + +-- | Fold a stream using its church encoding. The second argument is the "step" +-- function consuming an element and the remaining stream, if any. The third +-- argument is for consuming an "empty" stream that yields nothing. +streamFold :: Streaming t + => Maybe (SVar m a) -> (a -> Maybe (t m a) -> m r) -> m r -> t m a -> m r +streamFold sv step blank m = + let yield a Nothing = step a Nothing + yield a (Just x) = step a (Just (fromStream x)) + in (runStream (toStream m)) sv blank yield + +-- | Run a streaming composition, discard the results. +runStreaming :: (Monad m, Streaming t) => t m a -> m () +runStreaming m = go (toStream m) + where + go m1 = + let stop = return () + yield _ Nothing = stop + yield _ (Just x) = go x + in (runStream m1) Nothing stop yield + +-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then +-- be read back from the SVar using 'fromSVar'. +toSVar :: (Streaming t, MonadAsync m) => SVar m a -> t m a -> m () +toSVar sv m = toStreamVar sv (toStream m) + +------------------------------------------------------------------------------ +-- Transformation +------------------------------------------------------------------------------ + +-- XXX Get rid of this? +-- | Make a stream asynchronous, triggers the computation and returns a stream +-- in the underlying monad representing the output generated by the original +-- computation. The returned action is exhaustible and must be drained once. If +-- not drained fully we may have a thread blocked forever and once exhausted it +-- will always return 'empty'. + +async :: (Streaming t, MonadAsync m) => t m a -> m (t m a) +async m = do + sv <- newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m) + return $ fromSVar sv + +------------------------------------------------------------------------------ +-- StreamT +------------------------------------------------------------------------------ + +-- | The 'Monad' instance of 'StreamT' runs the /monadic continuation/ for each +-- element of the stream, serially. +-- +-- @ +-- main = 'runStreamT' $ do +-- x <- return 1 \<\> return 2 +-- liftIO $ print x +-- @ +-- @ +-- 1 +-- 2 +-- @ +-- +-- 'StreamT' nests streams serially in a depth first manner. +-- +-- @ +-- main = 'runStreamT' $ do +-- x <- return 1 \<\> return 2 +-- y <- return 3 \<\> return 4 +-- liftIO $ print (x, y) +-- @ +-- @ +-- (1,3) +-- (1,4) +-- (2,3) +-- (2,4) +-- @ +-- +-- This behavior is exactly like a list transformer. We call the monadic code +-- being run for each element of the stream a monadic continuation. In +-- imperative paradigm we can think of this composition as nested @for@ loops +-- and the monadic continuation is the body of the loop. The loop iterates for +-- all elements of the stream. +-- +newtype StreamT m a = StreamT {getStreamT :: Stream m a} + deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow) + +deriving instance MonadAsync m => Alternative (StreamT m) +deriving instance MonadAsync m => MonadPlus (StreamT m) +deriving instance (MonadBase b m, Monad m) => MonadBase b (StreamT m) +deriving instance MonadError e m => MonadError e (StreamT m) +deriving instance MonadReader r m => MonadReader r (StreamT m) +deriving instance MonadState s m => MonadState s (StreamT m) + +instance Streaming StreamT where + toStream = getStreamT + fromStream = StreamT + +-- XXX The Functor/Applicative/Num instances for all the types are exactly the +-- same, how can we reduce this boilerplate (use TH)? We cannot derive them +-- from a single base type because they depend on the Monad instance which is +-- different for each type. + +------------------------------------------------------------------------------ +-- Monad +------------------------------------------------------------------------------ + +instance Monad m => Monad (StreamT m) where + return = pure + (StreamT (Stream m)) >>= f = StreamT $ Stream $ \_ stp yld -> + let run x = (runStream x) Nothing stp yld + yield a Nothing = run $ getStreamT (f a) + yield a (Just r) = run $ getStreamT (f a) + <> getStreamT (StreamT r >>= f) + in m Nothing stp yield + +------------------------------------------------------------------------------ +-- Applicative +------------------------------------------------------------------------------ + +instance Monad m => Applicative (StreamT m) where + pure a = StreamT $ scons a Nothing + (<*>) = ap + +------------------------------------------------------------------------------ +-- Functor +------------------------------------------------------------------------------ + +instance Monad m => Functor (StreamT m) where + fmap f (StreamT (Stream m)) = StreamT $ Stream $ \_ stp yld -> + let yield a Nothing = yld (f a) Nothing + yield a (Just r) = yld (f a) + (Just (getStreamT (fmap f (StreamT r)))) + in m Nothing stp yield + +------------------------------------------------------------------------------ +-- Num +------------------------------------------------------------------------------ + +instance (Monad m, Num a) => Num (StreamT m a) where + fromInteger n = pure (fromInteger n) + + negate = fmap negate + abs = fmap abs + signum = fmap signum + + (+) = liftA2 (+) + (*) = liftA2 (*) + (-) = liftA2 (-) + +instance (Monad m, Fractional a) => Fractional (StreamT m a) where + fromRational n = pure (fromRational n) + + recip = fmap recip + + (/) = liftA2 (/) + +instance (Monad m, Floating a) => Floating (StreamT m a) where + pi = pure pi + + exp = fmap exp + sqrt = fmap sqrt + log = fmap log + sin = fmap sin + tan = fmap tan + cos = fmap cos + asin = fmap asin + atan = fmap atan + acos = fmap acos + sinh = fmap sinh + tanh = fmap tanh + cosh = fmap cosh + asinh = fmap asinh + atanh = fmap atanh + acosh = fmap acosh + + (**) = liftA2 (**) + logBase = liftA2 logBase + +------------------------------------------------------------------------------ +-- InterleavedT +------------------------------------------------------------------------------ + +-- | Like 'StreamT' but different in nesting behavior. It fairly interleaves +-- the iterations of the inner and the outer loop, nesting loops in a breadth +-- first manner. +-- +-- +-- @ +-- main = 'runInterleavedT' $ do +-- x <- return 1 \<\> return 2 +-- y <- return 3 \<\> return 4 +-- liftIO $ print (x, y) +-- @ +-- @ +-- (1,3) +-- (2,3) +-- (1,4) +-- (2,4) +-- @ +-- +newtype InterleavedT m a = InterleavedT {getInterleavedT :: Stream m a} + deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow) + +deriving instance MonadAsync m => Alternative (InterleavedT m) +deriving instance MonadAsync m => MonadPlus (InterleavedT m) +deriving instance (MonadBase b m, Monad m) => MonadBase b (InterleavedT m) +deriving instance MonadError e m => MonadError e (InterleavedT m) +deriving instance MonadReader r m => MonadReader r (InterleavedT m) +deriving instance MonadState s m => MonadState s (InterleavedT m) + +instance Streaming InterleavedT where + toStream = getInterleavedT + fromStream = InterleavedT + +instance Monad m => Monad (InterleavedT m) where + return = pure + (InterleavedT (Stream m)) >>= f = InterleavedT $ Stream $ \_ stp yld -> + let run x = (runStream x) Nothing stp yld + yield a Nothing = run $ getInterleavedT (f a) + yield a (Just r) = run $ getInterleavedT (f a) + `interleave` + getInterleavedT (InterleavedT r >>= f) + in m Nothing stp yield + +------------------------------------------------------------------------------ +-- Applicative +------------------------------------------------------------------------------ + +instance Monad m => Applicative (InterleavedT m) where + pure a = InterleavedT $ scons a Nothing + (<*>) = ap + +------------------------------------------------------------------------------ +-- Functor +------------------------------------------------------------------------------ + +instance Monad m => Functor (InterleavedT m) where + fmap f (InterleavedT (Stream m)) = InterleavedT $ Stream $ \_ stp yld -> + let yield a Nothing = yld (f a) Nothing + yield a (Just r) = + yld (f a) (Just (getInterleavedT (fmap f (InterleavedT r)))) + in m Nothing stp yield + +------------------------------------------------------------------------------ +-- Num +------------------------------------------------------------------------------ + +instance (Monad m, Num a) => Num (InterleavedT m a) where + fromInteger n = pure (fromInteger n) + + negate = fmap negate + abs = fmap abs + signum = fmap signum + + (+) = liftA2 (+) + (*) = liftA2 (*) + (-) = liftA2 (-) + +instance (Monad m, Fractional a) => Fractional (InterleavedT m a) where + fromRational n = pure (fromRational n) + + recip = fmap recip + + (/) = liftA2 (/) + +instance (Monad m, Floating a) => Floating (InterleavedT m a) where + pi = pure pi + + exp = fmap exp + sqrt = fmap sqrt + log = fmap log + sin = fmap sin + tan = fmap tan + cos = fmap cos + asin = fmap asin + atan = fmap atan + acos = fmap acos + sinh = fmap sinh + tanh = fmap tanh + cosh = fmap cosh + asinh = fmap asinh + atanh = fmap atanh + acosh = fmap acosh + + (**) = liftA2 (**) + logBase = liftA2 logBase + +------------------------------------------------------------------------------ +-- AsyncT +------------------------------------------------------------------------------ + +-- | Like 'StreamT' but /may/ run each iteration concurrently using demand +-- driven concurrency. More concurrent iterations are started only if the +-- previous iterations are not able to produce enough output for the consumer. +-- +-- @ +-- import "Streamly" +-- import Control.Concurrent +-- +-- main = 'runAsyncT' $ do +-- n <- return 3 \<\> return 2 \<\> return 1 +-- liftIO $ do +-- threadDelay (n * 1000000) +-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n) +-- @ +-- @ +-- ThreadId 40: Delay 1 +-- ThreadId 39: Delay 2 +-- ThreadId 38: Delay 3 +-- @ +-- +-- All iterations may run in the same thread if they do not block. +newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a} + deriving (Semigroup, Monoid, MonadTrans) + +deriving instance MonadAsync m => Alternative (AsyncT m) +deriving instance MonadAsync m => MonadPlus (AsyncT m) +deriving instance MonadAsync m => MonadIO (AsyncT m) +deriving instance MonadAsync m => MonadThrow (AsyncT m) +deriving instance (MonadBase b m, MonadAsync m) => MonadBase b (AsyncT m) +deriving instance (MonadError e m, MonadAsync m) => MonadError e (AsyncT m) +deriving instance (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) +deriving instance (MonadState s m, MonadAsync m) => MonadState s (AsyncT m) + +instance Streaming AsyncT where + toStream = getAsyncT + fromStream = AsyncT + +{-# INLINE parbind #-} +parbind + :: (forall c. Stream m c -> Stream m c -> Stream m c) + -> Stream m a + -> (a -> Stream m b) + -> Stream m b +parbind par m f = go m + where + go (Stream g) = + Stream $ \ctx stp yld -> + let run x = (runStream x) ctx stp yld + yield a Nothing = run $ f a + yield a (Just r) = run $ f a `par` (go r) + in g Nothing stp yield + +instance MonadAsync m => Monad (AsyncT m) where + return = pure + (AsyncT m) >>= f = AsyncT $ parbind par m g + where g x = getAsyncT (f x) + par = joinStreamVar2 (SVarStyle Conjunction LIFO) + +------------------------------------------------------------------------------ +-- Applicative +------------------------------------------------------------------------------ + +instance MonadAsync m => Applicative (AsyncT m) where + pure a = AsyncT $ scons a Nothing + (<*>) = ap + +------------------------------------------------------------------------------ +-- Functor +------------------------------------------------------------------------------ + +instance Monad m => Functor (AsyncT m) where + fmap f (AsyncT (Stream m)) = AsyncT $ Stream $ \_ stp yld -> + let yield a Nothing = yld (f a) Nothing + yield a (Just r) = yld (f a) (Just (getAsyncT (fmap f (AsyncT r)))) + in m Nothing stp yield + +------------------------------------------------------------------------------ +-- Num +------------------------------------------------------------------------------ + +instance (MonadAsync m, Num a) => Num (AsyncT m a) where + fromInteger n = pure (fromInteger n) + + negate = fmap negate + abs = fmap abs + signum = fmap signum + + (+) = liftA2 (+) + (*) = liftA2 (*) + (-) = liftA2 (-) + +instance (MonadAsync m, Fractional a) => Fractional (AsyncT m a) where + fromRational n = pure (fromRational n) + + recip = fmap recip + + (/) = liftA2 (/) + +instance (MonadAsync m, Floating a) => Floating (AsyncT m a) where + pi = pure pi + + exp = fmap exp + sqrt = fmap sqrt + log = fmap log + sin = fmap sin + tan = fmap tan + cos = fmap cos + asin = fmap asin + atan = fmap atan + acos = fmap acos + sinh = fmap sinh + tanh = fmap tanh + cosh = fmap cosh + asinh = fmap asinh + atanh = fmap atanh + acosh = fmap acosh + + (**) = liftA2 (**) + logBase = liftA2 logBase + +------------------------------------------------------------------------------ +-- ParallelT +------------------------------------------------------------------------------ + +-- | Like 'StreamT' but runs /all/ iterations fairly concurrently using a round +-- robin scheduling. +-- +-- @ +-- import "Streamly" +-- import Control.Concurrent +-- +-- main = 'runParallelT' $ do +-- n <- return 3 \<\> return 2 \<\> return 1 +-- liftIO $ do +-- threadDelay (n * 1000000) +-- myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n) +-- @ +-- @ +-- ThreadId 40: Delay 1 +-- ThreadId 39: Delay 2 +-- ThreadId 38: Delay 3 +-- @ +-- +-- Unlike 'AsyncT' all iterations are guaranteed to run fairly concurrently, +-- unconditionally. +newtype ParallelT m a = ParallelT {getParallelT :: Stream m a} + deriving (Semigroup, Monoid, MonadTrans) + +deriving instance MonadAsync m => Alternative (ParallelT m) +deriving instance MonadAsync m => MonadPlus (ParallelT m) +deriving instance MonadAsync m => MonadIO (ParallelT m) +deriving instance MonadAsync m => MonadThrow (ParallelT m) +deriving instance (MonadBase b m, MonadAsync m) => MonadBase b (ParallelT m) +deriving instance (MonadError e m, MonadAsync m) => MonadError e (ParallelT m) +deriving instance (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) +deriving instance (MonadState s m, MonadAsync m) => MonadState s (ParallelT m) + +instance Streaming ParallelT where + toStream = getParallelT + fromStream = ParallelT + +instance MonadAsync m => Monad (ParallelT m) where + return = pure + (ParallelT m) >>= f = ParallelT $ parbind par m g + where g x = getParallelT (f x) + par = joinStreamVar2 (SVarStyle Conjunction FIFO) + +------------------------------------------------------------------------------ +-- Applicative +------------------------------------------------------------------------------ + +instance MonadAsync m => Applicative (ParallelT m) where + pure a = ParallelT $ scons a Nothing + (<*>) = ap + +------------------------------------------------------------------------------ +-- Functor +------------------------------------------------------------------------------ + +instance Monad m => Functor (ParallelT m) where + fmap f (ParallelT (Stream m)) = ParallelT $ Stream $ \_ stp yld -> + let yield a Nothing = yld (f a) Nothing + yield a (Just r) = yld (f a) + (Just (getParallelT (fmap f (ParallelT r)))) + in m Nothing stp yield + +------------------------------------------------------------------------------ +-- Num +------------------------------------------------------------------------------ + +instance (MonadAsync m, Num a) => Num (ParallelT m a) where + fromInteger n = pure (fromInteger n) + + negate = fmap negate + abs = fmap abs + signum = fmap signum + + (+) = liftA2 (+) + (*) = liftA2 (*) + (-) = liftA2 (-) + +instance (MonadAsync m, Fractional a) => Fractional (ParallelT m a) where + fromRational n = pure (fromRational n) + + recip = fmap recip + + (/) = liftA2 (/) + +instance (MonadAsync m, Floating a) => Floating (ParallelT m a) where + pi = pure pi + + exp = fmap exp + sqrt = fmap sqrt + log = fmap log + sin = fmap sin + tan = fmap tan + cos = fmap cos + asin = fmap asin + atan = fmap atan + acos = fmap acos + sinh = fmap sinh + tanh = fmap tanh + cosh = fmap cosh + asinh = fmap asinh + atanh = fmap atanh + acosh = fmap acosh + + (**) = liftA2 (**) + logBase = liftA2 logBase + +------------------------------------------------------------------------------ +-- Serially Zipping Streams +------------------------------------------------------------------------------ + +-- | Zip two streams serially using a pure zipping function. +zipWith :: Streaming t => (a -> b -> c) -> t m a -> t m b -> t m c +zipWith f m1 m2 = fromStream $ go (toStream m1) (toStream m2) + where + go mx my = Stream $ \_ stp yld -> do + let merge a ra = + let yield2 b Nothing = yld (f a b) Nothing + yield2 b (Just rb) = yld (f a b) (Just (go ra rb)) + in (runStream my) Nothing stp yield2 + let yield1 a Nothing = merge a snil + yield1 a (Just ra) = merge a ra + (runStream mx) Nothing stp yield1 + +-- | 'ZipStream' zips serially i.e. it produces one element from each stream +-- serially and then zips the two elements. Note, for convenience we have used +-- the 'zipping' combinator in the following example instead of using a type +-- annotation. +-- +-- @ +-- main = (toList . 'zipping' $ (,) \<$\> s1 \<*\> s2) >>= print +-- where s1 = pure 1 <> pure 2 +-- s2 = pure 3 <> pure 4 +-- @ +-- @ +-- [(1,3),(2,4)] +-- @ +-- +-- This applicative operation can be seen as the zipping equivalent of +-- interleaving with '<=>'. +newtype ZipStream m a = ZipStream {getZipStream :: Stream m a} + deriving (Semigroup, Monoid) + +deriving instance MonadAsync m => Alternative (ZipStream m) + +instance Monad m => Functor (ZipStream m) where + fmap f (ZipStream (Stream m)) = ZipStream $ Stream $ \_ stp yld -> + let yield a Nothing = yld (f a) Nothing + yield a (Just r) = yld (f a) + (Just (getZipStream (fmap f (ZipStream r)))) + in m Nothing stp yield + +instance Monad m => Applicative (ZipStream m) where + pure a = ZipStream $ scons a Nothing + (<*>) = zipWith id + +instance Streaming ZipStream where + toStream = getZipStream + fromStream = ZipStream + +instance (Monad m, Num a) => Num (ZipStream m a) where + fromInteger n = pure (fromInteger n) + + negate = fmap negate + abs = fmap abs + signum = fmap signum + + (+) = liftA2 (+) + (*) = liftA2 (*) + (-) = liftA2 (-) + +instance (Monad m, Fractional a) => Fractional (ZipStream m a) where + fromRational n = pure (fromRational n) + + recip = fmap recip + + (/) = liftA2 (/) + +instance (Monad m, Floating a) => Floating (ZipStream m a) where + pi = pure pi + + exp = fmap exp + sqrt = fmap sqrt + log = fmap log + sin = fmap sin + tan = fmap tan + cos = fmap cos + asin = fmap asin + atan = fmap atan + acos = fmap acos + sinh = fmap sinh + tanh = fmap tanh + cosh = fmap cosh + asinh = fmap asinh + atanh = fmap atanh + acosh = fmap acosh + + (**) = liftA2 (**) + logBase = liftA2 logBase + +------------------------------------------------------------------------------ +-- Parallely Zipping Streams +------------------------------------------------------------------------------ + +-- | Zip two streams asyncly (i.e. both the elements being zipped are generated +-- concurrently) using a pure zipping function. +zipAsyncWith :: (Streaming t, MonadAsync m) + => (a -> b -> c) -> t m a -> t m b -> t m c +zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do + ma <- async m1 + mb <- async m2 + (runStream (toStream (zipWith f ma mb))) Nothing stp yld + +-- | Like 'ZipStream' but zips in parallel, it generates both the elements to +-- be zipped concurrently. +-- +-- @ +-- main = (toList . 'zippingAsync' $ (,) \<$\> s1 \<*\> s2) >>= print +-- where s1 = pure 1 <> pure 2 +-- s2 = pure 3 <> pure 4 +-- @ +-- @ +-- [(1,3),(2,4)] +-- @ +-- +-- This applicative operation can be seen as the zipping equivalent of +-- parallel composition with '<|>'. +newtype ZipAsync m a = ZipAsync {getZipAsync :: Stream m a} + deriving (Semigroup, Monoid) + +deriving instance MonadAsync m => Alternative (ZipAsync m) + +instance Monad m => Functor (ZipAsync m) where + fmap f (ZipAsync (Stream m)) = ZipAsync $ Stream $ \_ stp yld -> + let yield a Nothing = yld (f a) Nothing + yield a (Just r) = yld (f a) + (Just (getZipAsync (fmap f (ZipAsync r)))) + in m Nothing stp yield + +instance MonadAsync m => Applicative (ZipAsync m) where + pure a = ZipAsync $ scons a Nothing + (<*>) = zipAsyncWith id + +instance Streaming ZipAsync where + toStream = getZipAsync + fromStream = ZipAsync + +instance (MonadAsync m, Num a) => Num (ZipAsync m a) where + fromInteger n = pure (fromInteger n) + + negate = fmap negate + abs = fmap abs + signum = fmap signum + + (+) = liftA2 (+) + (*) = liftA2 (*) + (-) = liftA2 (-) + +instance (MonadAsync m, Fractional a) => Fractional (ZipAsync m a) where + fromRational n = pure (fromRational n) + + recip = fmap recip + + (/) = liftA2 (/) + +instance (MonadAsync m, Floating a) => Floating (ZipAsync m a) where + pi = pure pi + + exp = fmap exp + sqrt = fmap sqrt + log = fmap log + sin = fmap sin + tan = fmap tan + cos = fmap cos + asin = fmap asin + atan = fmap atan + acos = fmap acos + sinh = fmap sinh + tanh = fmap tanh + cosh = fmap cosh + asinh = fmap asinh + atanh = fmap atanh + acosh = fmap acosh + + (**) = liftA2 (**) + logBase = liftA2 logBase + +------------------------------------------------------------------------------- +-- Type adapting combinators +------------------------------------------------------------------------------- + +-- | Adapt one streaming type to another. +adapt :: (Streaming t1, Streaming t2) => t1 m a -> t2 m a +adapt = fromStream . toStream + +-- | Interpret an ambiguously typed stream as 'StreamT'. +serially :: StreamT m a -> StreamT m a +serially x = x + +-- | Interpret an ambiguously typed stream as 'InterleavedT'. +interleaving :: InterleavedT m a -> InterleavedT m a +interleaving x = x + +-- | Interpret an ambiguously typed stream as 'AsyncT'. +asyncly :: AsyncT m a -> AsyncT m a +asyncly x = x + +-- | Interpret an ambiguously typed stream as 'ParallelT'. +parallely :: ParallelT m a -> ParallelT m a +parallely x = x + +-- | Interpret an ambiguously typed stream as 'ZipStream'. +zipping :: ZipStream m a -> ZipStream m a +zipping x = x + +-- | Interpret an ambiguously typed stream as 'ZipAsync'. +zippingAsync :: ZipAsync m a -> ZipAsync m a +zippingAsync x = x + +------------------------------------------------------------------------------- +-- Running Streams, convenience functions specialized to types +------------------------------------------------------------------------------- + +-- | Same as @runStreaming . serially@. +runStreamT :: Monad m => StreamT m a -> m () +runStreamT = runStreaming + +-- | Same as @runStreaming . interleaving@. +runInterleavedT :: Monad m => InterleavedT m a -> m () +runInterleavedT = runStreaming + +-- | Same as @runStreaming . asyncly@. +runAsyncT :: Monad m => AsyncT m a -> m () +runAsyncT = runStreaming + +-- | Same as @runStreaming . parallely@. +runParallelT :: Monad m => ParallelT m a -> m () +runParallelT = runStreaming + +-- | Same as @runStreaming . zipping@. +runZipStream :: Monad m => ZipStream m a -> m () +runZipStream = runStreaming + +-- | Same as @runStreaming . zippingAsync@. +runZipAsync :: Monad m => ZipAsync m a -> m () +runZipAsync = runStreaming + +------------------------------------------------------------------------------ +-- Sum Style Composition +------------------------------------------------------------------------------ + +infixr 5 <=> + +-- | Sequential interleaved composition, in contrast to '<>' this operator +-- fairly interleaves two streams instead of appending them; yielding one +-- element from each stream alternately. +-- +-- @ +-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<=\> (return 3 <> return 4)) >>= print +-- @ +-- @ +-- [1,3,2,4] +-- @ +-- +-- This operator corresponds to the 'InterleavedT' style. Unlike '<>', this +-- operator cannot be used to fold infinite containers since that might +-- accumulate too many partially drained streams. To be clear, it can combine +-- infinite streams but not infinite number of streams. +{-# INLINE (<=>) #-} +(<=>) :: Streaming t => t m a -> t m a -> t m a +m1 <=> m2 = fromStream $ interleave (toStream m1) (toStream m2) + +-- | Demand driven concurrent composition. In contrast to '<|>' this operator +-- concurrently "merges" streams in a left biased manner rather than fairly +-- interleaving them. It keeps yielding from the stream on the left as long as +-- it can. If the left stream blocks or cannot keep up with the pace of the +-- consumer it can concurrently yield from the stream on the right in parallel. +-- +-- @ +-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<| (return 3 <> return 4)) >>= print +-- @ +-- @ +-- [1,2,3,4] +-- @ +-- +-- Unlike '<|>' it can be used to fold infinite containers of streams. This +-- operator corresponds to the 'AsyncT' type for product style composition. +-- +{-# INLINE (<|) #-} +(<|) :: (Streaming t, MonadAsync m) => t m a -> t m a -> t m a +m1 <| m2 = fromStream $ parLeft (toStream m1) (toStream m2) + +------------------------------------------------------------------------------ +-- Fold Utilities +------------------------------------------------------------------------------ + +-- $foldutils +-- These utilities are designed to pass the first argument as one of the sum +-- style composition operators (i.e. '<>', '<=>', '<|', '<|>') to conveniently +-- fold a container using any style of stream composition. + +-- | Like the 'Prelude' 'fold' but allows you to specify a binary sum style +-- stream composition operator to fold a container of streams. +-- +-- @foldWith (<>) $ map return [1..3]@ +{-# INLINABLE foldWith #-} +foldWith :: (Streaming t, Foldable f) + => (t m a -> t m a -> t m a) -> f (t m a) -> t m a +foldWith f = foldr f nil + +-- | Like 'foldMap' but allows you to specify a binary sum style composition +-- operator to fold a container of streams. Maps a monadic streaming action on +-- the container before folding it. +-- +-- @foldMapWith (<>) return [1..3]@ +{-# INLINABLE foldMapWith #-} +foldMapWith :: (Streaming t, Foldable f) + => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b +foldMapWith f g = foldr (f . g) nil + +-- | Like 'foldMapWith' but with the last two arguments reversed i.e. the +-- monadic streaming function is the last argument. +{-# INLINABLE forEachWith #-} +forEachWith :: (Streaming t, Foldable f) + => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b +forEachWith f xs g = foldr (f . g) nil xs diff --git a/src/Streamly/Time.hs b/src/Streamly/Time.hs new file mode 100644 index 0000000..8d17897 --- /dev/null +++ b/src/Streamly/Time.hs @@ -0,0 +1,65 @@ +-- | +-- Module : Streamly.Time +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- Time utilities for reactive programming. + +module Streamly.Time + ( periodic + , withClock + ) +where + +import Control.Monad (when) +import Control.Concurrent (threadDelay) + +-- | Run an action forever periodically at the given frequency specified in per +-- second (Hz). +periodic :: Int -> IO () -> IO () +periodic freq action = do + action + threadDelay (1000000 `div` freq) + periodic freq action + +-- | Run a computation on every clock tick, the clock runs at the specified +-- frequency. It allows running a computation at high frequency efficiently by +-- maintaining a local clock and adjusting it with the provided base clock at +-- longer intervals. The first argument is a base clock returning some notion +-- of time in microseconds. The second argument is the frequency in per second +-- (Hz). The third argument is the action to run, the action is provided the +-- local time as an argument. +withClock :: IO Int -> Int -> (Int -> IO ()) -> IO () +withClock clock freq action = do + t <- clock + go t period period t 0 + + where + + period = 1000000 `div` freq + + -- Note that localTime is roughly but not exactly equal to (lastAdj + tick + -- * n). That is because we do not abruptly adjust the clock skew instead + -- we adjust the tick size. + go lastAdj delay tick localTime n = do + action localTime + when (delay > 0) $ threadDelay delay + + if (n == freq) + then do + (t, newTick, newDelay) <- adjustClock lastAdj localTime delay + go t newDelay newTick (localTime + newTick) 0 + else go lastAdj delay tick (localTime + tick) (n + 1) + + -- Adjust the tick size rather than the clock to avoid abrupt changes + -- resulting in jittery behavior at the end of every interval. + adjustClock lastAdj localTime delay = do + baseTime <- clock + let newTick = period + (baseTime - localTime) `div` freq + lastPeriod = (baseTime - lastAdj) `div` freq + newDelay = max 0 (delay + period - lastPeriod) + return (baseTime, newTick, newDelay) diff --git a/src/Streamly/Tutorial.hs b/src/Streamly/Tutorial.hs new file mode 100644 index 0000000..b357b3c --- /dev/null +++ b/src/Streamly/Tutorial.hs @@ -0,0 +1,1042 @@ +{-# OPTIONS_GHC -fno-warn-unused-imports #-} +-- | +-- Module : Streamly.Tutorial +-- Copyright : (c) 2017 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- +-- Streamly, short for stream concurrently, combines the essence of +-- non-determinism, streaming and concurrency in functional programming. +-- Concurrent and non-concurrent applications are almost indistinguisable, +-- concurrency capability does not at all impact the performance of +-- non-concurrent case. +-- Streaming enables writing modular, composable and scalable applications with +-- ease and concurrency allows you to make them scale and perform well. +-- Streamly enables writing concurrent applications without being aware of +-- threads or synchronization. No explicit thread control is needed, where +-- applicable the concurrency rate is automatically controlled based on the +-- demand by the consumer. However, combinators are provided to fine tune the +-- concurrency control. +-- Streaming and concurrency together enable expressing reactive applications +-- conveniently. See "Streamly.Examples" for a simple SDL based FRP example. +-- +-- Streamly streams are very much like the Haskell lists and most of the +-- functions that work on lists have a counterpart that works on streams. +-- However, streamly streams can be generated, consumed or combined +-- concurrently. In this tutorial we will go over the basic concepts and how to +-- use the library. The documentation of @Streamly@ module has more details on +-- core APIs. For more APIs for constructing, folding, filtering, mapping and +-- zipping etc. see the documentation of "Streamly.Prelude" module. For +-- examples and other ways to use the library see the module +-- "Streamly.Examples" as well. + +module Streamly.Tutorial + ( + -- * Streams + -- $streams + + -- ** Generating Streams + -- $generating + + -- ** Eliminating Streams + -- $eliminating + + -- * Combining Streams + -- $combining + + -- ** Semigroup Style + -- $semigroup + + -- *** Serial composition ('<>') + -- $serial + + -- *** Async composition ('<|') + -- $parallel + + -- *** Interleaved composition ('<=>') + -- $interleaved + + -- *** Fair Concurrent composition ('<|>') + -- $fairParallel + + -- *** Custom composition + -- $custom + + -- ** Monoid Style + -- $monoid + + -- * Transforming Streams + -- $transforming + + -- ** Monad + -- $monad + + -- *** Serial Composition ('StreamT') + -- $regularSerial + + -- *** Async Composition ('AsyncT') + -- $concurrentNesting + + -- *** Interleaved Composition ('InterleavedT') + -- $interleavedNesting + + -- *** Fair Concurrent Composition ('ParallelT') + -- $fairlyConcurrentNesting + + -- *** Exercise + -- $monadExercise + + -- ** Applicative + -- $applicative + + -- ** Functor + -- $functor + + -- * Zipping Streams + -- $zipping + + -- ** Serial Zipping + -- $serialzip + + -- ** Parallel Zipping + -- $parallelzip + + -- * Summary of Compositions + -- $compositionSummary + + -- * Concurrent Programming + -- $concurrent + + -- * Reactive Programming + -- $reactive + + -- * Performance + -- $performance + + -- * Interoperation with Streaming Libraries + -- $interop + + -- * Comparison with Existing Packages + -- $comparison + ) +where + +import Streamly +import Streamly.Prelude +import Data.Semigroup +import Control.Applicative +import Control.Monad +import Control.Monad.IO.Class (MonadIO(..)) +import Control.Monad.Trans.Class (MonadTrans (lift)) + +-- $streams +-- +-- Streamly provides many different stream types depending on the desired +-- composition style. The simplest type is 'StreamT'. 'StreamT' is a monad +-- transformer, the type @StreamT m a@ represents a stream of values of type +-- 'a' in some underlying monad 'm'. For example, @StreamT IO Int@ is a stream +-- of 'Int' in 'IO' monad. + +-- $generating +-- +-- Pure values can be placed into the stream type using 'return' or 'pure'. +-- Effects in the IO monad can be lifted to the stream type using the 'liftIO' +-- combinator. In a transformer stack we can lift actions from the lower monad +-- using the 'lift' combinator. Some examples of streams with a single element: +-- +-- @ +-- return 1 :: 'StreamT' IO Int +-- @ +-- @ +-- liftIO $ putStrLn "Hello world!" :: 'StreamT' IO () +-- @ +-- +-- We can combine streams using '<>' to create streams of many elements: +-- +-- @ +-- return 1 <> return 2 <> return 3 :: 'StreamT' IO Int +-- @ +-- +-- For more ways to construct or generate a stream see the module +-- "Streamly.Prelude". + +-- $eliminating +-- +-- 'runStreamT' runs a composed 'StreamT' computation, lowering the type into +-- the underlying monad and discarding the result stream: +-- +-- @ +-- import "Streamly" +-- +-- main = 'runStreamT' $ liftIO $ putStrLn "Hello world!" +-- @ +-- +-- 'toList' runs a stream computation and collects the result stream in a list +-- in the underlying monad. 'toList' is a polymorphic function that works on +-- multiple stream types belonging to the class 'Streaming'. Therefore, before +-- you run a stream you need to tell how you want to interpret the stream by +-- using one of the stream type combinators ('serially', 'asyncly', 'parallely' +-- etc.). The combinator 'serially' is equivalent to annotating the type as @:: +-- StreamT@. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = do +-- xs \<- 'toList' $ 'serially' $ return 1 <> return 2 +-- print xs +-- @ +-- +-- For other ways to eliminate or fold a stream see the module +-- "Streamly.Prelude". + +-- $semigroup +-- Streams of the same type can be combined into a composite stream in many +-- different ways using one of the semigroup style binary composition operators +-- i.e. '<>', '<=>', '<|', '<|>', 'mplus'. These operators work on all stream +-- types ('StreamT', 'AsyncT' etc.) uniformly. +-- +-- To illustrate the concurrent aspects, we will use the following @delay@ +-- function to introduce a delay specified in seconds. +-- +-- @ +-- import "Streamly" +-- import Control.Concurrent +-- +-- delay n = liftIO $ do +-- threadDelay (n * 1000000) +-- tid \<- myThreadId +-- putStrLn (show tid ++ ": Delay " ++ show n) +-- @ + +-- $serial +-- +-- We have already seen, the '<>' operator. It composes two streams in series +-- i.e. the first stream is completely exhausted and then the second stream is +-- processed. The following example prints the sequence 3, 2, 1 and takes a +-- total of 6 seconds because everything is serial: +-- +-- @ +-- main = 'runStreamT' $ delay 3 <> delay 2 <> delay 1 +-- @ +-- @ +-- ThreadId 36: Delay 3 +-- ThreadId 36: Delay 2 +-- ThreadId 36: Delay 1 +-- @ + +-- $interleaved +-- The '<=>' operator is serial like '<>' but it interleaves the two streams +-- i.e. it yields one element from the first stream and then one element from +-- the second stream, and so on. The following example prints the sequence 1, +-- 3, 2, 4 and takes a total of 10 seconds because everything is serial: +-- +-- @ +-- main = 'runStreamT' $ (delay 1 <> delay 2) '<=>' (delay 3 <> delay 4) +-- @ +-- @ +-- ThreadId 36: Delay 1 +-- ThreadId 36: Delay 3 +-- ThreadId 36: Delay 2 +-- ThreadId 36: Delay 4 +-- @ +-- +-- Note that this operator cannot be used to fold infinite containers since it +-- requires preserving the state until a stream is finished. To be clear, it +-- can combine infinite streams but not infinite number of streams. + +-- $parallel +-- +-- The '<|' operator can run both computations concurrently, /when needed/. +-- In the following example since the first computation blocks we start the +-- next one in a separate thread and so on: +-- +-- @ +-- main = 'runStreamT' $ delay 3 '<|' delay 2 '<|' delay 1 +-- @ +-- @ +-- ThreadId 42: Delay 1 +-- ThreadId 41: Delay 2 +-- ThreadId 40: Delay 3 +-- @ +-- +-- This is the concurrent version of the '<>' operator. The computations are +-- triggered in the same order as '<>' except that they are concurrent. When +-- we have a tree of computations composed using this operator, the tree is +-- traversed in DFS style just like '<>'. +-- +-- @ +-- main = 'runStreamT' $ (p 1 '<|' p 2) '<|' (p 3 '<|' p 4) +-- where p = liftIO . print +-- @ +-- @ +-- 1 +-- 2 +-- 3 +-- 4 +-- @ +-- +-- Concurrency provided by this operator is demand driven. The second +-- computation is run concurrently with the first only if the first computation +-- is not producing enough output to keep the stream consumer busy otherwise +-- the second computation is run serially after the previous one. The number of +-- concurrent threads is adapted dynamically based on the pull rate of the +-- consumer of the stream. +-- As you can see, in the following example the computations are run in a +-- single thread one after another, because none of them blocks. However, if +-- the thread consuming the stream were faster than the producer then it would +-- have started parallel threads for each computation to keep up even if none +-- of them blocks: +-- +-- @ +-- main = 'runStreamT' $ traced (sqrt 9) '<|' traced (sqrt 16) '<|' traced (sqrt 25) +-- @ +-- @ +-- ThreadId 40 +-- ThreadId 40 +-- ThreadId 40 +-- @ +-- +-- Since the concurrency provided by this operator is demand driven it cannot +-- be used when the composed computations have timers that are relative to each +-- other because all computations may not be started at the same time and +-- therefore timers in all of them may not start at the same time. When +-- relative timing among all computations is important or when we need to start +-- all computations at once for some reason '<|>' must be used instead. +-- However, '<|' is useful in situations when we want to optimally utilize the +-- resources and we know that the computations can run in parallel but we do +-- not care if they actually run in parallel or not, that decision is left to +-- the scheduler. Also, note that this operator can be used to fold infinite +-- containers in contrast to '<|>', because it does not require us to run all +-- of them at the same time. +-- +-- The left bias (or the DFS style) of the operator '<|' is suggested by its +-- shape. You can also think of this as an unbalanced version of the fairly +-- parallel operator '<|>'. + +-- $fairParallel +-- +-- The 'Alternative' composition operator '<|>', like '<|', runs the composed +-- computations concurrently. However, unlike '<|' it runs all of the +-- computations in fairly parallel manner using a round robin scheduling +-- mechanism. This can be considered as the concurrent version of the fairly +-- interleaved serial operation '<=>'. Note that this cannot be used on +-- infinite containers, as it will lead to an infinite sized scheduling queue. +-- +-- The following example sends a query to three search engines in parallel and +-- prints the name of the search engine as a response arrives: +-- +-- @ +-- import "Streamly" +-- import Network.HTTP.Simple +-- +-- main = 'runStreamT' $ google \<|> bing \<|> duckduckgo +-- where +-- google = get "https://www.google.com/search?q=haskell" +-- bing = get "https://www.bing.com/search?q=haskell" +-- duckduckgo = get "https://www.duckduckgo.com/?q=haskell" +-- get s = liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s)) +-- @ + +-- $custom +-- +-- The 'async' API can be used to create references to asynchronously running +-- stream computations. We can then use 'uncons' to explore the streams +-- arbitrarily and then recompose individual elements to create a new stream. +-- This way we can dynamically decide which stream to explore at any given +-- time. Take an example of a merge sort of two sorted streams. We need to +-- keep consuming items from the stream which has the lowest item in the sort +-- order. This can be achieved using async references to streams. See +-- "Streamly.Examples.MergeSortedStreams". + +-- $monoid +-- +-- Each of the semigroup compositions described has an identity that can be +-- used to fold a possibly empty container. An empty stream is represented by +-- 'nil' which can be represented in various standard forms as 'mempty', +-- 'empty' or 'mzero'. +-- Some fold utilities are also provided by the library for convenience: +-- +-- * 'foldWith' folds a 'Foldable' container of stream computations using the +-- given composition operator. +-- * 'foldMapWith' folds like foldWith but also maps a function before folding. +-- * 'forEachWith' is like foldMapwith but the container argument comes before +-- the function argument. +-- * The 'each' primitive from "Streamly.Prelude" folds a 'Foldable' container +-- using the '<>' operator: +-- +-- All of the following are equivalent: +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = do +-- 'toList' . 'serially' $ 'foldWith' (<>) (map return [1..10]) >>= print +-- 'toList' . 'serially' $ 'foldMapWith' (<>) return [1..10] >>= print +-- 'toList' . 'serially' $ 'forEachWith' (<>) [1..10] return >>= print +-- 'toList' . 'serially' $ 'each' [1..10] >>= print +-- @ + +-- $transforming +-- +-- The previous section discussed ways to merge the elements of two streams +-- without doing any transformation on them. In this section we will explore +-- how to transform streams using 'Functor', 'Applicative' or 'Monad' style +-- compositions. The applicative and monad composition of all 'Streaming' types +-- behave exactly the same way as a list transformer. For simplicity of +-- illustration we are using streams of pure values in the following examples. +-- However, the real application of streams arises when these streams are +-- generated using monadic actions. + +-- $monad +-- +-- In functional programmer's parlance the 'Monad' instance of 'Streaming' +-- types implement non-determinism, exploring all possible combination of +-- choices from both the streams. From an imperative programmer's point of view +-- it behaves like nested loops i.e. for each element in the first stream and +-- for each element in the second stream apply the body of the loop. If you are +-- familiar with list transformer this behavior is exactly the same as that of +-- a list transformer. +-- +-- Just like we saw in sum style compositions earlier, monadic composition also +-- has multiple variants each of which exactly corresponds to one of the sum +-- style composition variant. + +-- $regularSerial +-- +-- When we interpret the monadic composition as 'StreamT' we get a standard +-- list transformer like serial composition. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = 'runStreamT' $ do +-- x <- 'each' [3,2,1] +-- delay x +-- @ +-- @ +-- ThreadId 30: Delay 3 +-- ThreadId 30: Delay 2 +-- ThreadId 30: Delay 1 +-- @ +-- +-- As you can see the code after the @each@ statement is run three times, once +-- for each value of @x@. All the three iterations are serial and run in the +-- same thread one after another. When compared to imperative programming, this +-- can also be viewed as a @for@ loop with three iterations. +-- +-- A console echo loop copying standard input to standard output can simply be +-- written like this: +-- +-- @ +-- import "Streamly" +-- import Data.Semigroup (cycle1) +-- +-- main = 'runStreamT' $ cycle1 (liftIO getLine) >>= liftIO . putStrLn +-- @ +-- +-- When multiple streams are composed using this style they nest in a DFS +-- manner i.e. nested iterations of an iteration are executed before we proceed +-- to the next iteration at higher level. This behaves just like nested @for@ +-- loops in imperative programming. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = 'runStreamT' $ do +-- x <- 'each' [1,2] +-- y <- 'each' [3,4] +-- liftIO $ putStrLn $ show (x, y) +-- @ +-- @ +-- (1,3) +-- (1,4) +-- (2,3) +-- (2,4) +-- @ +-- +-- You will also notice that this is the monadic equivalent of the sum style +-- composition using '<>'. + +-- $concurrentNesting +-- +-- When we interpret the monadic composition as 'AsyncT' we get a /concurrent/ +-- list-transformer like composition. Multiple monadic continuations (or loop +-- iterations) may be started concurrently. Concurrency is demand driven +-- i.e. more concurrent iterations are started only if the previous iterations +-- are not able to produce enough output for the consumer of the output stream. +-- This is the concurrent version of 'StreamT'. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = 'runAsyncT' $ do +-- x <- 'each' [3,2,1] +-- delay x +-- @ +-- @ +-- ThreadId 40: Delay 1 +-- ThreadId 39: Delay 2 +-- ThreadId 38: Delay 3 +-- @ +-- +-- As you can see the code after the @each@ statement is run three times, once +-- for each value of @x@. All the three iterations are concurrent and run in +-- different threads. The iteration with least delay finishes first. When +-- compared to imperative programming, this can be viewed as a @for@ loop +-- with three concurrent iterations. +-- +-- Concurrency is demand driven just as in the case of '<|'. When multiple +-- streams are composed using this style the iterations are triggered in a DFS +-- manner just like 'StreamT' i.e. nested iterations are executed before we +-- proceed to the next iteration at higher level. However, unlike 'StreamT' +-- more than one iterations may be started concurrently, and based on the +-- demand from the consumer. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = 'runAsyncT' $ do +-- x <- 'each' [1,2] +-- y <- 'each' [3,4] +-- liftIO $ putStrLn $ show (x, y) +-- @ +-- @ +-- (1,3) +-- (1,4) +-- (2,3) +-- (2,4) +-- @ +-- +-- You will notice that this is the monadic equivalent of the '<|' style +-- sum composition. The same caveats apply to this as the '<|' operation. + +-- $interleavedNesting +-- +-- When we interpret the monadic composition as 'InterleavedT' we get a serial +-- but fairly interleaved list-transformer like composition. The monadic +-- continuations or iterations of the outer loop are fairly interleaved with +-- the continuations or iterations of the inner loop. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = 'runInterleavedT' $ do +-- x <- 'each' [1,2] +-- y <- 'each' [3,4] +-- liftIO $ putStrLn $ show (x, y) +-- @ +-- @ +-- (1,3) +-- (2,3) +-- (1,4) +-- (2,4) +-- @ +-- +-- You will notice that this is the monadic equivalent of the '<=>' style +-- sum composition. The same caveats apply to this as the '<=>' operation. + +-- $fairlyConcurrentNesting +-- +-- When we interpret the monadic composition as 'ParallelT' we get a +-- /concurrent/ list-transformer like composition just like 'AsyncT'. The +-- difference is that this is fully parallel with all iterations starting +-- concurrently instead of the demand driven concurrency of 'AsyncT'. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- main = 'runParallelT' $ do +-- x <- 'each' [3,2,1] +-- delay x +-- @ +-- @ +-- ThreadId 40: Delay 1 +-- ThreadId 39: Delay 2 +-- ThreadId 38: Delay 3 +-- @ +-- +-- You will notice that this is the monadic equivalent of the '<|>' style +-- sum composition. The same caveats apply to this as the '<|>' operation. + +-- $monadExercise +-- +-- The streamly code is usually written in a way that is agnostic of the +-- specific monadic composition type. We use a polymorphic type with a +-- 'Streaming' type class constraint. When running the stream we can choose the +-- specific mode of composition. For example look at the following code. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- +-- +-- composed :: 'Streaming' t => t m a +-- composed = do +-- sz <- sizes +-- cl <- colors +-- sh <- shapes +-- liftIO $ putStrLn $ show (sz, cl, sh) +-- +-- where +-- +-- sizes = 'each' [1, 2, 3] +-- colors = 'each' ["red", "green", "blue"] +-- shapes = 'each' ["triangle", "square", "circle"] +-- @ +-- +-- Now we can interpret this in whatever way we want: +-- +-- @ +-- main = 'runStreamT' composed +-- main = 'runAsyncT' composed +-- main = 'runInterleavedT' composed +-- main = 'runParallelT' composed +-- @ +-- +-- Equivalently, we can also write it using the type adapter combinators, like +-- this: +-- +-- @ +-- main = 'runStreaming' $ 'serially' $ composed +-- main = 'runStreaming' $ 'asyncly' $ composed +-- main = 'runStreaming' $ 'interleaving' $ composed +-- main = 'runStreaming' $ 'parallely' $ composed +-- @ +-- +-- As an exercise try to figure out the output of this code for each mode of +-- composition. + +-- $functor +-- +-- 'fmap' transforms a stream by mapping a function on all elements of the +-- stream. The functor instance of each stream type defines 'fmap' to be +-- precisely the same as 'liftM', and therefore 'fmap' is always serial +-- irrespective of the type. For concurrent mapping, alternative versions of +-- 'fmap', namely, 'asyncMap' and 'parMap' are provided. +-- +-- @ +-- import "Streamly" +-- +-- main = ('toList' $ 'serially' $ fmap show $ 'each' [1..10]) >>= print +-- @ +-- +-- Also see the 'mapM' and 'sequence' functions for mapping actions, in the +-- "Streamly.Prelude" module. + +-- $applicative +-- +-- Applicative is precisely the same as the 'ap' operation of 'Monad'. For +-- zipping and parallel applicatives separate types 'ZipStream' and 'ZipAsync' +-- are provided. +-- +-- The following example runs all iterations serially and takes a total 17 +-- seconds (1 + 3 + 4 + 2 + 3 + 4): +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- import Control.Concurrent +-- +-- s1 = d 1 <> d 2 +-- s2 = d 3 <> d 4 +-- d n = delay n >> return n +-- +-- main = ('toList' . 'serially' $ (,) \<$> s1 \<*> s2) >>= print +-- @ +-- @ +-- ThreadId 36: Delay 1 +-- ThreadId 36: Delay 3 +-- ThreadId 36: Delay 4 +-- ThreadId 36: Delay 2 +-- ThreadId 36: Delay 3 +-- ThreadId 36: Delay 4 +-- [(1,3),(1,4),(2,3),(2,4)] +-- @ +-- +-- Similalrly interleaving runs the iterations in an interleaved order but +-- since it is serial it takes a total of 17 seconds: +-- +-- @ +-- main = ('toList' . 'interleaving' $ (,) \<$> s1 \<*> s2) >>= print +-- @ +-- @ +-- ThreadId 36: Delay 1 +-- ThreadId 36: Delay 3 +-- ThreadId 36: Delay 2 +-- ThreadId 36: Delay 3 +-- ThreadId 36: Delay 4 +-- ThreadId 36: Delay 4 +-- [(1,3),(2,3),(1,4),(2,4)] +-- @ +-- +-- 'AsyncT' can run the iterations concurrently and therefore takes a total +-- of 10 seconds (1 + 2 + 3 + 4): +-- +-- @ +-- main = ('toList' . 'asyncly' $ (,) \<$> s1 \<*> s2) >>= print +-- @ +-- @ +-- ThreadId 34: Delay 1 +-- ThreadId 36: Delay 2 +-- ThreadId 35: Delay 3 +-- ThreadId 36: Delay 3 +-- ThreadId 35: Delay 4 +-- ThreadId 36: Delay 4 +-- [(1,3),(2,3),(1,4),(2,4)] +-- @ +-- +-- Similalrly 'ParallelT' as well can run the iterations concurrently and +-- therefore takes a total of 10 seconds (1 + 2 + 3 + 4): +-- +-- @ +-- main = ('toList' . 'parallely' $ (,) \<$> s1 \<*> s2) >>= print +-- @ +-- @ +-- ThreadId 34: Delay 1 +-- ThreadId 36: Delay 2 +-- ThreadId 35: Delay 3 +-- ThreadId 36: Delay 3 +-- ThreadId 35: Delay 4 +-- ThreadId 36: Delay 4 +-- [(1,3),(2,3),(1,4),(2,4)] +-- @ + +-- $compositionSummary +-- +-- The following table summarizes the types for monadic compositions and the +-- operators for sum style compositions. This table captures the essence of +-- streamly. +-- +-- @ +-- +-----+--------------+------------+ +-- | | Serial | Concurrent | +-- +=====+==============+============+ +-- | DFS | 'StreamT' | 'AsyncT' | +-- | +--------------+------------+ +-- | | '<>' | '<|' | +-- +-----+--------------+------------+ +-- | BFS | 'InterleavedT' | 'ParallelT' | +-- | +--------------+------------+ +-- | | '<=>' | '<|>' | +-- +-----+--------------+------------+ +-- @ + +-- $zipping +-- +-- Zipping is a special transformation where the corresponding elements of two +-- streams are combined together using a zip function producing a new stream of +-- outputs. Two different types are provided for serial and concurrent zipping. +-- These types provide an applicative instance that zips the argument streams. +-- Also see the zipping function in the "Streamly.Prelude" module. + +-- $serialzip +-- +-- 'ZipStream' zips streams serially: +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- import Control.Concurrent +-- +-- d n = delay n >> return n +-- s1 = 'adapt' . 'serially' $ d 1 <> d 2 +-- s2 = 'adapt' . 'serially' $ d 3 <> d 4 +-- +-- main = ('toList' . 'zipping' $ (,) \<$> s1 \<*> s2) >>= print +-- @ +-- +-- This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since +-- everything runs serially: +-- +-- @ +-- ThreadId 29: Delay 1 +-- ThreadId 29: Delay 3 +-- ThreadId 29: Delay 2 +-- ThreadId 29: Delay 4 +-- [(1,3),(2,4)] +-- @ + +-- $parallelzip +-- +-- 'ZipAsync' zips streams concurrently: +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- import Control.Concurrent +-- import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) +-- +-- d n = delay n >> return n +-- s1 = 'adapt' . 'serially' $ d 1 <> d 2 +-- s2 = 'adapt' . 'serially' $ d 3 <> d 4 +-- +-- main = do +-- liftIO $ hSetBuffering stdout LineBuffering +-- ('toList' . 'zippingAsync' $ (,) \<$> s1 \<*> s2) >>= print +-- @ +-- +-- This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 +-- are produced concurrently, and 2 and 4 are produced concurrently: +-- +-- @ +-- ThreadId 32: Delay 1 +-- ThreadId 32: Delay 2 +-- ThreadId 33: Delay 3 +-- ThreadId 33: Delay 4 +-- [(1,3),(2,4)] +-- @ + +-- $concurrent +-- +-- When writing concurrent programs there are two distinct places where the +-- programmer chooses the type of concurrency. First, when /generating/ a +-- stream by combining other streams we can use one of the sum style operators +-- to combine them concurrently or serially. Second, when /processing/ a stream +-- in a monadic composition we can choose one of the monad composition types to +-- choose the desired type of concurrency. +-- +-- In the following example the squares of @x@ and @y@ are computed +-- concurrently using the '<|' operator and the square roots of their sum are +-- also computed concurrently by using the 'asyncly' combinator. We can choose +-- different combinators e.g. '<>' and 'serially', to control the concurrency. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" (toList) +-- import Data.List (sum) +-- +-- main = do +-- z \<- 'toList' +-- $ 'asyncly' -- Concurrent monadic processing (sqrt below) +-- $ do +-- x2 \<- 'forEachWith' ('<|') [1..100] $ -- Concurrent @"for"@ loop +-- \\x -> return $ x * x -- body of the loop +-- y2 \<- 'forEachWith' ('<|') [1..100] $ +-- \\y -> return $ y * y +-- return $ sqrt (x2 + y2) +-- print $ sum z +-- @ +-- +-- You can see how this directly maps to the imperative style +-- <https://en.wikipedia.org/wiki/OpenMP OpenMP> model, we use combinators +-- and operators instead of the ugly pragmas. +-- +-- For more concurrent programming examples see, +-- "Streamly.Examples.ListDirRecursive", "Streamly.Examples.MergeSortedStreams" +-- and "Streamly.Examples.SearchEngineQuery". + +-- $reactive +-- +-- Reactive programming is nothing but concurrent streaming which is what +-- streamly is all about. With streamly we can generate streams of events, +-- merge streams that are generated concurrently and process events +-- concurrently. We can do all this without any knowledge about the specifics +-- of the implementation of concurrency. In the following example you will see +-- that the code is just regular Haskell code without much streamly APIs used +-- (active hyperlinks are the streamly APIs) and yet it is a reactive +-- application. +-- +-- +-- This application has two independent and concurrent sources of event +-- streams, @acidRain@ and @userAction@. @acidRain@ continuously generates +-- events that deteriorate the health of the game character. @userAction@ can +-- be "potion" or "quit". When the user types "potion" the health improves and +-- the game continues. +-- +-- @ +-- {-\# LANGUAGE FlexibleContexts #-} +-- +-- import "Streamly" +-- import Control.Concurrent (threadDelay) +-- import Control.Monad (when) +-- import Control.Monad.State +-- import Data.Semigroup (cycle1) +-- +-- data Event = Harm Int | Heal Int | Quit deriving (Show) +-- +-- userAction :: MonadIO m => 'StreamT' m Event +-- userAction = cycle1 $ liftIO askUser +-- where +-- askUser = do +-- command <- getLine +-- case command of +-- "potion" -> return (Heal 10) +-- "quit" -> return Quit +-- _ -> putStrLn "What?" >> askUser +-- +-- acidRain :: MonadIO m => 'StreamT' m Event +-- acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1) +-- +-- game :: ('MonadAsync' m, MonadState Int m) => 'StreamT' m () +-- game = do +-- event \<- userAction \<|> acidRain +-- case event of +-- Harm n -> modify $ \\h -> h - n +-- Heal n -> modify $ \\h -> h + n +-- Quit -> fail "quit" +-- +-- h <- get +-- when (h <= 0) $ fail "You die!" +-- liftIO $ putStrLn $ "Health = " ++ show h +-- +-- main = do +-- putStrLn "Your health is deteriorating due to acid rain,\\ +-- \\ type \\"potion\\" or \\"quit\\"" +-- _ <- runStateT ('runStreamT' game) 60 +-- return () +-- @ +-- +-- You can also find the source of this example in +-- "Streamly.Examples.AcidRainGame". It has been adapted from Gabriel's +-- <https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html pipes-concurrency> +-- package. +-- This is much simpler compared to the pipes version because of the builtin +-- concurrency in streamly. You can also find a SDL based reactive programming +-- example adapted from Yampa in "Streamly.Examples.CirclingSquare". + +-- $performance +-- +-- Streamly is highly optimized for performance, it is designed for serious +-- high performing, concurrent and scalable applications. We have created the +-- <https://hackage.haskell.org/package/streaming-benchmarks streaming-benchmarks> +-- package which is specifically and carefully designed to measure the +-- performance of Haskell streaming libraries fairly and squarely in the right +-- way. Streamly performs at par or even better than most streaming libraries +-- for common operations even though it needs to deal with the concurrency +-- capability. + +-- $interop +-- +-- We can use @unfoldr@ and @uncons@ to convert one streaming type to another. +-- We will assume the following common code to be available in the examples +-- demonstrated below. +-- +-- @ +-- import "Streamly" +-- import "Streamly.Prelude" +-- import System.IO (stdin) +-- +-- -- Adapt uncons to return an Either instead of Maybe +-- unconsE s = 'uncons' s >>= maybe (return $ Left ()) (return . Right) +-- stdinLn = 'serially' $ 'fromHandle' stdin +-- @ +-- +-- Interop with @pipes@: +-- +-- @ +-- import qualified Pipes as P +-- import qualified Pipes.Prelude as P +-- +-- main = do +-- -- streamly to pipe +-- P.runEffect $ P.for (P.unfoldr unconsE stdinLn) (lift . putStrLn) +-- +-- -- pipe to streamly +-- -- Adapt P.next to return a Maybe instead of Either +-- let nextM p = P.next p >>= either (\\_ -> return Nothing) (return . Just) +-- 'runStreamT' $ 'unfoldrM' nextM P.stdinLn >>= lift . putStrLn +-- @ +-- +-- Interop with @streaming@: +-- +-- @ +-- import qualified Streaming as S +-- import qualified Streaming.Prelude as S +-- +-- main = do +-- -- streamly to streaming +-- S.stdoutLn $ S.unfoldr unconsE stdinLn +-- +-- -- streaming to streamly +-- 'runStreamT' $ unfoldrM S.uncons S.stdinLn >>= lift . putStrLn +-- +-- @ +-- +-- Interop with @conduit@: +-- +-- @ +-- import qualified Data.Conduit as C +-- import qualified Data.Conduit.List as C +-- import qualified Data.Conduit.Combinators as C +-- +-- -- streamly to conduit +-- main = (C.unfoldM 'uncons' stdinLn) C.$$ C.print +-- @ + +-- $comparison +-- +-- Streamly unifies non-determinism, streaming, concurrency and FRP +-- functionality that is otherwise covered by several disparate packages, and +-- it does that with a surprisingly concise API. Here is a list of popular and +-- well-known packages in all these areas: +-- +-- @ +-- +-----------------+----------------+ +-- | Non-determinism | <https://hackage.haskell.org/package/list-t list-t> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/logict logict> | +-- +-----------------+----------------+ +-- | Streaming | <https://hackage.haskell.org/package/streaming streaming> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/conduit conduit> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/pipes pipes> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/simple-conduit simple-conduit> | +-- +-----------------+----------------+ +-- | Concurrency | <https://hackage.haskell.org/package/async async> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/transient transient> | +-- +-----------------+----------------+ +-- | FRP | <https://hackage.haskell.org/package/Yampa Yampa> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/dunai dunai> | +-- | +----------------+ +-- | | <https://hackage.haskell.org/package/reflex reflex> | +-- +-----------------+----------------+ +-- @ +-- +-- Streamly covers all the functionality provided by both the non-determinism +-- packages listed above and provides better performance in comparison to +-- those. In fact, at the core streamly is a list transformer but it naturally +-- integrates the concurrency dimension to the basic list transformer +-- functionality. +-- +-- When it comes to streaming, in terms of core concepts, @simple-conduit@ is +-- the package that is closest to streamly if we set aside the concurrency +-- dimension, both are streaming packages with list transformer like monad +-- composition. However, in terms of API it is more like the @streaming@ +-- package. Streamly can be used to achieve more or less the functionality +-- provided by any of the streaming packages listed above. The types and API of +-- streamly are much simpler in comparison to conduit and pipes. It is more or +-- less like the standard Haskell list APIs. +-- +-- When it comes to concurrency, streamly can do everything that the @async@ +-- package can do and more. async provides applicative concurrency whereas +-- streamly provides both applicative and monadic concurrency. The 'ZipAsync' +-- type behaves like the applicative instance of async. This work was +-- originally inspired by the concurrency implementation in @transient@ though +-- it has no resemblence with that. Streamly provides concurrency as transient +-- does but in a sort of dual manner, it can lazily stream the output. In +-- comparison to transient streamly has a first class streaming interface and +-- is a monad transformer that can be used universally in any Haskell monad +-- transformer stack. +-- +-- The non-determinism, concurrency and streaming combination make streamly a +-- strong FRP capable library as well. FRP is fundamentally stream of events +-- that can be processed concurrently. The example in this tutorial as well as +-- the "Streamly.Examples.CirclingSquare" example from Yampa demonstrate the +-- basic FRP capability of streamly. In core concepts streamly is strikingly +-- similar to @dunai@. dunai was designed from a FRP perspective and streamly +-- wa original designed from a concurrency perspective. However, both have +-- similarity at the core. diff --git a/stack-7.10.yaml b/stack-7.10.yaml new file mode 100644 index 0000000..b8e2d6a --- /dev/null +++ b/stack-7.10.yaml @@ -0,0 +1,16 @@ +resolver: lts-6.35 +packages: +- '.' +extra-deps: + - lockfree-queue-0.2.3.1 + - simple-conduit-0.4.0 + - transient-0.5.9.2 + - http-conduit-2.2.2 + - http-client-0.5.0 + - http-client-tls-0.3.0 + - SDL-0.6.5.1 +flags: {} +extra-package-dbs: [] +# For mac ports installed SDL library on Mac OS X +#extra-include-dirs: +#- /opt/local/include diff --git a/stack-8.0.yaml b/stack-8.0.yaml new file mode 100644 index 0000000..df4e5e1 --- /dev/null +++ b/stack-8.0.yaml @@ -0,0 +1,17 @@ +resolver: lts-7.24 +packages: +- '.' +extra-deps: + - lockfree-queue-0.2.3.1 + - simple-conduit-0.6.0 + - transient-0.4.4 + - monad-recorder-0.1.0 + - http-conduit-2.2.2 + - http-client-0.5.0 + - http-client-tls-0.3.0 + - SDL-0.6.5.1 +flags: {} +extra-package-dbs: [] +# For mac ports installed SDL library on Mac OS X +#extra-include-dirs: +#- /opt/local/include diff --git a/stack.yaml b/stack.yaml new file mode 100644 index 0000000..c184a3e --- /dev/null +++ b/stack.yaml @@ -0,0 +1,14 @@ +#resolver: lts-9.2 +resolver: nightly-2017-09-07 +packages: +- '.' +extra-deps: + - lockfree-queue-0.2.3.1 + - simple-conduit-0.6.0 + - SDL-0.6.5.1 +flags: {} +extra-package-dbs: [] +rebuild-ghc-options: true +# For mac ports installed SDL library on Mac OS X +#extra-include-dirs: +#- /opt/local/include diff --git a/streamly.cabal b/streamly.cabal new file mode 100644 index 0000000..70eda1a --- /dev/null +++ b/streamly.cabal @@ -0,0 +1,234 @@ +name: streamly +version: 0.1.0 +synopsis: Beautiful Streaming, Concurrent and Reactive Composition +description: + Streamly is a monad transformer unifying non-determinism + (<https://hackage.haskell.org/package/list-t list-t>\/<https://hackage.haskell.org/package/logict logict>), + concurrency (<https://hackage.haskell.org/package/async async>), + streaming (<https://hackage.haskell.org/package/conduit conduit>\/<https://hackage.haskell.org/package/pipes pipes>), + and FRP (<https://hackage.haskell.org/package/Yampa Yampa>\/<https://hackage.haskell.org/package/reflex reflex>) + functionality in a concise and intuitive API. + High level concurrency makes concurrent applications almost indistinguishable + from non-concurrent ones. By changing a single combinator you can control + whether the code runs serially or concurrently. It naturally integrates + concurrency with streaming rather than adding it as an afterthought. + Moreover, it interworks with the popular streaming libraries. + . + See the README for an overview and the haddock documentation for full + reference. It is recommended to read the comprehensive tutorial module + "Streamly.Tutorial" first. Also see "Streamly.Examples" for some working + examples. + +homepage: http://github.com/harendra-kumar/streamly +bug-reports: https://github.com/harendra-kumar/streamly/issues +license: BSD3 +license-file: LICENSE +tested-with: GHC==7.10.3, GHC==8.0.2, GHC==8.2.1 +author: Harendra Kumar +maintainer: harendra.kumar@gmail.com +copyright: 2017 Harendra Kumar +category: Control, Concurrency, Streaming, Reactivity +stability: Experimental +build-type: Simple +cabal-version: >= 1.10 + +extra-source-files: + Changelog.md + README.md + stack-7.10.yaml + stack-8.0.yaml + stack.yaml + +source-repository head + type: git + location: https://github.com/harendra-kumar/streamly + +flag dev + description: Build development version + manual: True + default: False + +flag extra-benchmarks + description: Include comparative benchmarks + manual: True + default: False + +flag examples + description: Build examples + manual: True + default: False + +flag examples-sdl + description: Include examples that use SDL dependency + manual: True + default: False + +library + hs-source-dirs: src + other-modules: Streamly.Core + , Streamly.Streams + + exposed-modules: Streamly.Prelude + , Streamly.Time + , Streamly.Tutorial + , Streamly + + if flag(examples) || flag(examples-sdl) + exposed-modules: Streamly.Examples + , Streamly.Examples.SearchEngineQuery + , Streamly.Examples.ListDirRecursive + , Streamly.Examples.MergeSortedStreams + , Streamly.Examples.AcidRainGame + + if flag(examples-sdl) + exposed-modules: Streamly.Examples.CirclingSquare + + default-language: Haskell2010 + ghc-options: -Wall + + if flag(dev) + ghc-options: -Wmissed-specialisations + -Wall-missed-specialisations + -fno-ignore-asserts + if impl(ghc >= 8.0) + ghc-options: -Wcompat + -Wunrecognised-warning-flags + -Widentities + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wredundant-constraints + -Wnoncanonical-monad-instances + -Wnoncanonical-monadfail-instances + if flag(examples-sdl) + cpp-options: -DEXAMPLES_SDL + + build-depends: base >= 4.8 && < 5 + , atomic-primops >= 0.8 && < 0.9 + , containers >= 0.5 && < 0.6 + , exceptions >= 0.8 && < 0.9 + , lifted-base >= 0.2 && < 0.3 + , lockfree-queue >= 0.2.3 && < 0.3 + , monad-control >= 1.0 && < 2 + , mtl >= 2.2 && < 3 + , stm >= 2.4.3 && < 2.5 + , transformers >= 0.4 && < 0.6 + , transformers-base >= 0.4 && < 0.5 + + if impl(ghc < 8.0) + build-depends: + semigroups >= 0.18 && < 0.19 + + if flag(examples) || flag(examples-sdl) + build-Depends: + http-conduit >= 2.2.2 && < 2.3 + , path-io >= 0.1.0 && < 1.4 + , random >= 1.0.0 && < 1.2 + + if flag(examples-sdl) + build-Depends: + SDL >= 0.6.5 && < 0.7 + +test-suite test + type: exitcode-stdio-1.0 + main-is: Main.hs + hs-source-dirs: test + ghc-options: -O0 -Wall + if flag(dev) + ghc-options: -Wmissed-specialisations + -Wall-missed-specialisations + if impl(ghc >= 8.0) + ghc-options: -Wcompat + -Wunrecognised-warning-flags + -Widentities + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wredundant-constraints + -Wnoncanonical-monad-instances + -Wnoncanonical-monadfail-instances + build-depends: + streamly + , base >= 4.8 && < 5 + , hspec >= 2.0 && < 3 + , containers >= 0.5 && < 0.6 + if impl(ghc < 8.0) + build-depends: + transformers >= 0.4 && < 0.6 + default-language: Haskell2010 + +benchmark bench + type: exitcode-stdio-1.0 + main-is: Main.hs + hs-source-dirs: benchmark + ghc-options: -O2 -Wall + if flag(dev) + ghc-options: -Wmissed-specialisations + -Wall-missed-specialisations + -fno-ignore-asserts + if impl(ghc >= 8.0) + ghc-options: -Wcompat + -Wunrecognised-warning-flags + -Widentities + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wredundant-constraints + -Wnoncanonical-monad-instances + -Wnoncanonical-monadfail-instances + build-depends: + streamly + , atomic-primops >= 0.8 && < 0.9 + , base >= 4.8 && < 5 + , criterion >= 1 && < 2 + , mtl >= 2.2 && < 3 + + if impl(ghc < 8.0) + build-depends: + transformers >= 0.4 && < 0.6 + + if flag(extra-benchmarks) + cpp-options: -DEXTRA_BENCHMARKS + build-depends: + list-t >= 0.4 && < 2 + , logict >= 0.6 && < 0.7 + , machines >= 0.5 && < 0.7 + , simple-conduit >= 0.6 && < 0.7 + , transient >= 0.4 && < 0.6 + default-language: Haskell2010 + +------------------------------------------------------------------------------- +-- Examples +------------------------------------------------------------------------------- + +executable loops + main-is: loops.hs + hs-source-dirs: examples + if flag(examples) + buildable: True + build-Depends: + streamly + , base >= 4.8 && < 5 + else + buildable: False + +executable nested-loops + main-is: nested-loops.hs + hs-source-dirs: examples + if flag(examples) + buildable: True + build-Depends: + streamly + , base >= 4.8 && < 5 + , random >= 1.0.0 && < 1.2 + else + buildable: False + +executable parallel-loops + main-is: parallel-loops.hs + hs-source-dirs: examples + if flag(examples) + buildable: True + build-Depends: + streamly + , base >= 4.8 && < 5 + , random >= 1.0.0 && < 1.2 + else + buildable: False diff --git a/test/Main.hs b/test/Main.hs new file mode 100644 index 0000000..198e6d5 --- /dev/null +++ b/test/Main.hs @@ -0,0 +1,618 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} + +module Main (main) where + +import Control.Concurrent (threadDelay) +import Data.Foldable (forM_) +import Data.List (sort) +import Test.Hspec + +import Streamly +import qualified Streamly.Prelude as A + +toListSerial :: StreamT IO a -> IO [a] +toListSerial = A.toList . serially + +toListInterleaved :: InterleavedT IO a -> IO [a] +toListInterleaved = A.toList . interleaving + +toListAsync :: AsyncT IO a -> IO [a] +toListAsync = A.toList . asyncly + +toListParallel :: ParallelT IO a -> IO [a] +toListParallel = A.toList . parallely + +main :: IO () +main = hspec $ do + describe "Runners" $ do + it "simple serially" $ + (runStreaming . serially) (return (0 :: Int)) `shouldReturn` () + it "simple serially with IO" $ + (runStreaming . serially) (liftIO $ putStrLn "hello") `shouldReturn` () + it "Captures a return value using toList" $ + toListSerial (return 0) `shouldReturn` ([0] :: [Int]) + + describe "Empty" $ do + it "Monoid - mempty" $ + (toListSerial mempty) `shouldReturn` ([] :: [Int]) + it "Alternative - empty" $ + (toListSerial empty) `shouldReturn` ([] :: [Int]) + it "MonadPlus - mzero" $ + (toListSerial mzero) `shouldReturn` ([] :: [Int]) + + --------------------------------------------------------------------------- + -- Functor + --------------------------------------------------------------------------- + + describe "Functor (fmap)" $ do + it "Simple fmap" $ + (toListSerial $ fmap (+1) (return 1)) `shouldReturn` ([2] :: [Int]) + it "fmap on composed (<>)" $ + (toListSerial $ fmap (+1) (return 1 <> return 2)) + `shouldReturn` ([2,3] :: [Int]) + it "fmap on composed (<|>)" $ + (toListSerial $ fmap (+1) (return 1 <|> return 2)) + `shouldReturn` ([2,3] :: [Int]) + + --------------------------------------------------------------------------- + -- Applicative + --------------------------------------------------------------------------- + + describe "Applicative" $ do + it "Simple apply" $ + (toListSerial $ (,) <$> (return 1) <*> (return 2)) + `shouldReturn` ([(1,2)] :: [(Int, Int)]) + + it "Apply - serial composed first argument" $ + (toListSerial $ (,) <$> (return 1 <> return 2) <*> (return 3)) + `shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)]) + + it "Apply - serial composed second argument" $ + (toListSerial $ (,) <$> (return 1) <*> (return 2 <> return 3)) + `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)]) + + it "Apply - parallel composed first argument" $ + (toListSerial $ (,) <$> (return 1 <|> return 2) <*> (return 3)) + `shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)]) + + it "Apply - parallel composed second argument" $ + (toListSerial $ (,) <$> (return 1) <*> (return 2 <|> return 3)) + `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)]) + + --------------------------------------------------------------------------- + -- Binds + --------------------------------------------------------------------------- + + describe "Bind then" thenBind + describe "Pure bind serial" $ pureBind toListSerial + describe "Pure bind serial interleaved" $ pureBind toListInterleaved + describe "Pure bind parallel DFS" $ pureBind toListAsync + describe "Pure bind parallel BFS" $ pureBind toListParallel + + describe "Bind (>>=) with empty" $ bindEmpty toListSerial + describe "Bind (>->) with empty" $ bindEmpty toListInterleaved + describe "Bind (>|>) with empty" $ bindEmpty toListAsync + describe "Bind (>>|) with empty" $ bindEmpty toListParallel + + --------------------------------------------------------------------------- + -- Monoidal Compositions + --------------------------------------------------------------------------- + + describe "Serial Composition (<>)" $ compose (<>) id + describe "Serial Composition (mappend)" $ compose mappend id + describe "Interleaved Composition (<>)" $ compose (<=>) sort + describe "Left biased parallel Composition (<|)" $ compose (<|) sort + describe "Fair parallel Composition (<|>)" $ compose (<|>) sort + describe "Fair parallel Composition (mplus)" $ compose mplus sort + + --------------------------------------------------------------------------- + -- Monoidal Composition ordering checks + --------------------------------------------------------------------------- + + describe "Serial interleaved ordering check (<=>)" $ interleaveCheck (<=>) + describe "Parallel interleaved ordering check (<|>)" $ interleaveCheck (<|>) + describe "Left biased parallel time order check" $ parallelCheck (<|) + describe "Fair parallel time order check" $ parallelCheck (<|>) + + --------------------------------------------------------------------------- + -- TBD Monoidal composition combinations + --------------------------------------------------------------------------- + + -- TBD need more such combinations to be tested. + describe "<> and <>" $ composeAndComposeSimple (<>) (<>) (cycle [[1 .. 9]]) + + describe "<> and <=>" $ composeAndComposeSimple + (<>) + (<=>) + ([ [1 .. 9] + , [1 .. 9] + , [1, 3, 2, 4, 6, 5, 7, 9, 8] + , [1, 3, 2, 4, 6, 5, 7, 9, 8] + ]) + + describe "<=> and <=>" $ composeAndComposeSimple + (<=>) + (<=>) + ([ [1, 4, 2, 7, 3, 5, 8, 6, 9] + , [1, 7, 4, 8, 2, 9, 5, 3, 6] + , [1, 4, 3, 7, 2, 6, 9, 5, 8] + , [1, 7, 4, 9, 3, 8, 6, 2, 5] + ]) + + describe "<=> and <>" $ composeAndComposeSimple + (<=>) + (<>) + ([ [1, 4, 2, 7, 3, 5, 8, 6, 9] + , [1, 7, 4, 8, 2, 9, 5, 3, 6] + , [1, 4, 2, 7, 3, 5, 8, 6, 9] + , [1, 7, 4, 8, 2, 9, 5, 3, 6] + ]) + + describe "Nested parallel and serial compositions" $ do + {- + -- This is not correct, the result can also be [4,4,8,0,8,0,2,2] + -- because of parallelism of [8,0] and [8,0]. + it "Nest <|>, <>, <|> (1)" $ + let t = timed + in toListSerial ( + ((t 8 <|> t 4) <> (t 2 <|> t 0)) + <|> ((t 8 <|> t 4) <> (t 2 <|> t 0))) + `shouldReturn` ([4,4,8,8,0,0,2,2]) + -} + it "Nest <|>, <>, <|> (2)" $ + let t = timed + in toListSerial ( + ((t 4 <|> t 8) <> (t 1 <|> t 2)) + <|> ((t 4 <|> t 8) <> (t 1 <|> t 2))) + `shouldReturn` ([4,4,8,8,1,1,2,2]) + -- FIXME: These two keep failing intermittently on Mac OS X + -- Need to examine and fix the tests. + {- + it "Nest <|>, <=>, <|> (1)" $ + let t = timed + in toListSerial ( + ((t 8 <|> t 4) <=> (t 2 <|> t 0)) + <|> ((t 9 <|> t 4) <=> (t 2 <|> t 0))) + `shouldReturn` ([4,4,0,0,8,2,9,2]) + it "Nest <|>, <=>, <|> (2)" $ + let t = timed + in toListSerial ( + ((t 4 <|> t 8) <=> (t 1 <|> t 2)) + <|> ((t 4 <|> t 9) <=> (t 1 <|> t 2))) + `shouldReturn` ([4,4,1,1,8,2,9,2]) + -} + it "Nest <|>, <|>, <|>" $ + let t = timed + in toListSerial ( + ((t 4 <|> t 8) <|> (t 0 <|> t 2)) + <|> ((t 4 <|> t 8) <|> (t 0 <|> t 2))) + `shouldReturn` ([0,0,2,2,4,4,8,8]) + + --------------------------------------------------------------------------- + -- Monoidal composition recursion loops + --------------------------------------------------------------------------- + + describe "Serial loops (<>)" $ loops (<>) id reverse + describe "Left biased parallel loops (<|)" $ loops (<|) sort sort + describe "Fair parallel loops (<|>)" $ loops (<|>) sort sort + + --------------------------------------------------------------------------- + -- Bind and monoidal composition combinations + --------------------------------------------------------------------------- + + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + describe "Bind and compose" $ bindAndComposeSimple toListSerial g + + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + describe "Bind and compose" $ bindAndComposeSimple toListInterleaved g + + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + describe "Bind and compose" $ bindAndComposeSimple toListAsync g + + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + describe "Bind and compose" $ bindAndComposeSimple toListParallel g + + let fldr f = foldr f empty + fldl f = foldl f empty + in do + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ + bindAndComposeHierarchy toListSerial (k g) + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ + bindAndComposeHierarchy toListInterleaved (k g) + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ + bindAndComposeHierarchy toListAsync (k g) + forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ + bindAndComposeHierarchy toListParallel (k g) + + -- Nest two lists using different styles of product compositions + it "Nests two streams using monadic serial composition" nestTwoSerial + it "Nests two streams using monadic interleaved composition" nestTwoInterleaved + it "Nests two streams using monadic async composition" nestTwoAsync + it "Nests two streams using monadic parallel composition" nestTwoParallel + + it "Nests two streams using applicative serial composition" nestTwoSerialApp + it "Nests two streams using applicative interleaved composition" nestTwoInterleavedApp + it "Nests two streams using applicative async composition" nestTwoAsyncApp + it "Nests two streams using applicative parallel composition" nestTwoParallelApp + + it "Nests two streams using Num serial composition" nestTwoSerialNum + it "Nests two streams using Num interleaved composition" nestTwoInterleavedNum + it "Nests two streams using Num async composition" nestTwoAsyncNum + it "Nests two streams using Num parallel composition" nestTwoParallelNum + + --------------------------------------------------------------------------- + -- TBD Bind and Bind combinations + --------------------------------------------------------------------------- + + -- TBD combine all binds and all compose in one example + describe "Miscellaneous combined examples" mixedOps + + describe "Transformation" $ transformOps (<>) + describe "Serial zipping" $ + zipOps A.zipWith A.zipWithM zipping + describe "Async zipping" $ + zipOps A.zipAsyncWith A.zipAsyncWithM zippingAsync + +nestTwoSerial :: Expectation +nestTwoSerial = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListSerial (do + x <- s1 + y <- s2 + return (x + y) + ) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + +nestTwoSerialApp :: Expectation +nestTwoSerialApp = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListSerial ((+) <$> s1 <*> s2) + `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + +nestTwoSerialNum :: Expectation +nestTwoSerialNum = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListSerial (s1 + s2) + `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + +nestTwoInterleaved :: Expectation +nestTwoInterleaved = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListInterleaved (do + x <- s1 + y <- s2 + return (x + y) + ) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + +nestTwoInterleavedApp :: Expectation +nestTwoInterleavedApp = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListInterleaved ((+) <$> s1 <*> s2) + `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + +nestTwoInterleavedNum :: Expectation +nestTwoInterleavedNum = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListInterleaved (s1 + s2) + `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + +nestTwoAsync :: Expectation +nestTwoAsync = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListAsync (do + x <- s1 + y <- s2 + return (x + y) + ) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + +nestTwoAsyncApp :: Expectation +nestTwoAsyncApp = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListAsync ((+) <$> s1 <*> s2) + `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + +nestTwoAsyncNum :: Expectation +nestTwoAsyncNum = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListAsync (s1 + s2) + `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + +nestTwoParallel :: Expectation +nestTwoParallel = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListParallel (do + x <- s1 + y <- s2 + return (x + y) + ) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + +nestTwoParallelApp :: Expectation +nestTwoParallelApp = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListParallel ((+) <$> s1 <*> s2) + `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + +nestTwoParallelNum :: Expectation +nestTwoParallelNum = + let s1 = foldMapWith (<>) return [1..4] + s2 = foldMapWith (<>) return [5..8] + in toListParallel (s1 + s2) + `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + +zipOps :: (Streaming t, Applicative (t IO)) + => (forall a b c. (a -> b -> c) + -> StreamT IO a -> StreamT IO b -> StreamT IO c) + -> (forall a b c. (a -> b -> StreamT IO c) + -> StreamT IO a -> StreamT IO b -> StreamT IO c) + -> (forall a. t IO a -> t IO a) + -> Spec +zipOps z zM app = do + it "zipWith" $ + let s1 = foldMapWith (<>) return [1..10] + s2 = foldMapWith (<>) return [1..] + in toListSerial (z (+) s1 s2) + `shouldReturn` ([2,4..20] :: [Int]) + + it "zipWithM" $ + let s1 = foldMapWith (<>) return [1..10] + s2 = foldMapWith (<>) return [1..] + in toListSerial (zM (\a b -> return (a + b)) s1 s2) + `shouldReturn` ([2,4..20] :: [Int]) + + it "Applicative zip" $ + let s1 = adapt $ serially $ foldMapWith (<>) return [1..10] + s2 = adapt $ serially $ foldMapWith (<>) return [1..] + in (A.toList . app) ((+) <$> s1 <*> s2) + `shouldReturn` ([2,4..20] :: [Int]) + +timed :: Int -> StreamT IO Int +timed x = liftIO (threadDelay (x * 100000)) >> return x + +thenBind :: Spec +thenBind = do + it "Simple runStreaming and 'then' with IO" $ + (runStreaming . serially) (liftIO (putStrLn "hello") >> liftIO (putStrLn "world")) + `shouldReturn` () + it "Then and toList" $ + toListSerial (return (1 :: Int) >> return 2) `shouldReturn` ([2] :: [Int]) + +type ToListType s = (forall a. s IO a -> IO [a]) +pureBind :: Monad (s IO) => ToListType s -> Spec +pureBind l = do + it "Bind and toList" $ + l (return 1 `f` \x -> return 2 `f` \y -> return (x + y)) + `shouldReturn` ([3] :: [Int]) + where f = (>>=) + +bindEmpty :: (Monad (s IO), Alternative (s IO)) => ToListType s -> Spec +bindEmpty l = it "Binds with empty" $ + (l (return (1 :: Int) `f` \_ -> empty `f` \_ -> return 2)) + `shouldReturn` ([] :: [Int]) + where f = (>>=) + +interleaveCheck + :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) + -> Spec +interleaveCheck f = + it "Interleave four" $ + toListSerial ((return 0 <> return 1) `f` (return 100 <> return 101)) + `shouldReturn` ([0, 100, 1, 101]) + +parallelCheck :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) -> Spec +parallelCheck f = do + it "Parallel ordering left associated" $ + toListSerial (((event 4 `f` event 3) `f` event 2) `f` event 1) + `shouldReturn` ([1..4]) + + it "Parallel ordering right associated" $ + toListSerial (event 4 `f` (event 3 `f` (event 2 `f` event 1))) + `shouldReturn` ([1..4]) + + where event n = (liftIO $ threadDelay (n * 100000)) >> (return n) + +compose + :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) + -> ([Int] -> [Int]) + -> Spec +compose f srt = do + it "Compose mempty, mempty" $ + (tl (mempty `f` mempty)) `shouldReturn` [] + it "Compose empty, empty" $ + (tl (empty `f` empty)) `shouldReturn` [] + it "Compose empty at the beginning" $ + (tl $ (empty `f` return 1)) `shouldReturn` [1] + it "Compose empty at the end" $ + (tl $ (return 1 `f` empty)) `shouldReturn` [1] + it "Compose two" $ + (tl (return 0 `f` return 1) >>= return . srt) + `shouldReturn` [0, 1] + it "Compose three - empty in the middle" $ + ((tl $ (return 0 `f` empty `f` return 1)) >>= return . srt) + `shouldReturn` [0, 1] + it "Compose left associated" $ + ((tl $ (((return 0 `f` return 1) `f` return 2) `f` return 3)) + >>= return . srt) `shouldReturn` [0, 1, 2, 3] + it "Compose right associated" $ + ((tl $ (return 0 `f` (return 1 `f` (return 2 `f` return 3)))) + >>= return . srt) `shouldReturn` [0, 1, 2, 3] + it "Compose many" $ + ((tl $ forEachWith f [1..100] return) >>= return . srt) + `shouldReturn` [1..100] + it "Compose hierarchical (multiple levels)" $ + ((tl $ (((return 0 `f` return 1) `f` (return 2 `f` return 3)) + `f` ((return 4 `f` return 5) `f` (return 6 `f` return 7))) + ) >>= return . srt) `shouldReturn` [0..7] + where tl = toListSerial + +composeAndComposeSimple + :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) + -> (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) + -> [[Int]] + -> Spec +composeAndComposeSimple f g answer = do + it "Compose right associated outer expr, right folded inner" $ + let fold = foldMapWith g return + in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9]))) + `shouldReturn` (answer !! 0) + + it "Compose left associated outer expr, right folded inner" $ + let fold = foldMapWith g return + in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9])) + `shouldReturn` (answer !! 1) + + it "Compose right associated outer expr, left folded inner" $ + let fold xs = foldl g empty $ map return xs + in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9]))) + `shouldReturn` (answer !! 2) + + it "Compose left associated outer expr, left folded inner" $ + let fold xs = foldl g empty $ map return xs + in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9])) + `shouldReturn` (answer !! 3) + + +loops + :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) + -> ([Int] -> [Int]) + -> ([Int] -> [Int]) + -> Spec +loops f tsrt hsrt = do + it "Tail recursive loop" $ (toListSerial (loopTail 0) >>= return . tsrt) + `shouldReturn` [0..3] + + it "Head recursive loop" $ (toListSerial (loopHead 0) >>= return . hsrt) + `shouldReturn` [0..3] + + where + loopHead x = do + -- this print line is important for the test (causes a bind) + liftIO $ putStrLn "LoopHead..." + (if x < 3 then loopHead (x + 1) else empty) `f` return x + + loopTail x = do + -- this print line is important for the test (causes a bind) + liftIO $ putStrLn "LoopTail..." + return x `f` (if x < 3 then loopTail (x + 1) else empty) + +bindAndComposeSimple + :: (Streaming t, Alternative (t IO), Monad (t IO)) + => (forall a. t IO a -> IO [a]) + -> (t IO Int -> t IO Int -> t IO Int) + -> Spec +bindAndComposeSimple tl g = do + it "Compose many (right fold) with bind" $ + (tl (forEachWith g [1..10 :: Int] $ \x -> return x `f` (return . id)) + >>= return . sort) `shouldReturn` [1..10] + + it "Compose many (left fold) with bind" $ + let forL xs k = foldl g empty $ map k xs + in (tl (forL [1..10 :: Int] $ \x -> return x `f` (return . id)) + >>= return . sort) `shouldReturn` [1..10] + where f = (>>=) + +bindAndComposeHierarchy + :: Monad (s IO) => (forall a. s IO a -> IO [a]) + -> ([s IO Int] -> s IO Int) + -> Spec +bindAndComposeHierarchy tl g = do + it "Bind and compose nested" $ + (tl bindComposeNested >>= return . sort) + `shouldReturn` (sort ( + [12, 18] + ++ replicate 3 13 + ++ replicate 3 17 + ++ replicate 6 14 + ++ replicate 6 16 + ++ replicate 7 15) :: [Int]) + + where + + -- bindComposeNested :: AsyncT IO Int + bindComposeNested = + let c1 = tripleCompose (return 1) (return 2) (return 3) + c2 = tripleCompose (return 4) (return 5) (return 6) + c3 = tripleCompose (return 7) (return 8) (return 9) + b = tripleBind c1 c2 c3 +-- it seems to be causing a huge space leak in hspec so disabling this for now +-- c = tripleCompose b b b +-- m = tripleBind c c c +-- in m + in b + + tripleCompose a b c = g [a, b, c] + tripleBind mx my mz = + mx `f` \x -> my + `f` \y -> mz + `f` \z -> return (x + y + z) + f = (>>=) + +mixedOps :: Spec +mixedOps = do + it "Compose many ops" $ + (toListSerial composeMixed >>= return . sort) + `shouldReturn` ([8,9,9,9,9,9,10,10,10,10,10,10,10,10,10,10,11,11 + ,11,11,11,11,11,11,11,11,12,12,12,12,12,13 + ] :: [Int]) + where + + composeMixed :: StreamT IO Int + composeMixed = do + liftIO $ return () + liftIO $ putStr "" + x <- return 1 + y <- return 2 + z <- do + x1 <- return 1 <|> return 2 + liftIO $ return () + liftIO $ putStr "" + y1 <- return 1 <| return 2 + z1 <- do + x11 <- return 1 <> return 2 + y11 <- return 1 <| return 2 + z11 <- return 1 <=> return 2 + liftIO $ return () + liftIO $ putStr "" + return (x11 + y11 + z11) + return (x1 + y1 + z1) + return (x + y + z) + +transformOps :: (StreamT IO Int -> StreamT IO Int -> StreamT IO Int) -> Spec +transformOps f = do + it "take all" $ + (toListSerial $ A.take 10 $ foldMapWith f return [1..10]) + `shouldReturn` [1..10] + it "take none" $ + (toListSerial $ A.take 0 $ foldMapWith f return [1..10]) + `shouldReturn` [] + it "take 5" $ + (toListSerial $ A.take 5 $ foldMapWith f return [1..10]) + `shouldReturn` [1..5] + + it "drop all" $ + (toListSerial $ A.drop 10 $ foldMapWith f return [1..10]) + `shouldReturn` [] + it "drop none" $ + (toListSerial $ A.drop 0 $ foldMapWith f return [1..10]) + `shouldReturn` [1..10] + it "drop 5" $ + (toListSerial $ A.drop 5 $ foldMapWith f return [1..10]) + `shouldReturn` [6..10] |