summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwereHamster <>2016-03-28 16:33:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2016-03-28 16:33:00 (GMT)
commitb04cde778bb3237c5ff4571bc31a311081db0224 (patch)
treeb8148b6da8f2bd6b09e68b013ab8b20951556477
parent29919adca34088cabf345cc7121cc3b7331d0327 (diff)
version 0.0.30.0.3
-rw-r--r--avers-server.cabal4
-rw-r--r--src/Avers/Server.hs130
2 files changed, 85 insertions, 49 deletions
diff --git a/avers-server.cabal b/avers-server.cabal
index 681afef..47439d3 100644
--- a/avers-server.cabal
+++ b/avers-server.cabal
@@ -1,5 +1,5 @@
name: avers-server
-version: 0.0.2
+version: 0.0.3
synopsis: Server implementation of the Avers API
description: See README.md
homepage: http://github.com/wereHamster/avers-server
@@ -36,6 +36,7 @@ library
, base64-bytestring
, bytestring
, bytestring-conversion
+ , containers
, cookie
, cryptohash
, either
@@ -45,6 +46,7 @@ library
, rethinkdb-client-driver
, servant
, servant-server
+ , stm
, text
, time
, transformers
diff --git a/src/Avers/Server.hs b/src/Avers/Server.hs
index 827f374..d55afad 100644
--- a/src/Avers/Server.hs
+++ b/src/Avers/Server.hs
@@ -16,54 +16,58 @@ module Avers.Server
) where
-import Control.Monad
-import Control.Monad.Except
+import Control.Monad
+import Control.Monad.Except
-- import Control.Monad.IO.Class
-import Control.Monad.Trans.Either
+import Control.Monad.Trans.Either
-import Data.Text (Text)
+import Control.Concurrent
+import Control.Concurrent.STM
+
+import Data.Text (Text)
-- import qualified Data.Text as T
-import qualified Data.Text.Encoding as T
+import qualified Data.Text.Encoding as T
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Base64 as BS64
+import qualified Data.Set as S
+
-import Data.Monoid
-import Data.Pool
+import Data.Monoid
-- import Data.Proxy
-- import Data.Maybe
-import Data.Time
-import Data.Aeson (ToJSON, encode, decode)
+import Data.Time
+import Data.Aeson (ToJSON, encode, decode)
-- import Data.Aeson.TH (defaultOptions)
-import Crypto.Hash.SHA256 (hashlazy)
+import Crypto.Hash.SHA256 (hashlazy)
-import Servant.API
-import Servant.Server
+import Servant.API hiding (Patch)
+import Servant.Server
-import Avers
+import Avers
-- import Avers.TH
-- import Avers.Storage
-import Avers.Storage.Expressions
-import Avers.Types
+-- import Avers.Storage.Expressions
+-- import Avers.Types
-import Avers.API
+import Avers.API
-import Avers.Server.Authorization
-import Avers.Server.Instances ()
+import Avers.Server.Authorization
+import Avers.Server.Instances ()
-import qualified Database.RethinkDB as R
+-- import qualified Database.RethinkDB as R
-import Network.HTTP.Types.Status
+import Network.HTTP.Types.Status
-import Network.Wai
-import Network.Wai.Handler.WebSockets (websocketsApp)
+import Network.Wai
+import Network.Wai.Handler.WebSockets (websocketsApp)
import qualified Network.WebSockets as WS
-import Web.Cookie
+import Web.Cookie
-import Prelude
+import Prelude
@@ -128,6 +132,7 @@ serveAversCoreAPI aversH auth =
:<|> serveCreateRelease
:<|> serveLookupRelease
:<|> serveLookupLatestRelease
+ :<|> serveFeed
:<|> serveChangeSecret
where
@@ -198,7 +203,15 @@ serveAversCoreAPI aversH auth =
}
serveDeleteObject _ _ = left err500
- serveLookupPatch _ _ _ _ = left err500
+
+
+ ----------------------------------------------------------------------------
+ -- LookupPatch
+ serveLookupPatch objId revId _cred validationToken = do
+ -- TODO: authorization
+
+ patch <- reqAvers aversH $ lookupPatch (BaseObjectId objId) revId
+ cacheableResponse validationToken patch
----------------------------------------------------------------------------
@@ -212,37 +225,58 @@ serveAversCoreAPI aversH auth =
connection <- WS.acceptRequest pendingConnection
WS.forkPingThread connection 10
- revIdData <- WS.receiveData connection
- case decode revIdData of
- Nothing -> pure ()
- Just revId -> do
- withResource (databaseHandlePool aversH) $ \handle -> do
- token <- R.start handle $
- R.SequenceChanges $
- objectPatchSequenceE (BaseObjectId objId) revId maxBound
+ chan <- changeChannel aversH
+ loop connection chan
- loop connection handle token
+ loop :: WS.Connection -> TChan Change -> IO ()
+ loop connection chan = do
+ change <- atomically $ readTChan chan
+ case change of
+ (CPatch patch) -> when (patchObjectId patch == BaseObjectId objId) $
+ WS.sendTextData connection (encode patch)
- loop :: WS.Connection -> R.Handle -> R.Token -> IO ()
- loop connection handle token = do
- res <- R.nextResult handle token :: IO (Either R.Error (R.Sequence R.ChangeNotification))
- case res of
- Left e -> print e
- Right (R.Done r) -> do
- forM_ r $ \p ->
- WS.sendTextData connection (encode $ R.cnNewValue p)
+ loop connection chan
- Right (R.Partial _ r) -> do
- forM_ r $ \p ->
- WS.sendTextData connection (encode $ R.cnNewValue p)
-
- R.continue handle token
- loop connection handle token
serveCreateRelease _ _ _ = left err500
serveLookupRelease _ _ _ _ = left err500
serveLookupLatestRelease _ _ _ = left err500
+
+
+ ----------------------------------------------------------------------------
+ -- Feed
+ serveFeed _cred req respond = respond $
+ case websocketsApp WS.defaultConnectionOptions wsApp req of
+ Nothing -> responseLBS status500 [] "This is a WebSocket endpoint"
+ Just res -> res
+ where
+ wsApp pendingConnection = do
+ subscriptions <- newTVarIO S.empty
+ connection <- WS.acceptRequest pendingConnection
+ WS.forkPingThread connection 10
+
+ void $ forkIO $ forever $ do
+ msg <- WS.receiveData connection
+ case decode msg of
+ Nothing -> pure ()
+ Just (IncludeObjectChanges objId) ->
+ atomically $ modifyTVar' subscriptions $ S.insert $ BaseObjectId objId
+
+ chan <- changeChannel aversH
+ loop connection subscriptions chan
+
+ loop :: WS.Connection -> TVar (S.Set ObjectId) -> TChan Change -> IO ()
+ loop connection subscriptions chan = do
+ change <- atomically $ readTChan chan
+ subs <- atomically $ readTVar subscriptions
+ case change of
+ (CPatch p) -> when (S.member (patchObjectId p) subs) $
+ WS.sendTextData connection (encode change)
+
+ loop connection subscriptions chan
+
+
serveChangeSecret _ _ = left err500