summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFumiakiKinoshita <>2018-08-10 08:35:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-08-10 08:35:00 (GMT)
commitcac2abe8199908fb5c2b97e8f89ade7023949549 (patch)
tree5a7a0abc664d9240f52e6d42cb57723ad3cbbe3a
version 0.10.1
-rw-r--r--ChangeLog.md3
-rw-r--r--LICENSE30
-rw-r--r--README.md31
-rw-r--r--Setup.hs2
-rw-r--r--app/client.hs100
-rw-r--r--app/server.hs31
-rw-r--r--liszt.cabal119
-rw-r--r--src/Database/Liszt.hs44
-rw-r--r--src/Database/Liszt/Internal.hs596
-rw-r--r--src/Database/Liszt/Network.hs101
-rw-r--r--src/Database/Liszt/Tracker.hs250
11 files changed, 1307 insertions, 0 deletions
diff --git a/ChangeLog.md b/ChangeLog.md
new file mode 100644
index 0000000..c84d250
--- /dev/null
+++ b/ChangeLog.md
@@ -0,0 +1,3 @@
+## 0.1
+
+Initial release
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..068f3e9
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright Fumiaki Kinoshita (c) 2018
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * Neither the name of Fumiaki Kinoshita nor the names of other
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..2b36715
--- /dev/null
+++ b/README.md
@@ -0,0 +1,31 @@
+# Liszt
+
+Liszt is an append-only key-list database.
+
+## Insertion
+
+For the sake of reliability, insertion is performed locally.
+
+```haskell
+commitFile :: FilePath -> Transaction a -> IO a
+insert :: Serialise a => Key -> a -> Transaction ()
+
+> commitFile "foo.liszt" $ insert "message" ("hello, world" :: Text)
+```
+
+## Query
+
+`lisztd` starts a server. The first argument is the root directory to find liszt
+files.
+
+```
+$ lisztd .
+```
+
+You can use the command line utility to watch a stream. `-b 0` follows a stream
+from offset 0. `-f "%p\n"` prints payloads with newlines.
+
+```
+$ liszt foo.liszt message -b 0 -f "%p\n"
+hello, world
+```
diff --git a/Setup.hs b/Setup.hs
new file mode 100644
index 0000000..9a994af
--- /dev/null
+++ b/Setup.hs
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
diff --git a/app/client.hs b/app/client.hs
new file mode 100644
index 0000000..7e28242
--- /dev/null
+++ b/app/client.hs
@@ -0,0 +1,100 @@
+{-# LANGUAGE LambdaCase, RecordWildCards #-}
+module Main where
+import Database.Liszt
+
+import Control.Monad
+import Data.Function (fix)
+import Data.Winery
+import qualified Data.ByteString.Char8 as B
+import System.Environment
+import System.IO
+import System.Console.GetOpt
+import System.Exit
+
+parseHostPort :: String -> (String -> Int -> r) -> r
+parseHostPort str k = case break (==':') str of
+ (host, ':' : port) -> k host (read port)
+ (host, _) -> k host 1886
+
+data Options = Options
+ { host :: !String
+ , timeout :: !Double
+ , ranges :: ![(Offset, Offset)]
+ , beginning :: !(Maybe Offset)
+ , format :: !String
+ }
+
+readOffset :: String -> Offset
+readOffset ('_' : n) = FromEnd (read n)
+readOffset n = SeqNo (read n)
+
+options :: [OptDescr (Options -> Options)]
+options = [Option "h" ["host"] (ReqArg (\str o -> o { host = str }) "HOST:PORT") "stream input"
+ , Option "r" ["range"] (ReqArg (\str o -> o { ranges = case break (==':') str of
+ (begin, ':' : end) -> (readOffset begin, readOffset end) : ranges o
+ _ -> (readOffset str, readOffset str) : ranges o
+ }) "FROM:TO") "ranges"
+ , Option "b" ["begin"] (ReqArg (\str o -> o { beginning = Just $! readOffset str }) "pos") "get all the contents from this position"
+ , Option "t" ["timeout"] (ReqArg (\str o -> o { timeout = read str }) "SECONDS") "Timeout"
+ , Option "f" ["format"] (ReqArg (\str o -> o { format = str }) "FORMAT") "format"
+ ]
+
+defaultOptions :: Options
+defaultOptions = Options
+ { host = "localhost"
+ , timeout = 1
+ , ranges = []
+ , beginning = Nothing
+ , format = "%p"
+ }
+
+parseFormat :: String -> (Int, B.ByteString, B.ByteString) -> IO ()
+parseFormat ('%' : c : str) t@(ofs, tag, payload) = do
+ case c of
+ 'p' -> B.hPutStr stdout payload
+ 't' -> B.hPutStr stdout tag
+ 'i' -> print ofs
+ 's' -> print (B.length payload)
+ 'W' -> do
+ let e = toEncoding t
+ print (getSize e)
+ hPutEncoding stdout e
+ '%' -> putChar '%'
+ _ -> error $ "invalid format specifier: %" ++ c : ""
+ parseFormat str t
+parseFormat ('\\' : c : str) t = do
+ case c of
+ '\\' -> putChar '\\'
+ 'n' -> putChar '\n'
+ 'r' -> putChar '\r'
+ 't' -> putChar '\t'
+ _ -> error $ "unknown escape sequence: \\" ++ c : ""
+ parseFormat str t
+parseFormat (c : str) t = putChar c >> parseFormat str t
+parseFormat [] _ = hFlush stdout
+
+main :: IO ()
+main = getOpt Permute options <$> getArgs >>= \case
+ (fs, path : name : _, []) -> do
+ let o = foldl (flip id) defaultOptions fs
+ let printer = parseFormat $ format o
+ parseHostPort (host o) withConnection (B.pack path) $ \conn -> do
+ let name' = B.pack name
+ let timeout' = floor $ timeout o * 1000000
+ let req i j = Request name' timeout' i j
+ forM_ (reverse $ ranges o) $ \(i, j) -> do
+ bss <- fetch conn $ req maxBound i j
+ mapM_ printer bss
+ forM_ (beginning o) $ \start -> do
+ bss0 <- fetch conn $ req maxBound start start
+ mapM_ printer bss0
+ unless (null bss0) $ do
+ let (start', _, _) = last bss0
+ flip fix (start' + 1) $ \self i -> do
+ bss <- fetch conn $ req 1 (SeqNo i) (SeqNo i)
+ mapM_ printer bss
+ unless (null bss) $ self $ let (j, _, _) = last bss in j + 1
+
+ (_, _, es) -> do
+ name <- getProgName
+ die $ unlines es ++ usageInfo name options
diff --git a/app/server.hs b/app/server.hs
new file mode 100644
index 0000000..661673a
--- /dev/null
+++ b/app/server.hs
@@ -0,0 +1,31 @@
+{-# LANGUAGE LambdaCase #-}
+module Main where
+
+import Database.Liszt.Network
+import System.Environment
+import System.Console.GetOpt
+import System.Exit
+
+data Options = Options
+ { port :: Int
+ }
+
+defaultOptions :: Options
+defaultOptions = Options
+ { port = 1886
+ }
+
+options :: [OptDescr (Options -> Options)]
+options = [Option "p" ["port"] (ReqArg (\e o -> o { port = read e }) "NUM") "port number"]
+
+main :: IO ()
+main = getOpt Permute options <$> getArgs >>= \case
+ (fs, args, []) -> do
+ let o = foldl (flip id) defaultOptions fs
+ startServer (port o) $ case args of
+ path : _ -> path
+ [] -> "."
+ (_, _, es) -> do
+ name <- getProgName
+ die $ unlines es ++ usageInfo name options
+
diff --git a/liszt.cabal b/liszt.cabal
new file mode 100644
index 0000000..383ce97
--- /dev/null
+++ b/liszt.cabal
@@ -0,0 +1,119 @@
+-- This file has been generated from package.yaml by hpack version 0.20.0.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: 5561a69ed2e98f7b84def2db25e066ef420542febb624a57bdbe15bb4c7c521a
+
+name: liszt
+version: 0.1
+synopsis: Append only key-list database
+description: Please see the README on GitHub at <https://github.com/fumieval/liszt#readme>
+category: Database
+homepage: https://github.com/fumieval/liszt#readme
+bug-reports: https://github.com/fumieval/liszt/issues
+author: Fumiaki Kinoshita
+maintainer: fumiexcel@gmail.com
+copyright: Copyright (c) 2018 Fumiaki Kinoshita
+license: BSD3
+license-file: LICENSE
+build-type: Simple
+cabal-version: >= 1.10
+
+extra-source-files:
+ ChangeLog.md
+ README.md
+
+source-repository head
+ type: git
+ location: https://github.com/fumieval/liszt
+
+library
+ hs-source-dirs:
+ src
+ build-depends:
+ base >=4.7 && <5
+ , binary
+ , bytestring
+ , containers
+ , deepseq
+ , directory
+ , exceptions
+ , filepath
+ , fsnotify
+ , network
+ , reflection
+ , scientific
+ , sendfile
+ , stm
+ , stm-delay
+ , text
+ , transformers
+ , unordered-containers
+ , winery
+ exposed-modules:
+ Database.Liszt
+ Database.Liszt.Tracker
+ Database.Liszt.Internal
+ Database.Liszt.Network
+ other-modules:
+ Paths_liszt
+ default-language: Haskell2010
+
+executable liszt
+ main-is: client.hs
+ hs-source-dirs:
+ app
+ build-depends:
+ base >=4.7 && <5
+ , binary
+ , bytestring
+ , containers
+ , deepseq
+ , directory
+ , exceptions
+ , filepath
+ , fsnotify
+ , liszt
+ , network
+ , reflection
+ , scientific
+ , sendfile
+ , stm
+ , stm-delay
+ , text
+ , transformers
+ , unordered-containers
+ , winery
+ other-modules:
+ Paths_liszt
+ default-language: Haskell2010
+
+executable lisztd
+ main-is: server.hs
+ hs-source-dirs:
+ app
+ ghc-options: -threaded
+ build-depends:
+ base >=4.7 && <5
+ , binary
+ , bytestring
+ , containers
+ , deepseq
+ , directory
+ , exceptions
+ , filepath
+ , fsnotify
+ , liszt
+ , network
+ , reflection
+ , scientific
+ , sendfile
+ , stm
+ , stm-delay
+ , text
+ , transformers
+ , unordered-containers
+ , winery
+ other-modules:
+ Paths_liszt
+ default-language: Haskell2010
diff --git a/src/Database/Liszt.hs b/src/Database/Liszt.hs
new file mode 100644
index 0000000..048e626
--- /dev/null
+++ b/src/Database/Liszt.hs
@@ -0,0 +1,44 @@
+module Database.Liszt (
+ openLiszt,
+ closeLiszt,
+ withLiszt,
+ LisztHandle,
+ -- * Writer interface
+ Key,
+ Tag,
+ Transaction,
+ clear,
+ insert,
+ insertTagged,
+ insertRaw,
+ commit,
+ commitFile,
+ -- * Reader
+ Offset(..),
+ Request(..),
+ defRequest,
+ Connection,
+ withConnection,
+ fetch
+ ) where
+
+import Database.Liszt.Internal
+import Database.Liszt.Network
+import Database.Liszt.Tracker
+import Data.Winery
+
+-- | Commit a 'Transaction' to a file.
+commitFile :: FilePath -> Transaction a -> IO a
+commitFile path m = withLiszt path $ \h -> commit h m
+
+-- | Insert a value.
+insert :: Serialise a => Key -> a -> Transaction ()
+insert k v = insertRaw k mempty (toEncoding v)
+{-# INLINE insert #-}
+
+-- | Insert a value with a tag (e.g. timestamp).
+-- Tags can be used to perform `WineryTag` query.
+-- Tag values should be monotonically increasing but this is not checked.
+insertTagged :: (Serialise t, Serialise a) => Key -> t -> a -> Transaction ()
+insertTagged k t v = insertRaw k (toEncoding t) (toEncoding v)
+{-# INLINE insertTagged #-}
diff --git a/src/Database/Liszt/Internal.hs b/src/Database/Liszt/Internal.hs
new file mode 100644
index 0000000..94a9868
--- /dev/null
+++ b/src/Database/Liszt/Internal.hs
@@ -0,0 +1,596 @@
+{-# LANGUAGE LambdaCase, DeriveTraversable, DeriveGeneric #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE BangPatterns #-}
+module Database.Liszt.Internal (
+ Key
+ , Tag
+ , LisztHandle(..)
+ , openLiszt
+ , closeLiszt
+ , withLiszt
+ -- * Writing
+ , clear
+ , insertRaw
+ , commit
+ , Transaction
+ , TransactionState
+ -- * Reading
+ , availableKeys
+ -- * Frame
+ , Frame(..)
+ , decodeFrame
+ , forceSpine
+ -- * Fetching
+ , Fetchable(..)
+ , RawPointer(..)
+ -- * Footer
+ , fetchRoot
+ , footerSize
+ , isFooter
+ , lookupSpine
+ -- * Spine
+ , Spine
+ , spineLength
+ , QueryResult
+ , takeSpine
+ , dropSpine
+ , takeSpineWhile
+ , dropSpineWhile
+ ) where
+
+import Control.Concurrent
+import Control.DeepSeq
+import Control.Monad
+import Control.Monad.Catch
+import Control.Monad.IO.Class
+import Control.Monad.Trans.State.Strict
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Internal as B
+import qualified Data.IntMap.Strict as IM
+import Data.IORef
+import Data.Proxy
+import Data.Word
+import Data.Winery
+import qualified Data.Winery.Internal.Builder as WB
+import Foreign.ForeignPtr
+import Foreign.Ptr
+import GHC.Generics (Generic)
+import System.Directory
+import System.IO
+
+type Key = B.ByteString
+
+-- | Tag is an extra value attached to a payload. This can be used to perform
+-- a binary search.
+type Tag = WB.Encoding
+
+type Spine a = [(Int, a)]
+
+newtype KeyPointer = KeyPointer RawPointer deriving (Show, Eq, Serialise, NFData)
+
+data RawPointer = RP !Int !Int deriving (Show, Eq, Generic)
+instance Serialise RawPointer
+instance NFData RawPointer where
+ rnf r = r `seq` ()
+
+data Frame a = Empty
+ | Leaf1 !KeyPointer !(Spine a)
+ | Leaf2 !KeyPointer !(Spine a) !KeyPointer !(Spine a)
+ | Node2 !a !KeyPointer !(Spine a) !a
+ | Node3 !a !KeyPointer !(Spine a) !a !KeyPointer !(Spine a) !a
+ | Tree !Tag !RawPointer !a !a
+ | Leaf !Tag !RawPointer
+ deriving (Generic, Show, Eq, Functor, Foldable, Traversable)
+
+instance Serialise a => Serialise (Frame a)
+
+decodeFrame :: B.ByteString -> Frame RawPointer
+decodeFrame = either (error . show) id $ getDecoder
+ $ schema (Proxy :: Proxy (Frame RawPointer))
+
+data LisztHandle = LisztHandle
+ { hPayload :: !Handle
+ , refBuffer :: MVar (Int, ForeignPtr Word8)
+ , keyCache :: IORef (IM.IntMap Key)
+ , refModified :: IORef Bool
+ , handleLock :: MVar ()
+ }
+
+openLiszt :: MonadIO m => FilePath -> m LisztHandle
+openLiszt path = liftIO $ do
+ exist <- doesFileExist path
+ hPayload <- openBinaryFile path ReadWriteMode
+ unless exist $ B.hPutStr hPayload emptyFooter
+ buf <- B.mallocByteString 4096
+ refBuffer <- newMVar (4096, buf)
+ keyCache <- newIORef IM.empty
+ handleLock <- newMVar ()
+ refModified <- newIORef False
+ return LisztHandle{..}
+
+closeLiszt :: MonadIO m => LisztHandle -> m ()
+closeLiszt lh = liftIO $ hClose $ hPayload lh
+
+withLiszt :: (MonadIO m, MonadMask m) => FilePath -> (LisztHandle -> m a) -> m a
+withLiszt path = bracket (openLiszt path) closeLiszt
+
+--------------------------------------------------------------------------------
+-- Transaction
+
+data StagePointer = Commited !RawPointer | Uncommited !Int deriving Eq
+
+data TransactionState = TS
+ { dbHandle :: !LisztHandle
+ , freshId :: !Int
+ , pending :: !(IM.IntMap (Frame StagePointer))
+ , currentRoot :: !(Frame StagePointer)
+ , currentPos :: !Int
+ }
+
+type Transaction = StateT TransactionState IO
+
+-- | Replace the specified key with an empty list.
+clear :: Key -> Transaction ()
+clear key = do
+ root <- gets currentRoot
+ insertF key (const $ return []) root >>= \case
+ Pure t -> modify $ \ts -> ts { currentRoot = t }
+ Carry l k a r -> modify $ \ts -> ts { currentRoot = Node2 l k a r }
+
+append :: LisztHandle -> (Handle -> IO a) -> IO a
+append LisztHandle{..} cont = do
+ hSeek hPayload SeekFromEnd 0
+ writeIORef refModified True
+ cont hPayload
+
+insertRaw :: Key -> Tag -> Encoding -> Transaction ()
+insertRaw key tag payload = do
+ lh <- gets dbHandle
+ ofs <- gets currentPos
+ liftIO $ append lh $ \h -> hPutEncoding h payload
+ root <- gets currentRoot
+ modify $ \ts -> ts { currentPos = ofs + WB.getSize payload }
+ insertF key (insertSpine tag (ofs `RP` WB.getSize payload)) root >>= \case
+ Pure t -> modify $ \ts -> ts { currentRoot = t }
+ Carry l k a r -> modify $ \ts -> ts { currentRoot = Node2 l k a r }
+
+addFrame :: Frame StagePointer -> Transaction StagePointer
+addFrame f = state $ \ts -> (Uncommited (freshId ts), ts
+ { freshId = freshId ts + 1
+ , pending = IM.insert (freshId ts) f (pending ts) })
+
+allocKey :: Key -> Transaction KeyPointer
+allocKey key = do
+ lh <- gets dbHandle
+ ofs <- gets currentPos
+ liftIO $ append lh $ \h -> do
+ B.hPutStr h key
+ modifyIORef' (keyCache lh) (IM.insert ofs key)
+ modify $ \ts -> ts { currentPos = ofs + B.length key }
+ return $ KeyPointer $ RP ofs (B.length key)
+
+commit :: MonadIO m => LisztHandle -> Transaction a -> m a
+commit h transaction = liftIO $ modifyMVar (handleLock h) $ const $ do
+ offset0 <- fromIntegral <$> hFileSize h'
+ root <- fetchRoot h
+ do
+ (a, TS _ _ pendings root' offset1) <- runStateT transaction
+ $ TS h 0 IM.empty (fmap Commited root) offset0
+
+ let substP (Commited ofs) = return ofs
+ substP (Uncommited i) = case IM.lookup i pendings of
+ Just f -> substF f
+ Nothing -> error "panic!"
+ substF Empty = return (RP 0 0)
+ substF (Leaf1 pk pv) = do
+ pv' <- traverse (traverse substP) pv
+ write (Leaf1 pk pv')
+ substF (Leaf2 pk pv qk qv) = do
+ pv' <- traverse (traverse substP) pv
+ qv' <- traverse (traverse substP) qv
+ write (Leaf2 pk pv' qk qv')
+ substF (Node2 l pk pv r) = do
+ l' <- substP l
+ pv' <- traverse (traverse substP) pv
+ r' <- substP r
+ write (Node2 l' pk pv' r')
+ substF (Node3 l pk pv m qk qv r) = do
+ l' <- substP l
+ pv' <- traverse (traverse substP) pv
+ m' <- substP m
+ qv' <- traverse (traverse substP) qv
+ r' <- substP r
+ write (Node3 l' pk pv' m' qk qv' r')
+ substF (Tree t p l r) = do
+ l' <- substP l
+ r' <- substP r
+ write (Tree t p l' r')
+ substF (Leaf t p) = write (Leaf t p)
+
+ writeFooter offset1 $ substF root'
+ return ((), a)
+ `onException` writeFooter offset0 (write root)
+ where
+ writeFooter ofs m = do
+ modified <- readIORef (refModified h)
+ when modified $ do
+ hSeek h' SeekFromEnd 0
+ RP _ len <- evalStateT m ofs
+ B.hPutStr h' $ B.drop len emptyFooter
+ hFlush h'
+ writeIORef (refModified h) False
+
+ h' = hPayload h
+ write :: Frame RawPointer -> StateT Int IO RawPointer
+ write f = do
+ ofs <- get
+ let e = toEncoding f
+ liftIO $ hPutEncoding h' e
+ put $! ofs + WB.getSize e
+ return $ RP ofs (WB.getSize e)
+
+fetchKeyT :: KeyPointer -> Transaction Key
+fetchKeyT p = gets dbHandle >>= \h -> liftIO (fetchKey h p)
+
+fetchStage :: StagePointer -> Transaction (Frame StagePointer)
+fetchStage (Commited p) = do
+ h <- gets dbHandle
+ liftIO $ fmap Commited <$> fetchFrame h p
+fetchStage (Uncommited i) = gets pending >>= return . maybe (error "fetch: not found") id . IM.lookup i
+
+insertF :: Key
+ -> (Spine StagePointer -> Transaction (Spine StagePointer))
+ -> Frame StagePointer
+ -> Transaction (Result StagePointer)
+insertF k u Empty = Pure <$> (Leaf1 <$> allocKey k <*> u [])
+insertF k u (Leaf1 pk pv) = do
+ vpk <- fetchKeyT pk
+ Pure <$> case compare k vpk of
+ LT -> do
+ kp <- allocKey k
+ (\v -> Leaf2 kp v pk pv) <$> u []
+ EQ -> Leaf1 pk <$> u pv
+ GT -> do
+ kp <- allocKey k
+ Leaf2 pk pv kp <$> u []
+insertF k u (Leaf2 pk pv qk qv) = do
+ vpk <- fetchKeyT pk
+ case compare k vpk of
+ LT -> do
+ v <- u []
+ kp <- allocKey k
+ l <- addFrame $ Leaf1 kp v
+ r <- addFrame $ Leaf1 qk qv
+ return $ Carry l pk pv r
+ EQ -> do
+ v <- u pv
+ return $ Pure $ Leaf2 pk v qk qv
+ GT -> do
+ vqk <- fetchKeyT qk
+ case compare k vqk of
+ LT -> do
+ v <- u []
+ l <- addFrame $ Leaf1 pk pv
+ r <- addFrame $ Leaf1 qk qv
+ kp <- allocKey k
+ return $ Carry l kp v r
+ EQ -> do
+ v <- u qv
+ return $ Pure $ Leaf2 pk pv qk v
+ GT -> do
+ v <- u []
+ kp <- allocKey k
+ l <- addFrame $ Leaf1 pk pv
+ r <- addFrame $ Leaf1 kp v
+ return $ Carry l qk qv r
+insertF k u (Node2 l pk0 pv0 r) = do
+ vpk0 <- fetchKeyT pk0
+ case compare k vpk0 of
+ LT -> do
+ fl <- fetchStage l
+ insertF k u fl >>= \case
+ Pure l' -> do
+ l'' <- addFrame l'
+ return $ Pure $ Node2 l'' pk0 pv0 r
+ Carry l' ck cv r' -> return $ Pure $ Node3 l' ck cv r' pk0 pv0 r
+ EQ -> do
+ v <- u pv0
+ return $ Pure $ Node2 l pk0 v r
+ GT -> do
+ fr <- fetchStage r
+ insertF k u fr >>= \case
+ Pure r' -> do
+ r'' <- addFrame r'
+ return $ Pure $ Node2 l pk0 pv0 r''
+ Carry l' ck cv r' -> return $ Pure $ Node3 l pk0 pv0 l' ck cv r'
+insertF k u (Node3 l pk0 pv0 m qk0 qv0 r) = do
+ vpk0 <- fetchKeyT pk0
+ case compare k vpk0 of
+ LT -> do
+ fl <- fetchStage l
+ insertF k u fl >>= \case
+ Pure l' -> do
+ l'' <- addFrame l'
+ return $ Pure $ Node3 l'' pk0 pv0 m qk0 qv0 r
+ Carry l' ck cv r' -> do
+ bl <- addFrame (Node2 l' ck cv r')
+ br <- addFrame (Node2 m qk0 qv0 r)
+ return $ Pure $ Node2 bl pk0 pv0 br
+ EQ -> do
+ v <- u pv0
+ return $ Pure $ Node3 l pk0 v m qk0 qv0 r
+ GT -> do
+ vqk0 <- fetchKeyT qk0
+ case compare k vqk0 of
+ LT -> do
+ fm <- fetchStage m
+ insertF k u fm >>= \case
+ Pure m' -> do
+ m'' <- addFrame m'
+ return $ Pure $ Node3 l pk0 pv0 m'' qk0 qv0 r
+ Carry l' ck cv r' -> do
+ bl <- addFrame $ Node2 l pk0 pv0 l'
+ br <- addFrame $ Node2 r' qk0 qv0 r
+ return $ Pure $ Node2 bl ck cv br
+ EQ -> do
+ v <- u qv0
+ return $ Pure $ Node3 l pk0 pv0 m qk0 v r
+ GT -> do
+ fr <- fetchStage r
+ insertF k u fr >>= \case
+ Pure r' -> do
+ r'' <- addFrame r'
+ return $ Pure $ Node3 l pk0 pv0 m qk0 qv0 r''
+ Carry l' ck cv r' -> do
+ bl <- addFrame $ Node2 l pk0 pv0 m
+ br <- addFrame $ Node2 l' ck cv r'
+ return $ Pure $ Node2 bl qk0 qv0 br
+insertF _ _ (Tree _ _ _ _) = fail "Unexpected Tree"
+insertF _ _ (Leaf _ _) = fail "Unexpected Leaf"
+
+data Result a = Pure (Frame a)
+ | Carry !a !KeyPointer !(Spine a) !a
+
+insertSpine :: Tag -> RawPointer -> Spine StagePointer -> Transaction (Spine StagePointer)
+insertSpine tag p ((m, x) : (n, y) : ss) | m == n = do
+ t <- addFrame $ Tree tag p x y
+ return $ (2 * m + 1, t) : ss
+insertSpine tag p ss = do
+ t <- addFrame $ Leaf tag p
+ return $ (1, t) : ss
+
+--------------------------------------------------------------------------------
+-- Fetching
+
+class Fetchable a where
+ fetchFrame :: LisztHandle -> a -> IO (Frame a)
+
+instance Fetchable RawPointer where
+ fetchFrame h (RP ofs len) = fetchFrame'
+ (hSeek (hPayload h) AbsoluteSeek (fromIntegral ofs)) h len
+
+fetchKey :: LisztHandle -> KeyPointer -> IO Key
+fetchKey LisztHandle{..} (KeyPointer (RP ofs len)) = do
+ cache <- readIORef keyCache
+ case IM.lookup ofs cache of
+ Just key -> return key
+ Nothing -> do
+ hSeek hPayload AbsoluteSeek (fromIntegral ofs)
+ key <- B.hGet hPayload len
+ modifyIORef' keyCache (IM.insert ofs key)
+ return key
+
+fetchFrame' :: IO () -> LisztHandle -> Int -> IO (Frame RawPointer)
+fetchFrame' seek h len = modifyMVar (refBuffer h) $ \(blen, buf) -> do
+ seek
+ (blen', buf') <- if blen < len
+ then do
+ buf' <- B.mallocByteString len
+ return (len, buf')
+ else return (blen, buf)
+ f <- withForeignPtr buf' $ \ptr -> do
+ _ <- hGetBuf (hPayload h) ptr len
+ let f = decodeFrame $ B.PS buf' 0 len
+ forceSpine f
+ return ((blen', buf'), f)
+
+lookupSpine :: Fetchable a => LisztHandle -> Key -> Frame a -> IO (Maybe (Spine a))
+lookupSpine h k (Leaf1 p v) = do
+ vp <- fetchKey h p
+ if k == vp then return (Just v) else return Nothing
+lookupSpine h k (Leaf2 p u q v) = do
+ vp <- fetchKey h p
+ if k == vp then return (Just u) else do
+ vq <- fetchKey h q
+ return $ if k == vq then Just v else Nothing
+lookupSpine h k (Node2 l p v r) = do
+ vp <- fetchKey h p
+ case compare k vp of
+ LT -> fetchFrame h l >>= lookupSpine h k
+ EQ -> return (Just v)
+ GT -> fetchFrame h r >>= lookupSpine h k
+lookupSpine h k (Node3 l p u m q v r) = do
+ vp <- fetchKey h p
+ case compare k vp of
+ LT -> fetchFrame h l >>= lookupSpine h k
+ EQ -> return (Just u)
+ GT -> do
+ vq <- fetchKey h q
+ case compare k vq of
+ LT -> fetchFrame h m >>= lookupSpine h k
+ EQ -> return (Just v)
+ GT -> fetchFrame h r >>= lookupSpine h k
+lookupSpine _ _ _ = return Nothing
+
+availableKeys :: LisztHandle -> Frame RawPointer -> IO [Key]
+availableKeys _ Empty = return []
+availableKeys h (Leaf1 k _) = pure <$> fetchKey h k
+availableKeys h (Leaf2 j _ k _) = sequence [fetchKey h j, fetchKey h k]
+availableKeys h (Node2 l k _ r) = do
+ lks <- fetchFrame h l >>= availableKeys h
+ vk <- fetchKey h k
+ rks <- fetchFrame h r >>= availableKeys h
+ return $ lks ++ vk : rks
+availableKeys h (Node3 l j _ m k _ r) = do
+ lks <- fetchFrame h l >>= availableKeys h
+ vj <- fetchKey h j
+ mks <- fetchFrame h m >>= availableKeys h
+ vk <- fetchKey h k
+ rks <- fetchFrame h r >>= availableKeys h
+ return $ lks ++ vj : mks ++ vk : rks
+availableKeys _ _ = fail "availableKeys: unexpected frame"
+
+--------------------------------------------------------------------------------
+-- Footer (root node)
+
+footerSize :: Int
+footerSize = 256
+
+emptyFooter :: B.ByteString
+emptyFooter = B.pack [0,14,171,160,140,150,185,18,22,70,203,145,129,232,42,76,81,176,163,195,96,209,8,74,97,123,57,136,107,174,241,142,100,164,181,138,253,170,25,77,12,191,212,224,142,167,215,73,48,0,2,170,226,114,8,29,141,85,243,179,81,11,59,246,62,189,202,254,56,140,227,195,189,118,152,26,106,81,4,121,152,72,247,119,111,128,75,242,29,96,157,190,170,1,57,77,61,132,72,8,233,94,254,18,197,152,128,15,174,9,91,78,125,21,72,250,179,176,176,47,230,45,255,228,214,223,28,61,123,159,104,233,131,39,88,245,13,242,228,48,17,119,159,173,71,172,238,69,137,141,153,133,79,24,81,242,19,21,209,44,120,69,180,103,100,185,189,191,50,132,52,229,248,12,207,134,45,241,2,217,112,21,239,65,39,30,33,16,147,152,52,204,221,56,87,191,235,235,173,181,106,165,37,52,245,221,13,80,91,207,224,95,157,222,3,210,54,28,99,1,7,50,189,163,141,244,200,101,250,61,48,10,243,248,153,98,224,73,227,121,72,156,228,205,43,82,166,48,85,132,0,76,73,83,90,84]
+
+isFooter :: B.ByteString -> Bool
+isFooter bs = B.drop 128 bs == B.drop 128 emptyFooter
+
+fetchRoot :: LisztHandle -> IO (Frame RawPointer)
+fetchRoot h = fetchFrame'
+ (hSeek (hPayload h) SeekFromEnd (-fromIntegral footerSize)) h footerSize
+
+--------------------------------------------------------------------------------
+-- Spine operations
+
+spineLength :: Spine a -> Int
+spineLength = sum . map fst
+
+type QueryResult = (Tag, RawPointer)
+
+dropSpine :: Fetchable a => LisztHandle -> Int -> Spine a -> IO (Spine a)
+dropSpine _ _ [] = return []
+dropSpine _ 0 s = return s
+dropSpine h n0 ((siz0, t0) : xs0)
+ | siz0 <= n0 = dropSpine h (n0 - siz0) xs0
+ | otherwise = dropTree n0 siz0 t0 xs0
+ where
+ dropTree 0 siz t xs = return $ (siz, t) : xs
+ dropTree n siz t xs = fetchFrame h t >>= \case
+ Tree _ _ l r
+ | n == 1 -> return $ (siz', l) : (siz', r) : xs
+ | n <= siz' -> dropTree (n - 1) siz' l ((siz', r) : xs)
+ | otherwise -> dropTree (n - siz' - 1) siz' r xs
+ Leaf _ _ -> return xs
+ _ -> error $ "dropTree: unexpected frame"
+ where
+ siz' = siz `div` 2
+
+takeSpine :: Fetchable a => LisztHandle -> Int -> Spine a -> [QueryResult] -> IO [QueryResult]
+takeSpine _ n _ ps | n <= 0 = return ps
+takeSpine _ _ [] ps = return ps
+takeSpine h n ((siz, t) : xs) ps
+ | n >= siz = takeAll h t ps >>= takeSpine h (n - siz) xs
+ | otherwise = takeTree h n siz t ps
+
+takeTree :: Fetchable a => LisztHandle -> Int -> Int -> a -> [QueryResult] -> IO [QueryResult]
+takeTree _ n _ _ ps | n <= 0 = return ps
+takeTree h n siz t ps = fetchFrame h t >>= \case
+ Tree tag p l r
+ | n == 1 -> return $ (tag, p) : ps
+ | n <= siz' -> takeTree h (n - 1) siz' l ((tag, p) : ps)
+ | otherwise -> do
+ ps' <- takeAll h l ((tag, p) : ps)
+ takeTree h (n - siz' - 1) siz' r ps'
+ Leaf tag p -> return $ (tag, p) : ps
+ _ -> error $ "takeTree: unexpected frame"
+ where
+ siz' = siz `div` 2
+
+takeAll :: Fetchable a => LisztHandle -> a -> [QueryResult] -> IO [QueryResult]
+takeAll h t ps = fetchFrame h t >>= \case
+ Tree tag p l r -> takeAll h l ((tag, p) : ps) >>= takeAll h r
+ Leaf tag p -> return ((tag, p) : ps)
+ _ -> error $ "takeAll: unexpected frame"
+
+takeSpineWhile :: Fetchable a
+ => (Tag -> Bool)
+ -> LisztHandle
+ -> Spine a
+ -> [QueryResult] -> IO [QueryResult]
+takeSpineWhile cond h = go where
+ go (t0 : ((siz, t) : xs)) ps = fetchFrame h t >>= \case
+ Leaf tag p
+ | cond tag -> takeAll h (snd t0) ps >>= go xs . ((tag, p):)
+ | otherwise -> inner t0 ps
+ Tree tag p l r
+ | cond tag -> takeAll h (snd t0) ps
+ >>= go ((siz', l) : (siz', r) : xs) . ((tag, p):)
+ | otherwise -> inner t0 ps
+ _ -> error "takeSpineWhile: unexpected frame"
+ where
+ siz' = siz `div` 2
+ go [t] ps = inner t ps
+ go [] ps = return ps
+
+ inner (siz, t) ps = fetchFrame h t >>= \case
+ Leaf tag p
+ | cond tag -> return $ (tag, p) : ps
+ | otherwise -> return ps
+ Tree tag p l r
+ | cond tag -> go [(siz', l), (siz', r)] ((tag, p) : ps)
+ | otherwise -> return ps
+ _ -> error "takeSpineWhile: unexpected frame"
+ where
+ siz' = siz `div` 2
+
+dropSpineWhile :: Fetchable a
+ => (Tag -> Bool)
+ -> LisztHandle
+ -> Spine a
+ -> IO (Maybe (Int, QueryResult, Spine a))
+dropSpineWhile cond h = go 0 where
+ go !total (t0@(siz0, _) : ts@((siz, t) : xs)) = fetchFrame h t >>= \case
+ Leaf tag _
+ | cond tag -> go (total + siz0 + siz) xs
+ | otherwise -> dropTree total t0 ts
+ Tree tag _ l r
+ | cond tag -> go (total + siz0 + 1) $ (siz', l) : (siz', r) : xs
+ | otherwise -> dropTree total t0 ts
+ _ -> error "dropSpineWhile: unexpected frame"
+ where
+ siz' = siz `div` 2
+ go total (t : ts) = dropTree total t ts
+ go _ [] = return Nothing
+
+ dropTree !total (siz, t) ts = fetchFrame h t >>= \case
+ Leaf tag p
+ | cond tag -> go (total + 1) ts
+ | otherwise -> return $ Just (total, (tag, p), ts)
+ Tree tag p l r
+ | cond tag -> go (total + 1) $ (siz', l) : (siz', r) : ts
+ | otherwise -> return $ Just (total, (tag, p), (siz', l) : (siz', r) : ts)
+ _ -> error "dropSpineWhile: unexpected frame"
+ where
+ siz' = siz `div` 2
+
+--------------------------------------------------------------------------------
+-- Magical
+
+copyByteString :: B.ByteString -> IO B.ByteString
+copyByteString (B.PS fp ofs len) = do
+ fp' <- B.mallocByteString len
+ withForeignPtr fp' $ \dst -> withForeignPtr fp
+ $ \src -> B.memcpy dst (src `plusPtr` ofs) len
+ return (B.PS fp' 0 len)
+
+forceSpine :: NFData a => Frame a -> IO (Frame a)
+forceSpine f = case f of
+ Empty -> return Empty
+ Leaf1 _ s -> rnf s `seq` return f
+ Leaf2 _ s _ t -> rnf s `seq` rnf t `seq` return f
+ Node2 _ _ s _ -> rnf s `seq` return f
+ Node3 _ _ s _ _ t _ -> rnf s `seq` rnf t `seq` return f
+ Tree tag p l r -> do
+ tag' <- WB.bytes <$> copyByteString (WB.toByteString tag)
+ return (Tree tag' p l r)
+ Leaf tag p -> do
+ tag' <- WB.bytes <$> copyByteString (WB.toByteString tag)
+ return (Leaf tag' p)
diff --git a/src/Database/Liszt/Network.hs b/src/Database/Liszt/Network.hs
new file mode 100644
index 0000000..f8712f8
--- /dev/null
+++ b/src/Database/Liszt/Network.hs
@@ -0,0 +1,101 @@
+{-# LANGUAGE LambdaCase #-}
+module Database.Liszt.Network
+ ( startServer
+ , Connection
+ , withConnection
+ , connect
+ , disconnect
+ , fetch) where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+import Database.Liszt.Tracker
+import Database.Liszt.Internal (hPayload, RawPointer(..))
+import Data.Binary
+import Data.Binary.Get
+import Data.Winery
+import qualified Data.Winery.Internal.Builder as WB
+import qualified Data.ByteString.Char8 as B
+import qualified Data.ByteString.Lazy as BL
+import qualified Network.Socket.SendFile.Handle as SF
+import qualified Network.Socket.ByteString as SB
+import qualified Network.Socket as S
+import System.IO
+import Text.Read (readMaybe)
+
+respond :: Tracker -> S.Socket -> IO ()
+respond tracker conn = do
+ msg <- SB.recv conn 4096
+ req <- try (evaluate $ decodeCurrent msg) >>= \case
+ Left e -> throwIO $ WineryError e
+ Right a -> return a
+ unless (B.null msg) $ handleRequest tracker req $ \lh lastSeqNo offsets -> do
+ let count = length offsets
+ _ <- SB.send conn $ encodeResp $ Right count
+ forM_ (zip [lastSeqNo - count + 1..] offsets) $ \(i, (tag, RP pos len)) -> do
+ SB.sendAll conn $ WB.toByteString $ mconcat
+ [ WB.word64 (fromIntegral i)
+ , WB.word64 (fromIntegral $ WB.getSize tag), tag
+ , WB.word64 $ fromIntegral len]
+ SF.sendFile' conn (hPayload lh) (fromIntegral pos) (fromIntegral len)
+
+startServer :: Int -> FilePath -> IO ()
+startServer port prefix = withLisztReader prefix $ \env -> do
+ let hints = S.defaultHints { S.addrFlags = [S.AI_NUMERICHOST, S.AI_NUMERICSERV], S.addrSocketType = S.Stream }
+ addr:_ <- S.getAddrInfo (Just hints) (Just "0.0.0.0") (Just $ show port)
+ bracket (S.socket (S.addrFamily addr) (S.addrSocketType addr) (S.addrProtocol addr)) S.close $ \sock -> do
+ S.setSocketOption sock S.ReuseAddr 1
+ S.setSocketOption sock S.NoDelay 1
+ S.bind sock $ S.SockAddrInet (fromIntegral port) (S.tupleToHostAddress (0,0,0,0))
+ S.listen sock 2
+ forever $ do
+ (conn, _) <- S.accept sock
+ forkFinally (do
+ path <- decode . BL.fromStrict <$> SB.recv conn 4096
+ withTracker env path $ \t -> do
+ SB.sendAll conn $ B.pack "READY"
+ forever $ respond t conn)
+ $ \result -> do
+ case result of
+ Left ex -> case fromException ex of
+ Just e -> SB.sendAll conn $ encodeResp $ Left $ show (e :: LisztError)
+ Nothing -> hPutStrLn stderr $ show ex
+ Right _ -> return ()
+ S.close conn
+
+encodeResp :: Either String Int -> B.ByteString
+encodeResp = BL.toStrict . encode
+
+newtype Connection = Connection (MVar S.Socket)
+
+withConnection :: String -> Int -> B.ByteString -> (Connection -> IO r) -> IO r
+withConnection host port path = bracket (connect host port path) disconnect
+
+connect :: String -> Int -> B.ByteString -> IO Connection
+connect host port path = do
+ let hints = S.defaultHints { S.addrFlags = [S.AI_NUMERICSERV], S.addrSocketType = S.Stream }
+ addr:_ <- S.getAddrInfo (Just hints) (Just host) (Just $ show port)
+ sock <- S.socket (S.addrFamily addr) (S.addrSocketType addr) (S.addrProtocol addr)
+ S.connect sock $ S.addrAddress addr
+ SB.sendAll sock $ BL.toStrict $ encode path
+ _ <- SB.recv sock 4096
+ Connection <$> newMVar sock
+
+disconnect :: Connection -> IO ()
+disconnect (Connection sock) = takeMVar sock >>= S.close
+
+fetch :: Connection -> Request -> IO [(Int, B.ByteString, B.ByteString)]
+fetch (Connection msock) req = modifyMVar msock $ \sock -> do
+ SB.sendAll sock $ serialiseOnly req
+ go sock $ runGetIncremental $ get >>= \case
+ Left e -> case readMaybe e of
+ Just e' -> throw (e' :: LisztError)
+ Nothing -> fail $ "Unknown error: " ++ show e
+ Right n -> replicateM n ((,,) <$> get <*> get <*> get)
+ where
+ go sock (Done _ _ a) = return (sock, a)
+ go sock (Partial cont) = do
+ bs <- SB.recv sock 4096
+ if B.null bs then go sock $ cont Nothing else go sock $ cont $ Just bs
+ go _ (Fail _ _ str) = fail $ show req ++ ": " ++ str
diff --git a/src/Database/Liszt/Tracker.hs b/src/Database/Liszt/Tracker.hs
new file mode 100644
index 0000000..f5c2805
--- /dev/null
+++ b/src/Database/Liszt/Tracker.hs
@@ -0,0 +1,250 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE UndecidableInstances #-}
+module Database.Liszt.Tracker
+ ( Offset(..)
+ , Request(..)
+ , defRequest
+ , handleRequest
+ , LisztError(..)
+ , LisztReader
+ , withLisztReader
+ , Tracker
+ , withTracker
+ )where
+
+import Control.Applicative
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Concurrent.STM.Delay
+import Control.Exception
+import Control.Monad
+import Database.Liszt.Internal
+import qualified Data.ByteString.Char8 as B
+import qualified Data.ByteString.Internal as B
+import qualified Data.IntMap.Strict as IM
+import qualified Data.HashMap.Strict as HM
+import Data.Scientific (Scientific)
+import Data.Reflection (Given(..), give)
+import Data.Text (Text)
+import Data.Winery
+import Foreign.ForeignPtr
+import qualified Data.Winery.Internal.Builder as WB
+import GHC.Generics (Generic)
+import System.Directory
+import System.FilePath
+import System.FSNotify
+import System.IO
+
+data Offset = SeqNo !Int
+ | FromEnd !Int
+ | WineryTag !Schema ![Text] !Scientific
+ deriving (Show, Generic)
+instance Serialise Offset
+
+data Request = Request
+ { reqKey :: !Key
+ , reqTimeout :: !Int
+ , reqLimit :: !Int
+ , reqFrom :: !Offset
+ , reqTo :: !Offset
+ } deriving (Show, Generic)
+instance Serialise Request
+
+defRequest :: Key -> Request
+defRequest k = Request
+ { reqKey = k
+ , reqTimeout = 0
+ , reqFrom = FromEnd 1
+ , reqTo = FromEnd 1
+ , reqLimit = maxBound
+ }
+
+data LisztError = MalformedRequest
+ | InvalidRequest
+ | StreamNotFound
+ | FileNotFound
+ | IndexNotFound
+ | WinerySchemaError !String
+ | WineryError !DecodeException
+ deriving (Show, Read)
+instance Exception LisztError
+
+data Tracker = Tracker
+ { vRoot :: !(TVar (Frame CachePointer))
+ , vUpdated :: !(TVar Bool)
+ , vPending :: !(TVar [STM (IO ())])
+ , vReaders :: !(TVar Int)
+ , followThread :: !ThreadId
+ , trackerPath :: !B.ByteString
+ , streamHandle :: !LisztHandle
+ , cache :: !Cache
+ }
+
+data Cache = Cache
+ { primaryCache :: TVar (IM.IntMap (Frame CachePointer))
+ , secondaryCache :: TVar (IM.IntMap (Frame CachePointer))
+ }
+
+newtype CachePointer = CachePointer RawPointer
+
+instance Given Cache => Fetchable CachePointer where
+ fetchFrame h (CachePointer p@(RP ofs _)) = join $ atomically $ do
+ let Cache{..} = given
+ pcache <- readTVar primaryCache
+ case IM.lookup ofs pcache of
+ Just x -> return (pure x)
+ Nothing -> do
+ scache <- readTVar secondaryCache
+ case IM.lookup ofs scache of
+ Just x -> do
+ writeTVar primaryCache $! IM.insert ofs x pcache
+ return (pure x)
+ Nothing -> return $ do
+ x <- fmap CachePointer <$> fetchFrame h p
+ atomically $ modifyTVar' primaryCache $ IM.insert ofs x
+ return x
+
+flipCache :: Cache -> STM ()
+flipCache Cache{..} = do
+ readTVar primaryCache >>= writeTVar secondaryCache
+ writeTVar primaryCache IM.empty
+
+createTracker :: WatchManager -> FilePath -> B.ByteString -> IO Tracker
+createTracker man prefix trackerPath = do
+ let filePath = prefix </> B.unpack trackerPath
+ exist <- doesFileExist filePath
+ unless exist $ throwIO FileNotFound
+ streamHandle <- openLiszt filePath
+ vRoot <- newTVarIO Empty
+ vPending <- newTVarIO []
+ vUpdated <- newTVarIO True
+ vReaders <- newTVarIO 1
+ stopWatch <- watchDir man (takeDirectory filePath) (\case
+ Modified path' _ _ | filePath == path' -> True
+ _ -> False)
+ $ const $ void $ atomically $ writeTVar vUpdated True
+
+ fptr <- B.mallocByteString 4096
+
+ let wait = atomically $ do
+ b <- readTVar vUpdated
+ unless b retry
+ writeTVar vUpdated False
+
+ hSeek (hPayload streamHandle) SeekFromEnd (-fromIntegral footerSize)
+ let seekRoot prevSize = do
+ n <- withForeignPtr fptr $ \p -> hGetBuf (hPayload streamHandle) p 4096
+ if n == 0
+ then do
+ let bs = B.PS fptr (max 0 $ prevSize - footerSize) footerSize
+ if isFooter bs
+ then try (evaluate $ decodeFrame bs) >>= \case
+ Left (_ :: DecodeException) -> do
+ wait
+ seekRoot 0
+ Right a -> forceSpine a
+ else do
+ wait
+ seekRoot 0
+ else seekRoot n
+
+ cache <- Cache <$> newTVarIO IM.empty <*> newTVarIO IM.empty
+
+ followThread <- forkFinally (forever $ do
+ newRoot <- fmap CachePointer <$> seekRoot 0
+ join $ atomically $ do
+ flipCache cache
+ writeTVar vRoot newRoot
+ pending <- readTVar vPending
+ writeTVar vPending []
+ ms <- sequence pending
+ return $ sequence_ ms
+ wait) $ const $ stopWatch >> closeLiszt streamHandle
+
+ return Tracker{..}
+
+data LisztReader = LisztReader
+ { watchManager :: WatchManager
+ , vTrackers :: TVar (HM.HashMap B.ByteString Tracker)
+ , prefix :: FilePath
+ }
+
+withLisztReader :: FilePath -> (LisztReader -> IO ()) -> IO ()
+withLisztReader prefix k = do
+ vTrackers <- newTVarIO HM.empty
+ withManager $ \watchManager -> k LisztReader{..}
+
+acquireTracker :: LisztReader -> B.ByteString -> IO Tracker
+acquireTracker LisztReader{..} path = join $ atomically $ do
+ streams <- readTVar vTrackers
+ case HM.lookup path streams of
+ Just s -> do
+ modifyTVar' (vReaders s) (+1)
+ return (return s)
+ Nothing -> return $ do
+ s <- createTracker watchManager prefix path
+ atomically $ modifyTVar vTrackers (HM.insert path s)
+ return s
+
+releaseTracker :: LisztReader -> Tracker -> IO ()
+releaseTracker LisztReader{..} Tracker{..} = join $ atomically $ do
+ n <- readTVar vReaders
+ if n <= 1
+ then do
+ modifyTVar' vTrackers (HM.delete trackerPath)
+ return $ do
+ killThread followThread
+ closeLiszt streamHandle
+ else return () <$ writeTVar vReaders (n - 1)
+
+withTracker :: LisztReader -> B.ByteString -> (Tracker -> IO a) -> IO a
+withTracker env path = bracket (acquireTracker env path) (releaseTracker env)
+
+handleRequest :: Tracker
+ -> Request
+ -> (LisztHandle -> Int -> [QueryResult] -> IO ())
+ -> IO ()
+handleRequest str@Tracker{..} req@Request{..} cont = do
+ root <- atomically $ do
+ b <- readTVar vUpdated
+ when b retry
+ readTVar vRoot
+ give cache $ lookupSpine streamHandle reqKey root >>= \case
+ Nothing -> throwIO StreamNotFound
+ Just spine -> do
+ let len = spineLength spine
+ let goSeqNo ofs
+ | ofs >= len = do
+ delay <- newDelay reqTimeout
+ atomically $ do
+ modifyTVar vPending $ (:) $ cont streamHandle 0 [] <$ waitDelay delay
+ <|> pure (handleRequest str req { reqTo = SeqNo ofs } cont)
+ | otherwise = do
+ spine' <- dropSpine streamHandle (len - ofs - 1) spine
+ case reqFrom of
+ FromEnd n -> takeSpine streamHandle (min reqLimit $ ofs - (len - n) + 1) spine' [] >>= cont streamHandle ofs
+ SeqNo n -> takeSpine streamHandle (min reqLimit $ ofs - n + 1) spine' [] >>= cont streamHandle ofs
+ WineryTag sch name p -> do
+ dec <- handleWinery sch name
+ takeSpineWhile ((>=p) . dec . WB.toByteString) streamHandle spine' [] >>= cont streamHandle ofs
+ case reqTo of
+ FromEnd ofs -> goSeqNo (len - ofs)
+ SeqNo ofs -> goSeqNo ofs
+ WineryTag sch name p -> do
+ dec <- handleWinery sch name
+ dropSpineWhile ((>=p) . dec . WB.toByteString) streamHandle spine >>= \case
+ Nothing -> cont streamHandle 0 []
+ Just (dropped, e, spine') -> case reqFrom of
+ FromEnd n -> takeSpine streamHandle (min reqLimit $ n - dropped + 1) spine' [e] >>= cont streamHandle (len - dropped)
+ SeqNo n -> takeSpine streamHandle (min reqLimit $ len - dropped - n + 1) spine' [e] >>= cont streamHandle (len - dropped)
+ WineryTag sch' name' q -> do
+ dec' <- handleWinery sch' name'
+ takeSpineWhile ((>=q) . dec' . WB.toByteString) streamHandle spine' [e] >>= cont streamHandle (len - dropped)
+ where
+ handleWinery :: Schema -> [Text] -> IO (B.ByteString -> Scientific)
+ handleWinery sch names = either (throwIO . WinerySchemaError . show) pure
+ $ getDecoderBy (foldr (flip extractFieldBy) deserialiser names) sch