summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorickLaupa <>2018-12-05 20:38:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-12-05 20:38:00 (GMT)
commit58cdc4bb2c60c61d103cf26865576c65b54eece2 (patch)
tree1bd067519ce8bd3706a8313c11392a9ef77afc7b
parent1328e9e7d1763003871141677f72019d3779a1c7 (diff)
version 1.2.0HEAD1.2.0master
-rwxr-xr-x[-rw-r--r--]CHANGELOG.markdown10
-rw-r--r--Database/EventStore.hs244
-rw-r--r--Database/EventStore/Internal/Control.hs6
-rw-r--r--Database/EventStore/Internal/Operation.hs16
-rw-r--r--Database/EventStore/Internal/Operation/Catchup.hs126
-rw-r--r--Database/EventStore/Internal/Operation/DeleteStream.hs2
-rw-r--r--Database/EventStore/Internal/Operation/Read/Common.hs135
-rw-r--r--Database/EventStore/Internal/Operation/ReadAllEvents.hs6
-rw-r--r--Database/EventStore/Internal/Operation/ReadEvent.hs10
-rw-r--r--Database/EventStore/Internal/Operation/ReadEvent/Message.hs4
-rw-r--r--Database/EventStore/Internal/Operation/ReadStreamEvents.hs10
-rw-r--r--Database/EventStore/Internal/Operation/StreamMetadata.hs4
-rw-r--r--Database/EventStore/Internal/Operation/Transaction.hs6
-rw-r--r--Database/EventStore/Internal/Operation/Volatile.hs7
-rw-r--r--Database/EventStore/Internal/Operation/WriteEvents.hs2
-rw-r--r--Database/EventStore/Internal/Stream.hs34
-rw-r--r--Database/EventStore/Internal/Subscription/Api.hs23
-rw-r--r--Database/EventStore/Internal/Subscription/Catchup.hs101
-rw-r--r--Database/EventStore/Internal/Subscription/Persistent.hs11
-rw-r--r--Database/EventStore/Internal/Subscription/Regular.hs24
-rw-r--r--Database/EventStore/Internal/Types.hs43
-rw-r--r--Database/EventStore/Streaming.hs164
-rwxr-xr-x[-rw-r--r--]README.md8
-rw-r--r--eventstore.cabal22
-rw-r--r--tests/Test/Integration/Tests.hs83
25 files changed, 630 insertions, 471 deletions
diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown
index 37527c6..ac12a59 100644..100755
--- a/CHANGELOG.markdown
+++ b/CHANGELOG.markdown
@@ -1,3 +1,13 @@
+1.2.0
+-----
+* Introduce a type-safe `EventNumber` setting.
+* Introduce a type-safe `ResolveLink` setting.
+* Support GHC 8.6 but drop GHC < 8.
+* Refactor `Slice` api.
+* Implement a stream-processing interface.
+* typeful stream id representation.
+* Uniform batch-read and subscription interface.
+
1.1.6
-----
* Update package metadata information.
diff --git a/Database/EventStore.hs b/Database/EventStore.hs
index 55cddc4..d43d301 100644
--- a/Database/EventStore.hs
+++ b/Database/EventStore.hs
@@ -51,15 +51,22 @@ module Database.EventStore
, withJsonAndMetadata
, withBinary
, withBinaryAndMetadata
+ -- * Event number
+ , EventNumber
+ , streamStart
+ , streamEnd
+ , eventNumber
+ , rawEventNumber
+ , eventNumberToInt64
-- * Common Operation types
, OperationMaxAttemptReached(..)
-- * Read Operations
, StreamMetadataResult(..)
+ , BatchResult(..)
+ , ResolveLink(..)
, readEvent
- , readAllEventsBackward
- , readAllEventsForward
- , readStreamEventsBackward
- , readStreamEventsForward
+ , readEventsBackward
+ , readEventsForward
, getStreamMetadata
-- * Write Operations
, StreamACL(..)
@@ -110,6 +117,7 @@ module Database.EventStore
-- * Subscription
, SubscriptionClosed(..)
, SubscriptionId
+ , SubscriptionStream(..)
, Subscription
, SubDropReason(..)
, SubDetails
@@ -120,19 +128,15 @@ module Database.EventStore
, nextEventMaybeSTM
, getSubscriptionDetailsSTM
, unsubscribe
- , subscriptionStream
-- * Volatile Subscription
, RegularSubscription
, subscribe
- , subscribeToAll
, getSubscriptionId
- , isSubscribedToAll
, nextEvent
, nextEventMaybe
-- * Catch-up Subscription
, CatchupSubscription
, subscribeFrom
- , subscribeToAllFrom
, waitTillCatchup
, hasCaughtUp
, hasCaughtUpSTM
@@ -155,19 +159,24 @@ module Database.EventStore
, connectToPersistentSubscription
-- * Results
, Slice(..)
+ , sliceEvents
+ , sliceEOS
+ , sliceNext
+ , emptySlice
, AllSlice
, Op.DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, Op.ReadEvent(..)
- , StreamType(..)
, StreamSlice
, Position(..)
, ReadDirection(..)
, ResolvedEvent(..)
, OperationError(..)
+ , StreamId(..)
, StreamName(..)
+ , isAllStream
, isEventResolvedLink
, resolvedEventOriginal
, resolvedEventDataAsJson
@@ -315,7 +324,7 @@ sendEvents :: Connection
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts cred = do
p <- newPromise
- let op = Op.writeEvents _settings (streamNameRaw evt_stream) exp_ver cred evts
+ let op = Op.writeEvents _settings (streamIdRaw evt_stream) exp_ver cred evts
publishWith _exec (SubmitOperation p op)
async (retrieve p)
@@ -329,7 +338,7 @@ deleteStream :: Connection
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del cred = do
p <- newPromise
- let op = Op.deleteStream _settings (streamNameRaw evt_stream) exp_ver hard_del cred
+ let op = Op.deleteStream _settings (streamIdRaw evt_stream) exp_ver hard_del cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
@@ -363,12 +372,12 @@ startTransaction :: Connection
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver cred = do
p <- newPromise
- let op = Op.transactionStart _settings (streamNameRaw evt_stream) exp_ver cred
+ let op = Op.transactionStart _settings (streamIdRaw evt_stream) exp_ver cred
publishWith _exec (SubmitOperation p op)
async $ do
tid <- retrieve p
return Transaction
- { _tStream = streamNameRaw evt_stream
+ { _tStream = streamIdRaw evt_stream
, _tTransId = TransactionId tid
, _tExpVer = exp_ver
, _tConn = conn
@@ -409,152 +418,119 @@ transactionRollback _ = return ()
-- | Reads a single event from given stream.
readEvent :: Connection
-> StreamName
- -> Int32 -- ^ Event number
- -> Bool -- ^ Resolve Link Tos
+ -> EventNumber
+ -> ResolveLink
-> Maybe Credentials
- -> IO (Async (ReadResult 'RegularStream Op.ReadEvent))
-readEvent Connection{..} stream_id evt_num res_link_tos cred = do
+ -> IO (Async (ReadResult EventNumber Op.ReadEvent))
+readEvent Connection{..} stream_id evtNum resLinkTos cred = do
p <- newPromise
- let op = Op.readEvent _settings (streamNameRaw stream_id) evt_num res_link_tos cred
+ let evt_num = eventNumberToInt64 evtNum
+ res_link_tos = resolveLinkToBool resLinkTos
+ op = Op.readEvent _settings (streamIdRaw stream_id) evt_num res_link_tos cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
--------------------------------------------------------------------------------
--- | Reads events from a given stream forward.
-readStreamEventsForward :: Connection
- -> StreamName
- -> Int64 -- ^ From event number
- -> Int32 -- ^ Batch size
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Credentials
- -> IO (Async (ReadResult 'RegularStream StreamSlice))
-readStreamEventsForward mgr =
- readStreamEventsCommon mgr Forward
-
---------------------------------------------------------------------------------
--- | Reads events from a given stream backward.
-readStreamEventsBackward :: Connection
- -> StreamName
- -> Int64 -- ^ From event number
- -> Int32 -- ^ Batch size
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Credentials
- -> IO (Async (ReadResult 'RegularStream StreamSlice))
-readStreamEventsBackward mgr =
- readStreamEventsCommon mgr Backward
-
---------------------------------------------------------------------------------
-readStreamEventsCommon :: Connection
- -> ReadDirection
- -> StreamName
- -> Int64
- -> Int32
- -> Bool
- -> Maybe Credentials
- -> IO (Async (ReadResult 'RegularStream StreamSlice))
-readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos cred = do
- p <- newPromise
- let name = streamNameRaw stream_id
- op = Op.readStreamEvents _settings dir name start cnt res_link_tos cred
- publishWith _exec (SubmitOperation p op)
- async (retrieve p)
+-- | When batch-reading a stream, this type-level function maps the result you
+-- will have whether you read a regular stream or $all stream. When reading
+-- a regular stream, some read-error can occur like the stream got deleted.
+-- However read-error cannot occur when reading $all stream (because $all
+-- cannot get deleted).
+type family BatchResult t where
+ BatchResult EventNumber = ReadResult EventNumber StreamSlice
+ BatchResult Position = AllSlice
--------------------------------------------------------------------------------
--- | Reads events from the $all stream forward.
-readAllEventsForward :: Connection
- -> Position
- -> Int32 -- ^ Batch size
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Credentials
- -> IO (Async AllSlice)
-readAllEventsForward mgr =
- readAllEventsCommon mgr Forward
-
---------------------------------------------------------------------------------
--- | Reads events from the $all stream backward
-readAllEventsBackward :: Connection
- -> Position
- -> Int32 -- ^ Batch size
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Credentials
- -> IO (Async AllSlice)
-readAllEventsBackward mgr =
- readAllEventsCommon mgr Backward
-
---------------------------------------------------------------------------------
-readAllEventsCommon :: Connection
- -> ReadDirection
- -> Position
- -> Int32
- -> Bool
- -> Maybe Credentials
- -> IO (Async AllSlice)
-readAllEventsCommon Connection{..} dir pos max_c res_link_tos cred = do
+-- | Reads events from a stream forward.
+readEventsForward :: Connection
+ -> StreamId t
+ -> t
+ -> Int32 -- ^ Batch size
+ -> ResolveLink
+ -> Maybe Credentials
+ -> IO (Async (BatchResult t))
+readEventsForward conn = readEventsCommon conn Forward
+
+--------------------------------------------------------------------------------
+-- | Reads events from a stream backward.
+readEventsBackward :: Connection
+ -> StreamId t
+ -> t
+ -> Int32 -- ^ Batch size
+ -> ResolveLink
+ -> Maybe Credentials
+ -> IO (Async (BatchResult t))
+readEventsBackward conn = readEventsCommon conn Backward
+
+--------------------------------------------------------------------------------
+readEventsCommon :: Connection
+ -> ReadDirection
+ -> StreamId t
+ -> t
+ -> Int32
+ -> ResolveLink
+ -> Maybe Credentials
+ -> IO (Async (BatchResult t))
+readEventsCommon Connection{..} dir streamId start cnt resLinkTos cred = do
p <- newPromise
- let op = Op.readAllEvents _settings c_pos p_pos max_c res_link_tos dir cred
+ let res_link_tos = resolveLinkToBool resLinkTos
+ op =
+ case streamId of
+ StreamName{} ->
+ let name = streamIdRaw streamId
+ evtNum = eventNumberToInt64 start in
+ Op.readStreamEvents _settings dir name evtNum cnt res_link_tos cred
+ All ->
+ let Position c_pos p_pos = start in
+ Op.readAllEvents _settings c_pos p_pos cnt res_link_tos dir cred
+
publishWith _exec (SubmitOperation p op)
async (retrieve p)
- where
- Position c_pos p_pos = pos
--------------------------------------------------------------------------------
--- | Subcribes to given stream.
+-- | Subscribes to a stream.
subscribe :: Connection
- -> StreamName
- -> Bool -- ^ Resolve Link Tos
+ -> StreamId t
+ -> ResolveLink
-> Maybe Credentials
- -> IO RegularSubscription
-subscribe Connection{..} stream resLnkTos cred =
+ -> IO (RegularSubscription t)
+subscribe Connection{..} stream resLinkTos cred =
newRegularSubscription _exec stream resLnkTos cred
+ where
+ resLnkTos = resolveLinkToBool resLinkTos
--------------------------------------------------------------------------------
--- | Subcribes to $all stream.
-subscribeToAll :: Connection
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Credentials
- -> IO RegularSubscription
-subscribeToAll conn resLnkTos cred = subscribe conn AllStream resLnkTos cred
-
---------------------------------------------------------------------------------
--- | Subscribes to given stream. If last checkpoint is defined, this will
+-- | Subscribes to a stream. If last checkpoint is defined, this will
-- 'readStreamEventsForward' from that event number, otherwise from the
-- beginning. Once last stream event reached up, a subscription request will
-- be sent using 'subscribe'.
subscribeFrom :: Connection
- -> StreamName
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Int64 -- ^ Last checkpoint
+ -> StreamId t
+ -> ResolveLink
+ -> Maybe t
-> Maybe Int32 -- ^ Batch size
-> Maybe Credentials
- -> IO CatchupSubscription
-subscribeFrom conn streamId resLnkTos lastChkPt batch cred =
- subscribeFromCommon conn resLnkTos batch cred tpe
+ -> IO (CatchupSubscription t)
+subscribeFrom Connection{..} streamId resLinkTos lastChkPt batch cred =
+ newCatchupSubscription _exec resLnkTos batch cred streamId $
+ case streamId of
+ StreamName{} -> fromMaybe streamStart lastChkPt
+ All -> fromMaybe positionStart lastChkPt
where
- tpe = Op.RegularCatchup (streamNameRaw streamId) (fromMaybe 0 lastChkPt)
-
---------------------------------------------------------------------------------
--- | Same as 'subscribeFrom' but applied to $all stream.
-subscribeToAllFrom :: Connection
- -> Bool -- ^ Resolve Link Tos
- -> Maybe Position -- ^ Last checkpoint
- -> Maybe Int32 -- ^ Batch size
- -> Maybe Credentials
- -> IO CatchupSubscription
-subscribeToAllFrom conn resLnkTos lastChkPt batch cred =
- subscribeFromCommon conn resLnkTos batch cred tpe
- where
- Position cPos pPos = fromMaybe positionStart lastChkPt
- tpe = Op.AllCatchup (Position cPos pPos)
+ resLnkTos = resolveLinkToBool resLinkTos
--------------------------------------------------------------------------------
subscribeFromCommon :: Connection
- -> Bool
+ -> ResolveLink
-> Maybe Int32
-> Maybe Credentials
- -> Op.CatchupState
- -> IO CatchupSubscription
-subscribeFromCommon Connection{..} resLnkTos batch cred tpe =
- newCatchupSubscription _exec resLnkTos batch cred tpe
+ -> StreamId t
+ -> t
+ -> IO (CatchupSubscription t)
+subscribeFromCommon Connection{..} resLinkTos batch cred kind seed =
+ newCatchupSubscription _exec resLnkTos batch cred kind seed
+ where
+ resLnkTos = resolveLinkToBool resLinkTos
--------------------------------------------------------------------------------
-- | Asynchronously sets the metadata for a stream.
@@ -566,7 +542,7 @@ setStreamMetadata :: Connection
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata cred = do
p <- newPromise
- let name = streamNameRaw evt_stream
+ let name = streamIdRaw evt_stream
op = Op.setMetaStream _settings name exp_ver cred metadata
publishWith _exec (SubmitOperation p op)
async (retrieve p)
@@ -579,7 +555,7 @@ getStreamMetadata :: Connection
-> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream cred = do
p <- newPromise
- let op = Op.readMetaStream _settings (streamNameRaw evt_stream) cred
+ let op = Op.readMetaStream _settings (streamIdRaw evt_stream) cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
@@ -593,7 +569,7 @@ createPersistentSubscription :: Connection
-> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
- let op = Op.createPersist grp (streamNameRaw stream) sett cred
+ let op = Op.createPersist grp (streamIdRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
@@ -607,7 +583,7 @@ updatePersistentSubscription :: Connection
-> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
- let op = Op.updatePersist grp (streamNameRaw stream) sett cred
+ let op = Op.updatePersist grp (streamIdRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
@@ -620,7 +596,7 @@ deletePersistentSubscription :: Connection
-> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} grp stream cred = do
p <- newPromise
- let op = Op.deletePersist grp (streamNameRaw stream) cred
+ let op = Op.deletePersist grp (streamIdRaw stream) cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
diff --git a/Database/EventStore/Internal/Control.hs b/Database/EventStore/Internal/Control.hs
index 3b8d1c0..14c5095 100644
--- a/Database/EventStore/Internal/Control.hs
+++ b/Database/EventStore/Internal/Control.hs
@@ -52,6 +52,9 @@ module Database.EventStore.Internal.Control
) where
--------------------------------------------------------------------------------
+#if __GLASGOW_HASKELL__ > 710
+import Control.Monad.Fail
+#endif
import Data.Typeable
#if __GLASGOW_HASKELL__ < 802
import Data.Typeable.Internal
@@ -86,6 +89,9 @@ newtype EventStore a =
deriving ( Functor
, Applicative
, Monad
+#if __GLASGOW_HASKELL__ > 710
+ , MonadFail
+#endif
, MonadThrow
, MonadCatch
, MonadIO
diff --git a/Database/EventStore/Internal/Operation.hs b/Database/EventStore/Internal/Operation.hs
index 0e3ac6f..085205f 100644
--- a/Database/EventStore/Internal/Operation.hs
+++ b/Database/EventStore/Internal/Operation.hs
@@ -5,6 +5,7 @@
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE Rank2Types #-}
+{-# LANGUAGE StandaloneDeriving #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation
@@ -74,20 +75,23 @@ data OpResult
-- | Operation exception that can occurs on an operation response.
data OperationError
= WrongExpectedVersion Text ExpectedVersion -- ^ Stream and Expected Version
- | StreamDeleted Text -- ^ Stream
+ | StreamDeleted StreamName -- ^ Stream
| InvalidTransaction
- | AccessDenied StreamName -- ^ Stream
+ | forall t. AccessDenied (StreamId t) -- ^ Stream
| InvalidServerResponse Command Command -- ^ Expected, Found
| ProtobufDecodingError String
| ServerError (Maybe Text) -- ^ Reason
| InvalidOperation Text
- | StreamNotFound Text
+ | StreamNotFound StreamName
| NotAuthenticatedOp
-- ^ Invalid operation state. If happens, it's a driver bug.
| Aborted
-- ^ Occurs when the user asked to close the connection or if the
-- connection can't reconnect anymore.
- deriving (Show, Typeable)
+ deriving Typeable
+
+--------------------------------------------------------------------------------
+deriving instance Show OperationError
--------------------------------------------------------------------------------
instance Exception OperationError
@@ -218,7 +222,7 @@ wrongVersion stream ver = failure (WrongExpectedVersion stream ver)
--------------------------------------------------------------------------------
-- | Raises 'StreamDeleted' exception.
-streamDeleted :: Text -> Code o a
+streamDeleted :: StreamName -> Code o a
streamDeleted stream = failure (StreamDeleted stream)
--------------------------------------------------------------------------------
@@ -228,7 +232,7 @@ invalidTransaction = failure InvalidTransaction
--------------------------------------------------------------------------------
-- | Raises 'AccessDenied' exception.
-accessDenied :: StreamName -> Code oconcat a
+accessDenied :: StreamId t -> Code o a
accessDenied = failure . AccessDenied
--------------------------------------------------------------------------------
diff --git a/Database/EventStore/Internal/Operation/Catchup.hs b/Database/EventStore/Internal/Operation/Catchup.hs
index 5ecd9cd..c5e7cf3 100644
--- a/Database/EventStore/Internal/Operation/Catchup.hs
+++ b/Database/EventStore/Internal/Operation/Catchup.hs
@@ -1,6 +1,7 @@
-{-# LANGUAGE DataKinds #-}
-{-# LANGUAGE GADTs #-}
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.Catchup
@@ -13,11 +14,7 @@
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.Catchup
- ( CatchupState(..)
- , CatchupOpResult(..)
- , Checkpoint(..)
- , catchup
- ) where
+ ( catchup ) where
--------------------------------------------------------------------------------
import Data.Int
@@ -36,100 +33,75 @@ import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
--------------------------------------------------------------------------------
--- | Represents the next checkpoint to reach on a catchup subscription. Wheither
--- it's a regular stream or the $all stream, it either point to an 'Int32' or
--- a 'Position'.
-data Checkpoint = CheckpointNumber Int32 | CheckpointPosition Position
-
---------------------------------------------------------------------------------
defaultBatchSize :: Int32
defaultBatchSize = 500
--------------------------------------------------------------------------------
streamNotFound :: Text -> OperationError
-streamNotFound stream = StreamNotFound stream
+streamNotFound stream = StreamNotFound $ StreamName stream
--------------------------------------------------------------------------------
--- | Catchup operation state.
-data CatchupState
- = RegularCatchup Text Int64
- -- ^ Indicates the stream name and the next event number to start from.
- | AllCatchup Position
- -- ^ Indicates the commit and prepare position. Used when catching up from
- -- the $all stream.
- deriving Show
+fetchStream :: Settings
+ -> Text -- Stream name.
+ -> Int32 -- Batch size.
+ -> Bool -- Resolve link tos.
+ -> Maybe Credentials
+ -> EventNumber
+ -> Code o (Slice EventNumber)
+fetchStream setts stream batch tos cred (EventNumber n) = do
+ outcome <-
+ deconstruct $ fmap Left $
+ readStreamEvents setts Forward stream n batch tos cred
---------------------------------------------------------------------------------
-fetch :: Settings
- -> Int32
- -> Bool
- -> CatchupState
- -> Maybe Credentials
- -> Code o SomeSlice
-fetch setts batch tos state cred =
- case state of
- RegularCatchup stream n -> do
- outcome <- deconstruct $ fmap Left $
- readStreamEvents setts Forward stream n batch tos cred
- fromReadResult stream outcome (return . toSlice)
- AllCatchup (Position com pre) ->
- deconstruct $ fmap (Left . toSlice) $
- readAllEvents setts com pre batch tos Forward cred
+ fromReadResult stream outcome pure
--------------------------------------------------------------------------------
-updateState :: CatchupState -> Location -> CatchupState
-updateState (RegularCatchup stream _) (StreamEventNumber n) =
- RegularCatchup stream n
-updateState (AllCatchup _) (StreamPosition p) = AllCatchup p
-updateState x y = error $ "Unexpected input updateState: " <> show (x,y)
+fetchAll :: Settings
+ -> Int32 -- Batch size.
+ -> Bool -- Resolve link tos.
+ -> Maybe Credentials
+ -> Position
+ -> Code o (Slice Position)
+fetchAll setts batch tos cred (Position com pre) =
+ deconstruct $ fmap Left $
+ readAllEvents setts com pre batch tos Forward cred
--------------------------------------------------------------------------------
-sourceStream :: Settings
- -> Int32
- -> Bool
- -> CatchupState
- -> Maybe Credentials
+sourceStream :: t
+ -> (forall o. t -> Code o (Slice t))
-> Operation SubAction
-sourceStream setts batch tos start cred = unfoldPlan start go
+sourceStream seed iteratee = unfoldPlan seed go
where
go state = do
- s <- fetch setts batch tos state cred
+ s <- iteratee state
traverse_ (yield . Submit) (sliceEvents s)
- when (sliceEOS s)
- stop
-
- return $ updateState state (sliceNext s)
+ case sliceNext s of
+ Just newState -> pure newState
+ Nothing -> stop
--------------------------------------------------------------------------------
-catchupStreamName :: CatchupState -> Text
-catchupStreamName (RegularCatchup stream _) = stream
-catchupStreamName _ = ""
-
---------------------------------------------------------------------------------
-data CatchupOpResult =
- CatchupOpResult { catchupReadEvents :: ![ResolvedEvent]
- , catchupEndOfStream :: !Bool
- , catchupCheckpoint :: !Checkpoint
- }
-
---------------------------------------------------------------------------------
--- | Stream catching up operation.
-catchup :: Settings
- -> CatchupState
- -> Bool
- -> Maybe Int32
+catchup :: forall t. Settings
+ -> StreamId t
+ -> t
+ -> Bool -- Resolve link tos.
+ -> Maybe Int32 -- Batch size.
-> Maybe Credentials
-> Operation SubAction
-catchup setts state tos batchSiz cred =
- sourceStream setts batch tos state cred <> volatile stream tos cred
+catchup setts streamId from tos batchSiz cred =
+ sourceStream from iteratee <> volatile streamId tos cred
where
- batch = fromMaybe defaultBatchSize batchSiz
- stream = catchupStreamName state
+ batch = fromMaybe defaultBatchSize batchSiz
+
+ iteratee :: t -> Code o (Slice t)
+ iteratee =
+ case streamId of
+ StreamName n -> fetchStream setts n batch tos cred
+ All -> fetchAll setts batch tos cred
--------------------------------------------------------------------------------
fromReadResult :: Text
- -> ReadResult 'RegularStream a
+ -> ReadResult EventNumber a
-> (a -> Code o x)
-> Code o x
fromReadResult stream res k =
diff --git a/Database/EventStore/Internal/Operation/DeleteStream.hs b/Database/EventStore/Internal/Operation/DeleteStream.hs
index 2d89974..5c12f87 100644
--- a/Database/EventStore/Internal/Operation/DeleteStream.hs
+++ b/Database/EventStore/Internal/Operation/DeleteStream.hs
@@ -61,6 +61,6 @@ deleteStream Settings{..} s v hard cred = construct $ do
OP_FORWARD_TIMEOUT -> retry
OP_COMMIT_TIMEOUT -> retry
OP_WRONG_EXPECTED_VERSION -> wrongVersion s v
- OP_STREAM_DELETED -> streamDeleted s
+ OP_STREAM_DELETED -> streamDeleted $ StreamName s
OP_INVALID_TRANSACTION -> invalidTransaction
OP_ACCESS_DENIED -> accessDenied (StreamName s)
diff --git a/Database/EventStore/Internal/Operation/Read/Common.hs b/Database/EventStore/Internal/Operation/Read/Common.hs
index 273f335..fa6e865 100644
--- a/Database/EventStore/Internal/Operation/Read/Common.hs
+++ b/Database/EventStore/Internal/Operation/Read/Common.hs
@@ -18,6 +18,7 @@ module Database.EventStore.Internal.Operation.Read.Common where
--------------------------------------------------------------------------------
import Control.Applicative
import Data.Foldable
+import Data.Maybe (isNothing)
import Data.Monoid
import Data.Traversable
import Data.Int
@@ -32,13 +33,13 @@ import Prelude
--------------------------------------------------------------------------------
-- | Enumeration detailing the possible outcomes of reading a stream.
-data ReadResult :: StreamType -> * -> * where
+data ReadResult t a where
ReadSuccess :: a -> ReadResult t a
- ReadNoStream :: ReadResult 'RegularStream a
- ReadStreamDeleted :: Text -> ReadResult 'RegularStream a
+ ReadNoStream :: ReadResult EventNumber a
+ ReadStreamDeleted :: StreamName -> ReadResult EventNumber a
ReadNotModified :: ReadResult t a
ReadError :: Maybe Text -> ReadResult t a
- ReadAccessDenied :: StreamName -> ReadResult t a
+ ReadAccessDenied :: StreamId t -> ReadResult t a
--------------------------------------------------------------------------------
instance Eq a => Eq (ReadResult t a) where
@@ -84,107 +85,43 @@ instance Traversable (ReadResult t) where
--------------------------------------------------------------------------------
-- | Gathers common slice operations.
-class Slice a where
- type Loc a
-
- sliceEvents :: a -> [ResolvedEvent]
- -- ^ Gets slice's 'ResolvedEvent's.
- sliceDirection :: a -> ReadDirection
- -- ^ Gets slice's reading direction.
- sliceEOS :: a -> Bool
- -- ^ If the slice reaches the end of the stream.
- sliceFrom :: a -> Loc a
- -- ^ Gets the starting location of this slice.
- sliceNext :: a -> Loc a
- -- ^ Gets the next location of this slice.
- toSlice :: a -> SomeSlice
- -- ^ Returns a common view of a slice.
+data Slice t
+ = SliceEndOfStream
+ | Slice ![ResolvedEvent] !(Maybe t)
+ deriving Show
--------------------------------------------------------------------------------
--- | Regular stream slice.
-data StreamSlice =
- StreamSlice
- { sliceStream :: !Text
- , sliceLast :: !Int64
- , _ssDir :: !ReadDirection
- , _ssFrom :: !Int64
- , _ssNext :: !Int64
- , _ssEvents :: ![ResolvedEvent]
- , _ssEOS :: !Bool
- } deriving Show
-
---------------------------------------------------------------------------------
-instance Slice StreamSlice where
- type Loc StreamSlice = Int64
-
- sliceEvents = _ssEvents
- sliceDirection = _ssDir
- sliceEOS = _ssEOS
- sliceFrom = _ssFrom
- sliceNext = _ssNext
-
- toSlice s =
- SomeSlice
- { __events = sliceEvents s
- , __eos = sliceEOS s
- , __dir = sliceDirection s
- , __from = StreamEventNumber $ sliceFrom s
- , __next = StreamEventNumber $ sliceNext s
- }
+-- | Empty slice.
+emptySlice :: Slice t
+emptySlice = SliceEndOfStream
--------------------------------------------------------------------------------
--- | Represents a slice of the $all stream.
-data AllSlice =
- AllSlice
- { _saFrom :: !Position
- , _saNext :: !Position
- , _saDir :: !ReadDirection
- , _saEvents :: ![ResolvedEvent]
- , _saEOS :: !Bool
- } deriving Show
-
---------------------------------------------------------------------------------
-instance Slice AllSlice where
- type Loc AllSlice = Position
-
- sliceEvents = _saEvents
- sliceDirection = _saDir
- sliceEOS = _saEOS
- sliceFrom = _saFrom
- sliceNext = _saNext
-
- toSlice s =
- SomeSlice
- { __events = sliceEvents s
- , __eos = sliceEOS s
- , __dir = sliceDirection s
- , __from = StreamPosition $ sliceFrom s
- , __next = StreamPosition $ sliceNext s
- }
-
---------------------------------------------------------------------------------
-data Location
- = StreamEventNumber !Int64
- | StreamPosition !Position
- deriving Show
+instance Functor Slice where
+ fmap f SliceEndOfStream = SliceEndOfStream
+ fmap f (Slice xs next) = Slice xs (fmap f next)
+
+--------------------------------------------------------------------------------
+-- | Gets slice's 'ResolvedEvents's.
+sliceEvents :: Slice t -> [ResolvedEvent]
+sliceEvents SliceEndOfStream = []
+sliceEvents (Slice xs _) = xs
--------------------------------------------------------------------------------
-data SomeSlice =
- SomeSlice
- { __events :: ![ResolvedEvent]
- , __eos :: !Bool
- , __dir :: !ReadDirection
- , __from :: !Location
- , __next :: !Location
- } deriving Show
+-- | If the slice has reached the end of the stream.
+sliceEOS :: Slice t -> Bool
+sliceEOS SliceEndOfStream = True
+sliceEOS (Slice _ next) = isNothing next
--------------------------------------------------------------------------------
-instance Slice SomeSlice where
- type Loc SomeSlice = Location
+-- | Gets the next location of this slice.
+sliceNext :: Slice t -> Maybe t
+sliceNext SliceEndOfStream = Nothing
+sliceNext (Slice _ next) = next
- sliceEvents = __events
- sliceDirection = __dir
- sliceEOS = __eos
- sliceFrom = __from
- sliceNext = __next
- toSlice = id
+--------------------------------------------------------------------------------
+-- | Regular stream slice.
+type StreamSlice = Slice EventNumber
+
+--------------------------------------------------------------------------------
+-- | Represents a slice of the $all stream.
+type AllSlice = Slice Position
diff --git a/Database/EventStore/Internal/Operation/ReadAllEvents.hs b/Database/EventStore/Internal/Operation/ReadAllEvents.hs
index 6fe4f95..bea2614 100644
--- a/Database/EventStore/Internal/Operation/ReadAllEvents.hs
+++ b/Database/EventStore/Internal/Operation/ReadAllEvents.hs
@@ -57,10 +57,10 @@ readAllEvents Settings{..} c_pos p_pos max_c tos dir cred = construct $ do
es = getField $ _Events resp
evts = fmap newResolvedEventFromBuf es
eos = null evts
- f_pos = Position c_pos p_pos
n_pos = Position nc_pos np_pos
- slice = AllSlice f_pos n_pos dir evts eos
+ slice =
+ if eos then SliceEndOfStream else Slice evts (Just n_pos)
case fromMaybe SUCCESS r of
ERROR -> serverError err
- ACCESS_DENIED -> accessDenied AllStream
+ ACCESS_DENIED -> accessDenied All
_ -> yield slice
diff --git a/Database/EventStore/Internal/Operation/ReadEvent.hs b/Database/EventStore/Internal/Operation/ReadEvent.hs
index 342cd1a..87f1b93 100644
--- a/Database/EventStore/Internal/Operation/ReadEvent.hs
+++ b/Database/EventStore/Internal/Operation/ReadEvent.hs
@@ -37,11 +37,11 @@ import Database.EventStore.Internal.Types
data ReadEvent
= ReadEventNotFound
{ readEventStream :: !Text
- , readEventNumber :: !Int32
+ , readEventNumber :: !Int64
}
| ReadEvent
{ readEventStream :: !Text
- , readEventNumber :: !Int32
+ , readEventNumber :: !Int64
, readEventResolved :: !ResolvedEvent
} deriving Show
@@ -49,10 +49,10 @@ data ReadEvent
-- | Read a specific event given event number operation.
readEvent :: Settings
-> Text
- -> Int32
+ -> Int64
-> Bool
-> Maybe Credentials
- -> Operation (ReadResult 'RegularStream ReadEvent)
+ -> Operation (ReadResult EventNumber ReadEvent)
readEvent Settings{..} s evtn tos cred = construct $ do
let msg = newRequest s evtn tos s_requireMaster
resp <- send readEventCmd readEventCompletedCmd cred msg
@@ -64,7 +64,7 @@ readEvent Settings{..} s evtn tos cred = construct $ do
case r of
NOT_FOUND -> yield not_found
NO_STREAM -> yield ReadNoStream
- STREAM_DELETED -> yield $ ReadStreamDeleted s
+ STREAM_DELETED -> yield $ ReadStreamDeleted $ StreamName s
ERROR -> yield (ReadError err)
ACCESS_DENIED -> yield $ ReadAccessDenied $ StreamName s
SUCCESS -> yield found
diff --git a/Database/EventStore/Internal/Operation/ReadEvent/Message.hs b/Database/EventStore/Internal/Operation/ReadEvent/Message.hs
index 61d25cf..b95a385 100644
--- a/Database/EventStore/Internal/Operation/ReadEvent/Message.hs
+++ b/Database/EventStore/Internal/Operation/ReadEvent/Message.hs
@@ -34,7 +34,7 @@ import Database.EventStore.Internal.Types
data Request
= Request
{ _streamId :: Required 1 (Value Text)
- , _eventNumber :: Required 2 (Value Int32)
+ , _eventNumber :: Required 2 (Value Int64)
, _resolveLinkTos :: Required 3 (Value Bool)
, _requireMaster :: Required 4 (Value Bool)
}
@@ -45,7 +45,7 @@ instance Encode Request
--------------------------------------------------------------------------------
-- | 'Request' smart constructor.
-newRequest :: Text -> Int32 -> Bool -> Bool -> Request
+newRequest :: Text -> Int64 -> Bool -> Bool -> Request
newRequest stream_id evt_num res_link_tos req_master =
Request
{ _streamId = putField stream_id
diff --git a/Database/EventStore/Internal/Operation/ReadStreamEvents.hs b/Database/EventStore/Internal/Operation/ReadStreamEvents.hs
index 32aaf68..4d67d66 100644
--- a/Database/EventStore/Internal/Operation/ReadStreamEvents.hs
+++ b/Database/EventStore/Internal/Operation/ReadStreamEvents.hs
@@ -39,7 +39,7 @@ readStreamEvents :: Settings
-> Int32
-> Bool
-> Maybe Credentials
- -> Operation (ReadResult 'RegularStream StreamSlice)
+ -> Operation (ReadResult EventNumber StreamSlice)
readStreamEvents Settings{..} dir s st cnt tos cred = construct $ do
let req_cmd =
case dir of
@@ -58,11 +58,13 @@ readStreamEvents Settings{..} dir s st cnt tos cred = construct $ do
err = getField $ _error resp
eos = getField $ _endOfStream resp
nxt = getField $ _nextNumber resp
- lst = getField $ _lastNumber resp
- found = StreamSlice s lst dir st nxt evts eos
+ found =
+ if null evts && eos
+ then SliceEndOfStream
+ else Slice evts (if eos then Nothing else Just $ EventNumber nxt)
case r of
NO_STREAM -> yield ReadNoStream
- STREAM_DELETED -> yield $ ReadStreamDeleted s
+ STREAM_DELETED -> yield $ ReadStreamDeleted $ StreamName s
NOT_MODIFIED -> yield ReadNotModified
ERROR -> yield (ReadError err)
ACCESS_DENIED -> yield $ ReadAccessDenied $ StreamName s
diff --git a/Database/EventStore/Internal/Operation/StreamMetadata.hs b/Database/EventStore/Internal/Operation/StreamMetadata.hs
index e7e7352..d38b2c5 100644
--- a/Database/EventStore/Internal/Operation/StreamMetadata.hs
+++ b/Database/EventStore/Internal/Operation/StreamMetadata.hs
@@ -78,8 +78,8 @@ streamNotFound :: OperationError
streamNotFound = InvalidOperation "Read metadata on an inexistant stream"
--------------------------------------------------------------------------------
-onReadResult :: ReadResult 'RegularStream ReadEvent
- -> (Text -> Int32 -> ResolvedEvent -> Code o a)
+onReadResult :: ReadResult EventNumber ReadEvent
+ -> (Text -> Int64 -> ResolvedEvent -> Code o a)
-> Code o a
onReadResult (ReadSuccess r) k =
case r of
diff --git a/Database/EventStore/Internal/Operation/Transaction.hs b/Database/EventStore/Internal/Operation/Transaction.hs
index 858ecc0..688fc84 100644
--- a/Database/EventStore/Internal/Operation/Transaction.hs
+++ b/Database/EventStore/Internal/Operation/Transaction.hs
@@ -54,7 +54,7 @@ transactionStart Settings{..} stream exp_v cred = construct $ do
OP_FORWARD_TIMEOUT -> retry
OP_COMMIT_TIMEOUT -> retry
OP_WRONG_EXPECTED_VERSION -> wrongVersion stream exp_v
- OP_STREAM_DELETED -> streamDeleted stream
+ OP_STREAM_DELETED -> streamDeleted $ StreamName stream
OP_INVALID_TRANSACTION -> invalidTransaction
OP_ACCESS_DENIED -> accessDenied $ StreamName stream
OP_SUCCESS -> yield tid
@@ -78,7 +78,7 @@ transactionWrite Settings{..} stream exp_v trans_id evts cred = construct $ do
OP_FORWARD_TIMEOUT -> retry
OP_COMMIT_TIMEOUT -> retry
OP_WRONG_EXPECTED_VERSION -> wrongVersion stream exp_v
- OP_STREAM_DELETED -> streamDeleted stream
+ OP_STREAM_DELETED -> streamDeleted $ StreamName stream
OP_INVALID_TRANSACTION -> invalidTransaction
OP_ACCESS_DENIED -> accessDenied $ StreamName stream
OP_SUCCESS -> yield ()
@@ -107,7 +107,7 @@ transactionCommit Settings{..} stream exp_v trans_id cred = construct $ do
OP_FORWARD_TIMEOUT -> retry
OP_COMMIT_TIMEOUT -> retry
OP_WRONG_EXPECTED_VERSION -> wrongVersion stream exp_v
- OP_STREAM_DELETED -> streamDeleted stream
+ OP_STREAM_DELETED -> streamDeleted $ StreamName stream
OP_INVALID_TRANSACTION -> invalidTransaction
OP_ACCESS_DENIED -> accessDenied $ StreamName stream
OP_SUCCESS -> yield res
diff --git a/Database/EventStore/Internal/Operation/Volatile.hs b/Database/EventStore/Internal/Operation/Volatile.hs
index dd69dbd..36d1015 100644
--- a/Database/EventStore/Internal/Operation/Volatile.hs
+++ b/Database/EventStore/Internal/Operation/Volatile.hs
@@ -20,13 +20,16 @@ import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
+import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
--------------------------------------------------------------------------------
-volatile :: Text -> Bool -> Maybe Credentials -> Operation SubAction
-volatile stream tos cred = construct (issueRequest stream tos cred)
+volatile :: StreamId t -> Bool -> Maybe Credentials -> Operation SubAction
+volatile streamId tos cred = construct (issueRequest stream tos cred)
+ where
+ stream = streamIdRaw streamId
--------------------------------------------------------------------------------
issueRequest :: Text -> Bool -> Maybe Credentials -> Code SubAction ()
diff --git a/Database/EventStore/Internal/Operation/WriteEvents.hs b/Database/EventStore/Internal/Operation/WriteEvents.hs
index bb22167..3331537 100644
--- a/Database/EventStore/Internal/Operation/WriteEvents.hs
+++ b/Database/EventStore/Internal/Operation/WriteEvents.hs
@@ -55,6 +55,6 @@ writeEvents Settings{..} s v cred evts = construct $ do
OP_FORWARD_TIMEOUT -> retry
OP_COMMIT_TIMEOUT -> retry
OP_WRONG_EXPECTED_VERSION -> wrongVersion s v
- OP_STREAM_DELETED -> streamDeleted s
+ OP_STREAM_DELETED -> streamDeleted $ StreamName s
OP_INVALID_TRANSACTION -> invalidTransaction
OP_ACCESS_DENIED -> accessDenied (StreamName s)
diff --git a/Database/EventStore/Internal/Stream.hs b/Database/EventStore/Internal/Stream.hs
index e1ba1a5..4b6df3c 100644
--- a/Database/EventStore/Internal/Stream.hs
+++ b/Database/EventStore/Internal/Stream.hs
@@ -17,26 +17,34 @@ module Database.EventStore.Internal.Stream where
--------------------------------------------------------------------------------
import Database.EventStore.Internal.Prelude
+import Database.EventStore.Internal.Types
--------------------------------------------------------------------------------
--- | A stream can either point to $all or a regular one.
-data StreamType = All | RegularStream deriving (Eq, Ord)
+-- | Represents a regular stream name or $all stream.
+data StreamId loc where
+ StreamName :: Text -> StreamId EventNumber
+ All :: StreamId Position
--------------------------------------------------------------------------------
--- | Represents a regular stream name or $all stream.
-data StreamName = StreamName Text | AllStream deriving Eq
+-- | If the stream is the $all stream.
+isAllStream :: StreamId t -> Bool
+isAllStream StreamName{} = False
+isAllStream _ = True
+
+--------------------------------------------------------------------------------
+instance Eq (StreamId t) where
+ StreamName n == StreamName v = n == v
+ All == _ = True
--------------------------------------------------------------------------------
-streamNameRaw :: StreamName -> Text
-streamNameRaw (StreamName n) = n
-streamNameRaw AllStream = ""
+type StreamName = StreamId EventNumber
--------------------------------------------------------------------------------
-instance Show StreamName where
- show (StreamName t) = show t
- show AllStream = "$all"
+streamIdRaw :: StreamId t -> Text
+streamIdRaw (StreamName n) = n
+streamIdRaw All = ""
--------------------------------------------------------------------------------
-instance IsString StreamName where
- fromString "$all" = AllStream
- fromString stream = StreamName $ pack stream \ No newline at end of file
+instance Show (StreamId t) where
+ show (StreamName n) = show n
+ show All = "$all"
diff --git a/Database/EventStore/Internal/Subscription/Api.hs b/Database/EventStore/Internal/Subscription/Api.hs
index 2675cc6..3e740da 100644
--- a/Database/EventStore/Internal/Subscription/Api.hs
+++ b/Database/EventStore/Internal/Subscription/Api.hs
@@ -1,5 +1,6 @@
-{-# LANGUAGE DeriveDataTypeable #-}
-{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE DeriveDataTypeable #-}
+{-# LANGUAGE FunctionalDependencies #-}
+{-# LANGUAGE ScopedTypeVariables #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Api
@@ -46,13 +47,15 @@ class Subscription s where
-- | Returns the runtime details of a subscription.
getSubscriptionDetailsSTM :: s -> STM SubDetails
- -- | Get subscription stream.
- subscriptionStream :: s -> StreamName
-
-- | Asynchronously unsubscribe from the the stream.
unsubscribe :: s -> IO ()
--------------------------------------------------------------------------------
+-- | Returns the stream of a subscription.
+class SubscriptionStream s t | t -> s where
+ subscriptionStream :: s -> StreamId t
+
+--------------------------------------------------------------------------------
-- | Awaits for the next event.
nextEvent :: Subscription s => s -> IO ResolvedEvent
nextEvent s = atomically $ do
@@ -107,16 +110,8 @@ subUnsubscribe pub s = do
publishWith pub (SendPackage pkg)
--------------------------------------------------------------------------------
--- | If the subscription is on the $all stream.
-isSubscribedToAll :: Subscription s => s -> Bool
-isSubscribedToAll s =
- case subscriptionStream s of
- StreamName{} -> False
- _ -> True
-
---------------------------------------------------------------------------------
-- | Gets the ID of the subscription.
getSubscriptionId :: Subscription s => s -> IO SubscriptionId
getSubscriptionId s = atomically $ do
details <- getSubscriptionDetailsSTM s
- return (SubscriptionId $ subId details) \ No newline at end of file
+ return (SubscriptionId $ subId details)
diff --git a/Database/EventStore/Internal/Subscription/Catchup.hs b/Database/EventStore/Internal/Subscription/Catchup.hs
index 1da429f..ebba1e6 100644
--- a/Database/EventStore/Internal/Subscription/Catchup.hs
+++ b/Database/EventStore/Internal/Subscription/Catchup.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
@@ -16,6 +18,7 @@ module Database.EventStore.Internal.Subscription.Catchup where
--------------------------------------------------------------------------------
import Control.Monad.Fix
+import Safe (fromJustNote)
--------------------------------------------------------------------------------
import Database.EventStore.Internal.Callback
@@ -38,28 +41,24 @@ data Phase
| Closed (Either SomeException SubDropReason)
--------------------------------------------------------------------------------
-data CatchupTrack
- = CatchupRegular !(Maybe Int64)
- | CatchupAll !(Maybe Position)
+receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
+receivedAlready StreamName{} old e =
+ EventNumber (resolvedEventOriginalEventNumber e) < old
+receivedAlready All old e =
+ let pos =
+ fromJustNote
+ "Position is always defined when reading events from $all stream"
+ $ resolvedEventPosition e in
+ pos < old
--------------------------------------------------------------------------------
-catchupTrack :: CatchupState -> CatchupTrack
-catchupTrack RegularCatchup{} = CatchupRegular Nothing
-catchupTrack AllCatchup{} = CatchupAll Nothing
-
---------------------------------------------------------------------------------
-receivedAlready :: CatchupTrack -> ResolvedEvent -> Bool
-receivedAlready (CatchupRegular old) e =
- maybe False (resolvedEventOriginalEventNumber e <=) old
-receivedAlready (CatchupAll old) e =
- fromMaybe False ((<=) <$> resolvedEventPosition e <*> old)
-
---------------------------------------------------------------------------------
-updateTrack :: ResolvedEvent -> CatchupTrack -> CatchupTrack
-updateTrack e (CatchupRegular _) =
- CatchupRegular (Just $ resolvedEventOriginalEventNumber e)
-updateTrack e (CatchupAll _) =
- CatchupAll (resolvedEventPosition e)
+nextTarget :: StreamId t -> ResolvedEvent -> t
+nextTarget StreamName{} e =
+ EventNumber (resolvedEventOriginalEventNumber e)
+nextTarget All e =
+ fromJustNote
+ "Position is always defined when reading events from $all stream"
+ $ resolvedEventPosition e
--------------------------------------------------------------------------------
-- | This kind of subscription specifies a starting point, in the form of an
@@ -71,16 +70,16 @@ updateTrack e (CatchupAll _) =
-- events in it, the subscriber can expect to see events 51 through 100, and
-- then any events subsequently written until such time as the subscription is
-- dropped or closed.
-data CatchupSubscription =
+data CatchupSubscription t =
CatchupSubscription { _catchupExec :: Exec
- , _catchupStream :: StreamName
+ , _catchupStream :: StreamId t
, _catchupPhase :: TVar Phase
- , _catchupTrack :: TVar CatchupTrack
+ , _catchupTrack :: TVar t
, _catchupNext :: STM (Maybe ResolvedEvent)
}
--------------------------------------------------------------------------------
-instance Subscription CatchupSubscription where
+instance Subscription (CatchupSubscription t) where
nextEventMaybeSTM = _catchupNext
getSubscriptionDetailsSTM s = do
@@ -90,34 +89,27 @@ instance Subscription CatchupSubscription where
Closed r -> throwClosed r
_ -> retrySTM
- subscriptionStream = _catchupStream
-
unsubscribe s = subUnsubscribe (_catchupExec s) s
--------------------------------------------------------------------------------
-streamName :: CatchupState -> StreamName
-streamName (RegularCatchup stream _) = StreamName stream
-streamName _ = "$all"
-
---------------------------------------------------------------------------------
-streamText :: StreamName -> Text
-streamText (StreamName s) = s
-streamText _ = ""
+instance SubscriptionStream (CatchupSubscription t) t where
+ subscriptionStream = _catchupStream
--------------------------------------------------------------------------------
newCatchupSubscription :: Exec
-> Bool
-> Maybe Int32
-> Maybe Credentials
- -> CatchupState
- -> IO CatchupSubscription
-newCatchupSubscription exec tos batch cred state = do
+ -> StreamId t
+ -> t
+ -> IO (CatchupSubscription t)
+newCatchupSubscription exec tos batch cred streamId seed = do
phaseVar <- newTVarIO CatchingUp
queue <- newTQueueIO
- track <- newTVarIO $ catchupTrack state
+ track <- newTVarIO seed
- let stream = streamName state
- sub = CatchupSubscription exec stream phaseVar track $ do
+ let stream = streamIdRaw streamId
+ sub = CatchupSubscription exec streamId phaseVar track $ do
p <- readTVar phaseVar
isEmpty <- isEmptyTQueue queue
if isEmpty
@@ -132,7 +124,7 @@ newCatchupSubscription exec tos batch cred state = do
Just opE ->
case opE of
StreamNotFound{} -> do
- let op = volatile (streamText stream) tos cred
+ let op = volatile streamId tos cred
publishWith exec (SubmitOperation cb op)
_ -> atomically $ writeTVar phaseVar (Closed $ Left e)
_ -> atomically $ writeTVar phaseVar (Closed $ Left e)
@@ -143,29 +135,18 @@ newCatchupSubscription exec tos batch cred state = do
atomically $ writeTVar phaseVar (Closed $ Right r)
Submit e -> atomically $ do
tracker <- readTVar track
- unless (receivedAlready tracker e) $ do
- writeTVar track (updateTrack e tracker)
+ unless (receivedAlready streamId tracker e) $ do
+ writeTVar track (nextTarget streamId e)
writeTQueue queue e
ConnectionReset -> do
- tpe <- readTVarIO track
- let newState =
- case tpe of
- CatchupRegular old ->
- case old of
- Just n -> RegularCatchup (streamText stream) n
- _ -> state
- CatchupAll old ->
- case old of
- Just p -> AllCatchup p
- _ -> state
-
- newOp = catchup (execSettings exec) newState tos batch cred
+ chk <- readTVarIO track
+ let newOp = catchup (execSettings exec) streamId chk tos batch cred
newCb <- mfix $ \self -> newCallback (callback self)
publishWith exec (SubmitOperation newCb newOp)
cb <- mfix $ \self -> newCallback (callback self)
- let op = catchup (execSettings exec) state tos batch cred
+ let op = catchup (execSettings exec) streamId seed tos batch cred
publishWith exec (SubmitOperation cb op)
return sub
@@ -176,17 +157,17 @@ throwClosed (Right r) = throwSTM (SubscriptionClosed $ Just r)
--------------------------------------------------------------------------------
-- | Non blocking version of `waitTillCatchup`.
-hasCaughtUp :: CatchupSubscription -> IO Bool
+hasCaughtUp :: CatchupSubscription t -> IO Bool
hasCaughtUp sub = atomically $ hasCaughtUpSTM sub
--------------------------------------------------------------------------------
-- | Waits until 'CatchupSubscription' subscription catch-up its stream.
-waitTillCatchup :: CatchupSubscription -> IO ()
+waitTillCatchup :: CatchupSubscription t -> IO ()
waitTillCatchup sub = atomically $ unlessM (hasCaughtUpSTM sub) retrySTM
--------------------------------------------------------------------------------
-- | Like 'hasCaughtUp' but lives in 'STM' monad.
-hasCaughtUpSTM :: CatchupSubscription -> STM Bool
+hasCaughtUpSTM :: CatchupSubscription t -> STM Bool
hasCaughtUpSTM CatchupSubscription{..} = do
p <- readTVar _catchupPhase
case p of
diff --git a/Database/EventStore/Internal/Subscription/Persistent.hs b/Database/EventStore/Internal/Subscription/Persistent.hs
index 037f1d8..68d934f 100644
--- a/Database/EventStore/Internal/Subscription/Persistent.hs
+++ b/Database/EventStore/Internal/Subscription/Persistent.hs
@@ -1,4 +1,5 @@
-{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE RecordWildCards #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Persistent
@@ -62,11 +63,13 @@ instance Subscription PersistentSubscription where
Right r -> throwSTM (SubscriptionClosed $ Just r)
Left e -> throwSTM e
- subscriptionStream = _perStream
-
unsubscribe s = subUnsubscribe (_perExec s) s
--------------------------------------------------------------------------------
+instance SubscriptionStream PersistentSubscription EventNumber where
+ subscriptionStream = _perStream
+
+--------------------------------------------------------------------------------
newPersistentSubscription :: Exec
-> Text
-> StreamName
@@ -77,7 +80,7 @@ newPersistentSubscription exec grp stream bufSize cred = do
phaseVar <- newTVarIO Pending
queue <- newTQueueIO
- let name = streamNameRaw stream
+ let name = streamIdRaw stream
sub = PersistentSubscription exec stream cred phaseVar $ do
p <- readTVar phaseVar
isEmpty <- isEmptyTQueue queue
diff --git a/Database/EventStore/Internal/Subscription/Regular.hs b/Database/EventStore/Internal/Subscription/Regular.hs
index 16890e1..845013b 100644
--- a/Database/EventStore/Internal/Subscription/Regular.hs
+++ b/Database/EventStore/Internal/Subscription/Regular.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE FlexibleInstances #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Regular
@@ -34,15 +35,15 @@ data Phase
-- events in it when a subscriber connects, the subscriber can expect to see
-- event number 101 onwards until the time the subscription is closed or
-- dropped.
-data RegularSubscription =
+data RegularSubscription t =
RegularSubscription { _regExec :: Exec
- , _regStream :: StreamName
+ , _regStream :: StreamId t
, _regPhase :: TVar Phase
, _regNext :: STM (Maybe ResolvedEvent)
}
--------------------------------------------------------------------------------
-instance Subscription RegularSubscription where
+instance Subscription (RegularSubscription t) where
nextEventMaybeSTM = _regNext
getSubscriptionDetailsSTM s = do
@@ -55,22 +56,23 @@ instance Subscription RegularSubscription where
Right r -> throwSTM (SubscriptionClosed $ Just r)
Left e -> throwSTM e
- subscriptionStream = _regStream
-
unsubscribe s = subUnsubscribe (_regExec s) s
--------------------------------------------------------------------------------
+instance SubscriptionStream (RegularSubscription t) t where
+ subscriptionStream = _regStream
+
+--------------------------------------------------------------------------------
newRegularSubscription :: Exec
- -> StreamName
+ -> StreamId t
-> Bool
-> Maybe Credentials
- -> IO RegularSubscription
-newRegularSubscription exec stream tos cred = do
+ -> IO (RegularSubscription t)
+newRegularSubscription exec streamId tos cred = do
phaseVar <- newTVarIO Pending
queue <- newTQueueIO
- let name = streamNameRaw stream
- sub = RegularSubscription exec stream phaseVar $ do
+ let sub = RegularSubscription exec streamId phaseVar $ do
p <- readTVar phaseVar
isEmpty <- isEmptyTQueue queue
if isEmpty
@@ -99,5 +101,5 @@ newRegularSubscription exec stream tos cred = do
writeTVar phaseVar (Closed $ Right SubAborted)
cb <- newCallback callback
- publishWith exec (SubmitOperation cb (volatile name tos cred))
+ publishWith exec (SubmitOperation cb (volatile streamId tos cred))
return sub
diff --git a/Database/EventStore/Internal/Types.hs b/Database/EventStore/Internal/Types.hs
index 8cb61b7..f0eabad 100644
--- a/Database/EventStore/Internal/Types.hs
+++ b/Database/EventStore/Internal/Types.hs
@@ -24,6 +24,7 @@ import Prelude (String)
import Data.Maybe
import Data.Monoid (Endo(..))
import Foreign.C.Types (CTime(..))
+import Numeric.Natural (Natural)
--------------------------------------------------------------------------------
import Control.Monad.Reader
@@ -234,6 +235,46 @@ streamExists :: ExpectedVersion
streamExists = StreamExists
--------------------------------------------------------------------------------
+-- | Represents an event position within a stream.
+newtype EventNumber = EventNumber Int64 deriving (Eq, Ord, Show)
+
+--------------------------------------------------------------------------------
+-- | The first event in a stream.
+streamStart :: EventNumber
+streamStart = EventNumber 0
+
+--------------------------------------------------------------------------------
+-- | The last event in the stream.
+streamEnd :: EventNumber
+streamEnd = EventNumber (-1)
+
+--------------------------------------------------------------------------------
+-- | the Nth event of a stream.
+eventNumber :: Natural -> EventNumber
+eventNumber n = EventNumber (fromIntegral n)
+
+--------------------------------------------------------------------------------
+-- | Returns a 'EventNumber from a raw 'Int64'.
+rawEventNumber :: Int64 -> EventNumber
+rawEventNumber = EventNumber
+
+--------------------------------------------------------------------------------
+-- | Returns a raw 'Int64' from an 'EventNumber'.
+eventNumberToInt64 :: EventNumber -> Int64
+eventNumberToInt64 (EventNumber n) = n
+
+--------------------------------------------------------------------------------
+-- | Determines whether any link event encountered in the stream will be
+-- resolved. See the discussion for more information:
+-- https://eventstore.org/docs/dotnet-api/reading-events/index.html#resolvedevent
+data ResolveLink = ResolveLink | NoResolveLink deriving (Show, Eq)
+
+--------------------------------------------------------------------------------
+resolveLinkToBool :: ResolveLink -> Bool
+resolveLinkToBool ResolveLink = True
+resolveLinkToBool NoResolveLink = False
+
+--------------------------------------------------------------------------------
-- EventStore Messages
--------------------------------------------------------------------------------
-- | Serializes form of an 'Event'.
@@ -997,7 +1038,7 @@ data StreamMetadataResult
= StreamMetadataResult
{ streamMetaResultStream :: !Text
-- ^ The name of the stream.
- , streamMetaResultVersion :: !Int32
+ , streamMetaResultVersion :: !Int64
-- ^ The version of the metadata format.
, streamMetaResultData :: !StreamMetadata
-- ^ A 'StreamMetadata' containing user-specified metadata.
diff --git a/Database/EventStore/Streaming.hs b/Database/EventStore/Streaming.hs
new file mode 100644
index 0000000..ea543d0
--- /dev/null
+++ b/Database/EventStore/Streaming.hs
@@ -0,0 +1,164 @@
+{-# LANGUAGE LambdaCase #-}
+--------------------------------------------------------------------------------
+-- |
+-- Module : Database.EventStore.Streaming
+-- Copyright : (C) 2018 Yorick Laupa
+-- License : (see the file LICENSE)
+-- Maintainer: Yorick Laupa <yo.eight@gmail.com>
+-- Stability : experimental
+-- Portability: non-portable
+--
+--------------------------------------------------------------------------------
+module Database.EventStore.Streaming
+ ( ReadError(..)
+ , readThroughForward
+ , readThroughBackward
+ , throwOnError
+ ) where
+
+--------------------------------------------------------------------------------
+import Control.Exception (Exception, throwIO)
+import Data.Int (Int32)
+import Data.Maybe (fromMaybe)
+import Data.Typeable (Typeable)
+import Prelude
+
+--------------------------------------------------------------------------------
+import Control.Concurrent.Async.Lifted (wait)
+import Control.Monad.Except (ExceptT, throwError, runExceptT)
+import Data.Text (Text)
+import Streaming
+import qualified Streaming.Prelude as Streaming
+
+--------------------------------------------------------------------------------
+import qualified Database.EventStore as ES
+import Database.EventStore.Internal.Operation.Read.Common (emptySlice)
+import Database.EventStore.Internal.Types (EventNumber(..))
+
+--------------------------------------------------------------------------------
+data ReadError t
+ = StreamDeleted !ES.StreamName
+ | ReadError !(Maybe Text)
+ | AccessDenied !(ES.StreamId t)
+ deriving Show
+
+--------------------------------------------------------------------------------
+instance (Show t, Typeable t) => Exception (ReadError t)
+
+--------------------------------------------------------------------------------
+data Fetch t = FetchError !(ReadError t) | Fetch !(ES.Slice t)
+
+--------------------------------------------------------------------------------
+toFetch :: ES.ReadResult t (ES.Slice t) -> Fetch t
+toFetch ES.ReadNoStream = Fetch ES.emptySlice
+toFetch ES.ReadNotModified = Fetch ES.emptySlice
+toFetch (ES.ReadStreamDeleted n) = FetchError (StreamDeleted n)
+toFetch (ES.ReadError e) = FetchError (ReadError e)
+toFetch (ES.ReadAccessDenied n) = FetchError (AccessDenied n)
+toFetch (ES.ReadSuccess s) = Fetch s
+
+--------------------------------------------------------------------------------
+handleBatchResult :: ES.StreamId t -> ES.BatchResult t -> Fetch t
+handleBatchResult ES.StreamName{} = toFetch
+handleBatchResult ES.All = Fetch
+
+--------------------------------------------------------------------------------
+data State t = Need t | Fetched ![ES.ResolvedEvent] !(Maybe t)
+
+--------------------------------------------------------------------------------
+streaming :: (t -> IO (Fetch t))
+ -> t
+ -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
+streaming iteratee = Streaming.unfoldr go . Need
+ where
+ go (Fetched buffer next) =
+ case buffer of
+ e:rest -> pure (Right (e, Fetched rest next))
+ [] -> maybe stop (go . Need) next
+ go (Need pos) = do
+ liftIO (iteratee pos) >>= \case
+ FetchError e -> throwError e
+ Fetch s ->
+ case s of
+ ES.SliceEndOfStream -> stop
+ ES.Slice xs next -> go (Fetched xs next)
+
+ stop = pure (Left ())
+
+--------------------------------------------------------------------------------
+-- | Returns an iterator able to consume a stream entirely. When reading forward,
+-- the iterator ends when the last stream's event is reached.
+readThroughForward :: ES.Connection
+ -> ES.StreamId t
+ -> ES.ResolveLink
+ -> t
+ -> Maybe Int32
+ -> Maybe ES.Credentials
+ -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
+readThroughForward conn = readThrough conn ES.Forward
+
+--------------------------------------------------------------------------------
+-- | Returns an iterator able to consume a stream entirely. When reading backward,
+-- the iterator ends when the first stream's event is reached.
+readThroughBackward :: ES.Connection
+ -> ES.StreamId t
+ -> ES.ResolveLink
+ -> t
+ -> Maybe Int32
+ -> Maybe ES.Credentials
+ -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
+readThroughBackward conn = readThrough conn ES.Backward
+
+--------------------------------------------------------------------------------
+-- | Throws an exception in case 'ExceptT' is a 'Left'.
+throwOnError :: (Show t, Typeable t)
+ => Stream (Of a) (ExceptT (ReadError t) IO) ()
+ -> Stream (Of a) IO ()
+throwOnError = hoist go
+ where
+ go action =
+ runExceptT action >>= \case
+ Left e -> throwIO e
+ Right a -> pure a
+
+--------------------------------------------------------------------------------
+readThrough :: ES.Connection
+ -> ES.ReadDirection
+ -> ES.StreamId t
+ -> ES.ResolveLink
+ -> t
+ -> Maybe Int32
+ -> Maybe ES.Credentials
+ -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
+readThrough conn dir streamId lnk from sizMay cred = streaming iteratee from
+ where
+ batchSize = fromMaybe 500 sizMay
+
+ iteratee =
+ case dir of
+ ES.Forward -> readForward conn streamId batchSize lnk cred
+ ES.Backward -> readBackward conn streamId batchSize lnk cred
+
+--------------------------------------------------------------------------------
+readForward :: ES.Connection
+ -> ES.StreamId t
+ -> Int32
+ -> ES.ResolveLink
+ -> Maybe ES.Credentials
+ -> t
+ -> IO (Fetch t)
+readForward conn streamId siz lnk creds start =
+ fmap (handleBatchResult streamId) . wait =<<
+ ES.readEventsForward conn streamId start siz lnk creds
+
+--------------------------------------------------------------------------------
+readBackward :: ES.Connection
+ -> ES.StreamId t
+ -> Int32
+ -> ES.ResolveLink
+ -> Maybe ES.Credentials
+ -> t
+ -> IO (Fetch t)
+readBackward conn streamId siz lnk creds start =
+ fmap (handleBatchResult streamId) . wait =<<
+ ES.readEventsBackward conn streamId start siz lnk creds
diff --git a/README.md b/README.md
index ccc8ae5..8bf4c10 100644..100755
--- a/README.md
+++ b/README.md
@@ -9,9 +9,9 @@ More information about the GetEventStore database can be found there: https://ev
Requirements
============
* 64bits system
- * GHC >= 7.8.3
+ * GHC >= 8.0.3
* Cabal >= 1.18
- * EventStore >= 3.0.0 (>= 3.1.0 if you want competing consumers).
+ * EventStore >= 4
*Note: If you use this client version >= to `1.1`, it will only supports EventStore >= 4.0.0.*
@@ -45,8 +45,6 @@ $ cabal test
How to use
==========
-This code snippet showcases client version >= `1.1`.
-
```haskell
{-# LANGUAGE OverloadedStrings #-} -- That library uses `Text` pervasively. This pragma permits to use
-- String literal when a Text is needed.
@@ -74,7 +72,7 @@ main = do
evt = createEvent "programming" Nothing (withJson js)
-- Appends an event to a stream named `languages`.
- as <- sendEvent conn "languages" anyVersion evt Nothing
+ as <- sendEvent conn (StreamName "languages") anyVersion evt Nothing
-- EventStore interactions are fundamentally asynchronous. Nothing requires you to wait
-- for the completion of an operation, but it's good to know if something went wrong.
diff --git a/eventstore.cabal b/eventstore.cabal
index 0711592..9dc9513 100644
--- a/eventstore.cabal
+++ b/eventstore.cabal
@@ -1,26 +1,27 @@
--- This file has been generated from package.yaml by hpack version 0.28.2.
+cabal-version: 1.12
+
+-- This file has been generated from package.yaml by hpack version 0.31.1.
--
-- see: https://github.com/sol/hpack
--
--- hash: efad84983fbffd630c221007e541d0a71ec04435a4d4e1aa642cc25e5108ccf0
+-- hash: 397c16fc115765cfccc8c21c8b7133555d40e98546632200395b77c3551720c8
name: eventstore
-version: 1.1.6
+version: 1.2.0
synopsis: EventStore TCP Client
description: EventStore TCP Client <https://eventstore.org>
category: Database
-homepage: https://gitlab.com/YoEight/eventstore-hs.git
+homepage: https://gitlab.com/YoEight/eventstore-hs
author: Yorick Laupa
maintainer: yo.eight@gmail.com
copyright: Yorick Laupa
license: BSD3
license-file: LICENSE
-tested-with: GHC >= 7.8 && < 8.6
+tested-with: GHC >= 7.8 && <= 8.6
build-type: Simple
-cabal-version: >= 1.10
extra-source-files:
- CHANGELOG.markdown
README.md
+ CHANGELOG.markdown
source-repository head
type: git
@@ -29,6 +30,7 @@ source-repository head
library
exposed-modules:
Database.EventStore
+ Database.EventStore.Streaming
Database.EventStore.Internal.Test
other-modules:
Database.EventStore.Internal.Callback
@@ -94,7 +96,7 @@ library
, cereal >=0.4 && <0.6
, clock
, connection ==0.2.*
- , containers ==0.5.*
+ , containers
, dns >=3.0.1
, dotnet-timespan
, ekg-core
@@ -112,10 +114,12 @@ library
, mtl
, protobuf >=0.2.1.1 && <0.3
, random ==1.*
+ , safe
, safe-exceptions
, semigroups >=0.5
, stm
, stm-chans
+ , streaming
, text
, time >=1.4
, transformers-base
@@ -157,10 +161,12 @@ test-suite eventstore-tests
, monad-control
, mono-traversable ==1.*
, protobuf
+ , safe
, safe-exceptions
, semigroups
, stm
, stm-chans
+ , streaming
, tasty
, tasty-hspec
, tasty-hunit
diff --git a/tests/Test/Integration/Tests.hs b/tests/Test/Integration/Tests.hs
index 1f93b09..92b0244 100644
--- a/tests/Test/Integration/Tests.hs
+++ b/tests/Test/Integration/Tests.hs
@@ -22,6 +22,7 @@ import Data.DotNet.TimeSpan
import Data.Maybe (fromMaybe)
import Data.UUID hiding (null)
import Data.UUID.V4
+import qualified Streaming.Prelude as Streaming
import System.Environment (lookupEnv)
import Test.Tasty.HUnit
import Test.Tasty.Hspec
@@ -30,7 +31,8 @@ import Test.Tasty.Hspec
import Database.EventStore
( SubscriptionClosed(..)
, ReadResult(..)
- , StreamName(..)
+ , StreamId(..)
+ , StreamName
, Slice(..)
, ConnectionType(..)
, Connection
@@ -50,10 +52,8 @@ import Database.EventStore
, waitUnsubscribeConfirmed
, subscribeFrom
, waitTillCatchup
- , readAllEventsBackward
- , readAllEventsForward
- , readStreamEventsBackward
- , readStreamEventsForward
+ , readEventsBackward
+ , readEventsForward
, subscribe
, connect
, waitTillClosed
@@ -63,6 +63,7 @@ import Database.EventStore
, transactionWrite
, deleteStream
, transactionCommit
+ , sliceEvents
)
import Database.EventStore.Internal.Test hiding
( Connection(..)
@@ -75,6 +76,7 @@ import Database.EventStore.Internal.Test hiding
, transactionCommit
, i
)
+import Database.EventStore.Streaming
import Test.Common
--------------------------------------------------------------------------------
@@ -116,6 +118,8 @@ spec = beforeAll createConnection $ afterAll shuttingDown $ describe "Features"
it "deletes a persistent subscription" deletePersistentTest
it "connects to a persistent subscription" connectToPersistentTest
it "set MaxAge metadata correctly" maxAgeTest
+ it "streams regular stream (forward)" streamRegularStreamForwardTest
+ it "streams regular stream (backward)" streamRegularStreamBackwardTest
--------------------------------------------------------------------------------
freshStreamId :: IO StreamName
@@ -141,7 +145,7 @@ readEventTest conn = do
evt = createEvent "foo" Nothing $ withJson js
as <- sendEvent conn stream anyVersion evt Nothing
_ <- wait as
- bs <- readEvent conn stream 0 False Nothing
+ bs <- readEvent conn stream streamStart NoResolveLink Nothing
rs <- wait bs
case rs of
ReadSuccess re ->
@@ -172,13 +176,13 @@ transactionTest conn = do
evt = createEvent "foo" Nothing $ withJson js
t <- startTransaction conn stream anyVersion Nothing >>= wait
_ <- transactionWrite t [evt] Nothing >>= wait
- rs <- readEvent conn stream 0 False Nothing >>= wait
+ rs <- readEvent conn stream streamStart NoResolveLink Nothing >>= wait
case rs of
ReadNoStream -> return ()
e -> fail $ "transaction-test stream is supposed to not exist "
<> show e
_ <- transactionCommit t Nothing >>= wait
- rs2 <- readEvent conn stream 0 False Nothing >>= wait
+ rs2 <- readEvent conn stream streamStart NoResolveLink Nothing >>= wait
case rs2 of
ReadSuccess re ->
case re of
@@ -200,7 +204,7 @@ readStreamEventForwardTest conn = do
]
evts = fmap (createEvent "foo" Nothing . withJson) jss
_ <- sendEvents conn stream anyVersion evts Nothing >>= wait
- rs <- readStreamEventsForward conn stream 0 10 False Nothing >>= wait
+ rs <- readEventsForward conn stream streamStart 10 NoResolveLink Nothing >>= wait
case rs of
ReadSuccess sl -> do
let jss_evts = catMaybes $ fmap resolvedEventDataAsJson
@@ -216,8 +220,9 @@ readStreamEventBackwardTest conn = do
, object [ "bar" .= True]
]
evts = fmap (createEvent "foo" Nothing . withJson) jss
- _ <- sendEvents conn "read-backward-test" anyVersion evts Nothing >>= wait
- rs <- readStreamEventsBackward conn "read-backward-test" 2 10 False Nothing >>= wait
+ _ <- sendEvents conn (StreamName "read-backward-test") anyVersion evts Nothing >>= wait
+ let startFrom = eventNumber 2
+ rs <- readEventsBackward conn (StreamName "read-backward-test") startFrom 10 NoResolveLink Nothing >>= wait
case rs of
ReadSuccess sl -> do
let jss_evts = catMaybes $ fmap resolvedEventDataAsJson
@@ -228,13 +233,13 @@ readStreamEventBackwardTest conn = do
--------------------------------------------------------------------------------
readAllEventsForwardTest :: Connection -> IO ()
readAllEventsForwardTest conn = do
- sl <- readAllEventsForward conn positionStart 3 False Nothing >>= wait
+ sl <- readEventsForward conn All positionStart 3 NoResolveLink Nothing >>= wait
assertEqual "Events is not empty" False (null $ sliceEvents sl)
--------------------------------------------------------------------------------
readAllEventsBackwardTest :: Connection -> IO ()
readAllEventsBackwardTest conn = do
- sl <- readAllEventsBackward conn positionEnd 3 False Nothing >>= wait
+ sl <- readEventsBackward conn All positionEnd 3 NoResolveLink Nothing >>= wait
assertEqual "Events is not empty" False (null $ sliceEvents sl)
--------------------------------------------------------------------------------
@@ -247,7 +252,7 @@ subscribeTest conn = do
, object [ "bar" .= True]
]
evts = fmap (createEvent "foo" Nothing . withJson) jss
- sub <- subscribe conn stream False Nothing
+ sub <- subscribe conn stream NoResolveLink Nothing
_ <- waitConfirmation sub
_ <- sendEvents conn stream anyVersion evts Nothing >>= wait
let loop 3 = return []
@@ -281,7 +286,7 @@ subscribeFromTest conn = do
evts = fmap (createEvent "foo" Nothing . withJson) jss
evts2 = fmap (createEvent "foo" Nothing . withJson) jss2
_ <- sendEvents conn stream anyVersion evts Nothing >>= wait
- sub <- subscribeFrom conn stream False Nothing (Just 1) Nothing
+ sub <- subscribeFrom conn stream NoResolveLink Nothing (Just 1) Nothing
_ <- waitConfirmation sub
_ <- sendEvents conn stream anyVersion evts2 Nothing >>= wait
@@ -316,7 +321,7 @@ data SubNoStreamTest
subscribeFromNoStreamTest :: Connection -> IO ()
subscribeFromNoStreamTest conn = do
stream <- freshStreamId
- sub <- subscribeFrom conn stream False Nothing Nothing Nothing
+ sub <- subscribeFrom conn stream NoResolveLink Nothing Nothing Nothing
let loop [] = do
m <- nextEventMaybe sub
case m of
@@ -456,3 +461,49 @@ maxAgeTest conn = do
assertEqual "Should have equal timespan" (Just timespan)
(streamMetadataMaxAge m)
_ -> fail $ "Stream " <> show stream <> " doesn't exist"
+
+--------------------------------------------------------------------------------
+generateEvents :: Int -> [Value]
+generateEvents n = take n $ fmap toObj [1..]
+
+--------------------------------------------------------------------------------
+toObj :: Int -> Value
+toObj n = object [ pack (show n) .= n ]
+
+--------------------------------------------------------------------------------
+streamRegularStreamForwardTest :: Connection -> IO ()
+streamRegularStreamForwardTest conn = do
+ stream <- freshStreamId
+
+ let jss = generateEvents 10
+ evts = fmap (createEvent "foo" Nothing . withJson) jss
+ src = throwOnError $ readThroughForward conn stream NoResolveLink streamStart (Just 1) Nothing
+
+ _ <- sendEvents conn stream anyVersion evts Nothing >>= wait
+ rest <- Streaming.foldM_ check (pure [1..10]) pure src
+ assertEqual "Should be empty" [] rest
+ where
+ check (x:xs) e =
+ case resolvedEventDataAsJson e of
+ Just e | e == toObj x -> pure xs
+ | otherwise -> fail "Out of order event's appeared (stream)"
+ _ -> fail "Can't deserialized event"
+
+--------------------------------------------------------------------------------
+streamRegularStreamBackwardTest :: Connection -> IO ()
+streamRegularStreamBackwardTest conn = do
+ stream <- freshStreamId
+
+ let jss = generateEvents 10
+ evts = fmap (createEvent "foo" Nothing . withJson) jss
+ src = throwOnError $ readThroughBackward conn stream NoResolveLink streamEnd (Just 1) Nothing
+
+ _ <- sendEvents conn stream anyVersion evts Nothing >>= wait
+ rest <- Streaming.foldM_ check (pure $ reverse [1..10]) pure src
+ assertEqual "Should be empty" [] rest
+ where
+ check (x:xs) e =
+ case resolvedEventDataAsJson e of
+ Just e | e == toObj x -> pure xs
+ | otherwise -> fail "Out of order event's appeared (stream)"
+ _ -> fail "Can't deserialized event"