summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMitsutoshiAoe <>2014-04-03 14:14:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2014-04-03 14:14:00 (GMT)
commit2a30948e60bc3acf619cec088d81ba02adc791e4 (patch)
tree84836cee3ded9a0f140ab1b22e6ef9a6b9142674
version 0.0.00.0.0
-rw-r--r--LICENSE30
-rw-r--r--Setup.hs2
-rw-r--r--examples/random-points.hs132
-rw-r--r--influxdb.cabal81
-rw-r--r--src/Database/InfluxDB.hs77
-rw-r--r--src/Database/InfluxDB/Decode.hs135
-rw-r--r--src/Database/InfluxDB/Encode.hs96
-rw-r--r--src/Database/InfluxDB/Http.hs659
-rw-r--r--src/Database/InfluxDB/Lens.hs26
-rw-r--r--src/Database/InfluxDB/Stream.hs18
-rw-r--r--src/Database/InfluxDB/Types.hs222
-rw-r--r--src/Database/InfluxDB/Types/Internal.hs15
12 files changed, 1493 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..38b05bc
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright (c) 2014, Mitsutoshi Aoe
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * Neither the name of Mitsutoshi Aoe nor the names of other
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Setup.hs b/Setup.hs
new file mode 100644
index 0000000..9a994af
--- /dev/null
+++ b/Setup.hs
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
diff --git a/examples/random-points.hs b/examples/random-points.hs
new file mode 100644
index 0000000..71e3069
--- /dev/null
+++ b/examples/random-points.hs
@@ -0,0 +1,132 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+import Control.Applicative
+import Control.Exception as E
+import Control.Monad
+import Control.Monad.Trans
+import Data.Function (fix)
+import Data.Time.Clock.POSIX
+import System.Environment
+import System.IO
+import qualified Data.Text as T
+import qualified Data.Vector as V
+
+import System.Random.MWC (Variate(..))
+import qualified Network.HTTP.Client as HC
+import qualified System.Random.MWC as MWC
+
+import Database.InfluxDB
+
+oneWeekInSeconds :: Int
+oneWeekInSeconds = 7*24*60*60
+
+main :: IO ()
+main = do
+ [read -> (numPoints :: Int), read -> (batches :: Int)] <- getArgs
+ hSetBuffering stdout NoBuffering
+ config <- newConfig
+ HC.withManager managerSettings $ \manager -> do
+ dropDatabase config manager (Database "ctx" Nothing)
+ `E.catch`
+ -- Ignore exceptions here
+ \(_ :: HC.HttpException) -> return ()
+ db <- createDatabase config manager "ctx"
+ gen <- MWC.create
+ flip fix batches $ \outerLoop !m -> when (m > 0) $ do
+ postWithPrecision config manager db SecondsPrecision $ withSeries "ct1" $
+ flip fix numPoints $ \innerLoop !n -> when (n > 0) $ do
+ !timestamp <- liftIO $ (-)
+ <$> getPOSIXTime
+ <*> (fromIntegral <$> uniformR (0, oneWeekInSeconds) gen)
+ !value <- liftIO $ uniform gen
+ writePoints $ Point value timestamp
+ innerLoop $ n - 1
+ outerLoop $ m - 1
+
+ result <- query config manager db "select count(value) from ct1;"
+ case result of
+ [] -> putStrLn "Empty series"
+ series:_ -> do
+ print $ seriesColumns series
+ print $ seriesPoints series
+ -- Streaming output
+ queryChunked config manager db "select * from ct1;" $ \stream0 ->
+ flip fix stream0 $ \loop stream -> case stream of
+ Done -> return ()
+ Yield series next -> do
+ case fromSeriesData series of
+ Left reason -> hPutStrLn stderr reason
+ Right points -> mapM_ print (points :: [Point])
+ putStrLn "--"
+ stream' <- next
+ loop stream'
+
+newConfig :: IO Config
+newConfig = do
+ pool <- newServerPool localServer [] -- no backup servers
+ return Config
+ { configCreds = rootCreds
+ , configServerPool = pool
+ }
+
+managerSettings :: HC.ManagerSettings
+managerSettings = HC.defaultManagerSettings
+ { HC.managerResponseTimeout = Just $ 60*(10 :: Int)^(6 :: Int)
+ }
+
+data Point = Point !Name !POSIXTime deriving Show
+
+instance ToSeriesData Point where
+ toSeriesColumns _ = V.fromList ["value", "time"]
+ toSeriesPoints (Point value time) = V.fromList
+ [ toValue value
+ , epochInSeconds time
+ ]
+
+instance FromSeriesData Point where
+ parseSeriesData = withValues $ \values -> Point
+ <$> values .: "value"
+ <*> values .: "time"
+
+epochInSeconds :: POSIXTime -> Value
+epochInSeconds = Int . floor
+
+data Name
+ = Foo
+ | Bar
+ | Baz
+ | Quu
+ | Qux
+ deriving (Enum, Bounded, Show)
+
+instance ToValue Name where
+ toValue Foo = String "foo"
+ toValue Bar = String "bar"
+ toValue Baz = String "baz"
+ toValue Quu = String "quu"
+ toValue Qux = String "qux"
+
+instance FromValue Name where
+ parseValue (String name) = case name of
+ "foo" -> return Foo
+ "bar" -> return Bar
+ "baz" -> return Baz
+ "quu" -> return Quu
+ "qux" -> return Qux
+ _ -> fail $ "Incorrect string: " ++ T.unpack name
+ parseValue v = typeMismatch "String" v
+
+instance Variate Name where
+ uniform = uniformR (minBound, maxBound)
+ uniformR (lower, upper) g = do
+ name <- uniformR (fromEnum lower, fromEnum upper) g
+ return $! toEnum name
+
+instance FromValue POSIXTime where
+ parseValue (Int n) = return $ fromIntegral n
+ parseValue (Float d) = return $ realToFrac d
+ parseValue v = typeMismatch "Int or Float" v
diff --git a/influxdb.cabal b/influxdb.cabal
new file mode 100644
index 0000000..754fa3c
--- /dev/null
+++ b/influxdb.cabal
@@ -0,0 +1,81 @@
+name: influxdb
+version: 0.0.0
+synopsis: Haskell client library for InfluxDB
+description: Haskell client library for InfluxDB
+homepage: https://github.com/maoe/influxdb-haskell
+license: BSD3
+license-file: LICENSE
+author: Mitsutoshi Aoe
+maintainer: Mitsutoshi Aoe <maoe@foldr.in>
+copyright: Copyright (C) 2014 Mitsutoshi Aoe
+category: Database
+build-type: Simple
+cabal-version: >= 1.10
+
+flag examples
+ default: False
+
+library
+ exposed-modules:
+ Database.InfluxDB
+ Database.InfluxDB.Decode
+ Database.InfluxDB.Encode
+ Database.InfluxDB.Http
+ Database.InfluxDB.Lens
+ -- Database.InfluxDB.Protobuf
+ -- Database.InfluxDB.Query
+ Database.InfluxDB.Stream
+ Database.InfluxDB.Types
+ other-modules:
+ Database.InfluxDB.Types.Internal
+ ghc-options: -Wall
+ build-depends:
+ base >= 4 && < 4.8
+ , aeson >= 0.7
+ , attoparsec
+ , bytestring
+ , containers
+ , data-default-class
+ , dlist
+ , http-client
+ , lifted-base
+ , mtl
+ , network
+ , retry
+ , scientific
+ , tagged
+ , text
+ , time
+ , vector
+ hs-source-dirs: src
+ default-language: Haskell2010
+
+executable influx-random-points
+ if flag(examples)
+ buildable: True
+ else
+ buildable: False
+ hs-source-dirs: examples
+ main-is: random-points.hs
+ ghc-options: -Wall
+ build-depends:
+ base
+ , bytestring
+ , http-client
+ , influxdb
+ , mtl
+ , mwc-random
+ , text
+ , time
+ , vector
+ default-language: Haskell2010
+
+source-repository head
+ type: git
+ branch: develop
+ location: https://github.com/maoe/influxdb-haskell.git
+
+source-repository this
+ type: git
+ tag: v0.0.0
+ location: https://github.com/maoe/influxdb-haskell.git
diff --git a/src/Database/InfluxDB.hs b/src/Database/InfluxDB.hs
new file mode 100644
index 0000000..f9ad216
--- /dev/null
+++ b/src/Database/InfluxDB.hs
@@ -0,0 +1,77 @@
+module Database.InfluxDB
+ (
+ -- * Series data types
+ Series(..), seriesColumns, seriesPoints
+ , SeriesData(..)
+ , Value(..)
+
+ -- ** Encoding
+ , ToSeriesData(..)
+ , ToValue(..)
+
+ -- ** Decoding
+ , FromSeries(..), fromSeries
+ , FromSeriesData(..), fromSeriesData
+ , FromValue(..), fromValue
+
+ , withValues, (.:)
+ , typeMismatch
+
+ -- * HTTP API
+ -- ** Data types
+ , Config(..)
+ , Credentials(..), rootCreds
+ , TimePrecision(..)
+ , Server(..), localServer
+ , ServerPool, newServerPool
+ , Database(..)
+ , User(..)
+ , Admin(..)
+
+ -- ** Writing Data
+
+ -- *** Updating Points
+ , post, postWithPrecision
+ , SeriesT, PointT
+ , writeSeries
+ , withSeries
+ , writePoints
+
+ -- *** Deleting Points
+ -- **** One Time Deletes (not implemented)
+ -- , deleteSeries
+ -- **** Regularly Scheduled Deletes (not implemented)
+ -- , getScheduledDeletes
+ -- , addScheduledDelete
+ -- , removeScheduledDelete
+
+ -- ** Querying Data
+ , query
+ , Stream(..)
+ , queryChunked
+
+ -- ** Administration & Security
+ -- *** Creating and Dropping Databases
+ , listDatabases
+ , createDatabase
+ , dropDatabase
+
+ -- *** Security
+ -- **** Cluster admin
+ , listClusterAdmins
+ , addClusterAdmin
+ , updateClusterAdminPassword
+ , deleteClusterAdmin
+ -- **** Database user
+ , listDatabaseUsers
+ , addDatabaseUser
+ , updateDatabaseUserPassword
+ , deleteDatabaseUser
+ , grantAdminPrivilegeTo
+ , revokeAdminPrivilegeFrom
+ ) where
+
+import Database.InfluxDB.Decode
+import Database.InfluxDB.Encode
+import Database.InfluxDB.Http
+import Database.InfluxDB.Types
diff --git a/src/Database/InfluxDB/Decode.hs b/src/Database/InfluxDB/Decode.hs
new file mode 100644
index 0000000..98b8296
--- /dev/null
+++ b/src/Database/InfluxDB/Decode.hs
@@ -0,0 +1,135 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE RecordWildCards #-}
+module Database.InfluxDB.Decode
+ ( FromSeries(..), fromSeries
+ , FromSeriesData(..), fromSeriesData
+ , withValues, (.:)
+ , FromValue(..), fromValue
+ , Parser, ValueParser, typeMismatch
+ ) where
+import Control.Applicative
+import Control.Monad.Reader
+import Data.Map (Map)
+import Data.Vector (Vector)
+import Data.Tuple (swap)
+import qualified Data.DList as DL
+import qualified Data.Map as Map
+import qualified Data.Text as T
+import qualified Data.Vector as V
+
+import Database.InfluxDB.Types
+
+-- | A type that can be converted from a @Series@.
+class FromSeries a where
+ parseSeries :: Series -> Parser a
+
+instance FromSeries Series where
+ parseSeries = return
+
+instance FromSeries SeriesData where
+ parseSeries = return . seriesData
+
+-- | Converte a value from a @Series@, failing if the types do not match.
+fromSeries :: FromSeries a => Series -> Either String a
+fromSeries = runParser . parseSeries
+
+-- | A type that can be converted from a @SeriesData@. A typical implementation
+-- is as follows.
+--
+-- > import Control.Applicative ((<$>), (<*>))
+-- > import qualified Data.Vector as V
+-- >
+-- > data Event = Event Text EventType
+-- > data EventType = Login | Logout
+-- >
+-- > instance FromSeriesData where
+-- > parseSeriesData = withValues $ values -> Event
+-- > <$> values .: "user"
+-- > <*> values .: "type"
+-- >
+-- > instance FromValue EventType
+class FromSeriesData a where
+ parseSeriesData :: Vector Column -> Vector Value -> Parser a
+
+instance FromSeriesData SeriesData where
+ parseSeriesData columns values = return SeriesData
+ { seriesDataColumns = columns
+ , seriesDataPoints = DL.singleton values
+ }
+
+-- | Converte a value from a @SeriesData@, failing if the types do not match.
+fromSeriesData :: FromSeriesData a => SeriesData -> Either String [a]
+fromSeriesData SeriesData {..} = mapM
+ (runParser . parseSeriesData seriesDataColumns)
+ (DL.toList seriesDataPoints)
+
+withValues
+ :: (Vector Value -> ValueParser a)
+ -> Vector Column -> Vector Value -> Parser a
+withValues f columns values =
+ runReaderT m $ Map.fromList $ map swap $ V.toList $ V.indexed columns
+ where
+ ValueParser m = f values
+
+(.:) :: FromValue a => Vector Value -> Column -> ValueParser a
+values .: column = do
+ found <- asks $ Map.lookup column
+ case found of
+ Nothing -> fail $ "No such column: " ++ T.unpack column
+ Just idx -> do
+ value <- V.indexM values idx
+ liftParser $ parseValue value
+
+-- | A type that can be converted from a @Value@.
+class FromValue a where
+ parseValue :: Value -> Parser a
+
+-- | Converte a value from a @Value@, failing if the types do not match.
+fromValue :: FromValue a => Value -> Either String a
+fromValue = runParser . parseValue
+
+instance FromValue Value where
+ parseValue = return
+
+instance FromValue Bool where
+ parseValue (Bool b) = return b
+ parseValue v = typeMismatch "Bool" v
+
+instance FromValue a => FromValue (Maybe a) where
+ parseValue Null = return Nothing
+ parseValue v = Just <$> parseValue v
+
+instance FromValue Int where
+ parseValue (Int n) = return $ fromIntegral n
+ parseValue v = typeMismatch "Int" v
+
+instance FromValue Double where
+ parseValue (Float d) = return d
+ parseValue v = typeMismatch "Float" v
+
+typeMismatch
+ :: String
+ -> Value
+ -> Parser a
+typeMismatch expected actual = fail $
+ "when expecting a " ++ expected ++
+ ", encountered " ++ name ++ " instead"
+ where
+ name = case actual of
+ Int _ -> "Int"
+ Float _ -> "Float"
+ String _ -> "String"
+ Bool _ -> "Bool"
+ Null -> "Null"
+
+newtype Parser a = Parser
+ { runParser :: Either String a
+ } deriving (Functor, Applicative, Monad)
+
+type ColumnIndex = Map Column Int
+
+newtype ValueParser a = ValueParser (ReaderT ColumnIndex Parser a)
+ deriving (Functor, Applicative, Monad, MonadReader ColumnIndex)
+
+liftParser :: Parser a -> ValueParser a
+liftParser = ValueParser . ReaderT . const
diff --git a/src/Database/InfluxDB/Encode.hs b/src/Database/InfluxDB/Encode.hs
new file mode 100644
index 0000000..ffc0ce6
--- /dev/null
+++ b/src/Database/InfluxDB/Encode.hs
@@ -0,0 +1,96 @@
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+module Database.InfluxDB.Encode
+ ( ToSeries(..)
+ , ToSeriesData(..), toSeriesData
+ , ToValue(..)
+ ) where
+import Data.Int (Int8, Int16, Int32, Int64)
+import Data.Proxy
+import Data.Vector (Vector)
+import Data.Word (Word8, Word16, Word32)
+import qualified Data.DList as DL
+import qualified Data.Text as T
+import qualified Data.Text.Lazy as TL
+
+import Database.InfluxDB.Types
+
+-- | A type that can be converted to a @Series@.
+class ToSeries a where
+ toSeries :: a -> Series
+
+-- | A type that can be converted to a @SeriesData@. A typical implementation is
+-- as follows.
+--
+-- > import qualified Data.Vector as V
+-- >
+-- > data Event = Event Text EventType
+-- > data EventType = Login | Logout
+-- >
+-- > instance ToSeriesData where
+-- > toSeriesColumn _ = V.fromList ["user", "type"]
+-- > toSeriesPoints (Event user ty) = V.fromList [toValue user, toValue ty]
+-- >
+-- > instance ToValue EventType
+class ToSeriesData a where
+ -- | Column names. You can safely ignore the proxy agument.
+ toSeriesColumns :: Proxy a -> Vector Column
+ -- | Data points.
+ toSeriesPoints :: a -> Vector Value
+
+toSeriesData :: forall a. ToSeriesData a => a -> SeriesData
+toSeriesData a = SeriesData
+ { seriesDataColumns = toSeriesColumns (Proxy :: Proxy a)
+ , seriesDataPoints = DL.singleton (toSeriesPoints a)
+ }
+
+-- | A type that can be stored in InfluxDB.
+class ToValue a where
+ toValue :: a -> Value
+
+instance ToValue Bool where
+ toValue = Bool
+
+instance ToValue a => ToValue (Maybe a) where
+ toValue Nothing = Null
+ toValue (Just a) = toValue a
+
+instance ToValue Int where
+ toValue = Int . fromIntegral
+
+instance ToValue Int8 where
+ toValue = Int . fromIntegral
+
+instance ToValue Int16 where
+ toValue = Int . fromIntegral
+
+instance ToValue Int32 where
+ toValue = Int . fromIntegral
+
+instance ToValue Int64 where
+ toValue = Int
+
+instance ToValue Word8 where
+ toValue = Int . fromIntegral
+
+instance ToValue Word16 where
+ toValue = Int . fromIntegral
+
+instance ToValue Word32 where
+ toValue = Int . fromIntegral
+
+instance ToValue Float where
+ toValue = Float . realToFrac
+
+instance ToValue Double where
+ toValue = Float
+
+instance ToValue T.Text where
+ toValue = String
+
+instance ToValue TL.Text where
+ toValue = String . TL.toStrict
+
+instance ToValue String where
+ toValue = String . T.pack
diff --git a/src/Database/InfluxDB/Http.hs b/src/Database/InfluxDB/Http.hs
new file mode 100644
index 0000000..11d97e5
--- /dev/null
+++ b/src/Database/InfluxDB/Http.hs
@@ -0,0 +1,659 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+module Database.InfluxDB.Http
+ ( Config(..)
+ , Credentials(..), rootCreds
+ , Server(..), localServer
+ , TimePrecision(..)
+
+ -- * Writing Data
+
+ -- ** Updating Points
+ , post, postWithPrecision
+ , SeriesT, PointT
+ , writeSeries
+ , withSeries
+ , writePoints
+
+ -- ** Deleting Points
+ -- *** One Time Deletes (not implemented)
+ -- , deleteSeries
+ -- *** Regularly Scheduled Deletes (not implemented)
+ -- , getScheduledDeletes
+ -- , addScheduledDelete
+ -- , removeScheduledDelete
+
+ -- * Querying Data
+ , query
+ , Stream(..)
+ , queryChunked
+
+ -- * Administration & Security
+ -- ** Creating and Dropping Databases
+ , listDatabases
+ , createDatabase
+ , dropDatabase
+
+ -- ** Security
+ -- *** Cluster admin
+ , listClusterAdmins
+ , addClusterAdmin
+ , updateClusterAdminPassword
+ , deleteClusterAdmin
+ -- *** Database user
+ , listDatabaseUsers
+ , addDatabaseUser
+ , updateDatabaseUserPassword
+ , deleteDatabaseUser
+ , grantAdminPrivilegeTo
+ , revokeAdminPrivilegeFrom
+ ) where
+
+import Control.Applicative
+import Control.Monad.Identity
+import Control.Monad.Writer
+import Data.DList (DList)
+import Data.IORef (IORef)
+import Data.Proxy
+import Data.Text (Text)
+import Data.Vector (Vector)
+import Network.URI (escapeURIString, isAllowedInURI)
+import Text.Printf (printf)
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Char8 as BS8
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.DList as DL
+import qualified Data.Text as T
+
+import Control.Exception.Lifted (Handler(..))
+import Control.Retry
+import Data.Aeson ((.=))
+import Data.Default.Class (Default(def))
+import qualified Data.Aeson as A
+import qualified Data.Aeson.Encode as AE
+import qualified Data.Attoparsec as P
+import qualified Network.HTTP.Client as HC
+
+import Database.InfluxDB.Decode
+import Database.InfluxDB.Encode
+import Database.InfluxDB.Types
+import Database.InfluxDB.Stream (Stream(..))
+import qualified Database.InfluxDB.Stream as S
+
+-- | Configurations for HTTP API client.
+data Config = Config
+ { configCreds :: !Credentials
+ , configServerPool :: IORef ServerPool
+ }
+
+-- | Default credentials.
+rootCreds :: Credentials
+rootCreds = Credentials
+ { credsUser = "root"
+ , credsPassword = "root"
+ }
+
+-- | Default server location.
+localServer :: Server
+localServer = Server
+ { serverHost = "localhost"
+ , serverPort = 8086
+ , serverSsl = False
+ }
+
+data TimePrecision
+ = SecondsPrecision
+ | MillisecondsPrecision
+ | MicrosecondsPrecision
+
+timePrecChar :: TimePrecision -> Char
+timePrecChar SecondsPrecision = 's'
+timePrecChar MillisecondsPrecision = 'm'
+timePrecChar MicrosecondsPrecision = 'u'
+
+-----------------------------------------------------------
+-- Writing Data
+
+-- | Post a bunch of writes for (possibly multiple) series into a database.
+post
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> SeriesT IO a
+ -> IO a
+post config manager database =
+ postGeneric config manager database Nothing
+
+-- | Post a bunch of writes for (possibly multiple) series into a database like
+-- @post@ but with time precision.
+postWithPrecision
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> TimePrecision
+ -> SeriesT IO a
+ -> IO a
+postWithPrecision config manager database timePrec =
+ postGeneric config manager database (Just timePrec)
+
+postGeneric
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> Maybe TimePrecision
+ -> SeriesT IO a
+ -> IO a
+postGeneric Config {..} manager database timePrec write = do
+ (a, series) <- runSeriesT write
+ void $ httpLbsWithRetry configServerPool (makeRequest series) manager
+ return a
+ where
+ makeRequest series = def
+ { HC.method = "POST"
+ , HC.requestBody = HC.RequestBodyLBS $ AE.encode series
+ , HC.path = escapeString $ printf "/db/%s/series"
+ (T.unpack databaseName)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ (maybe
+ ""
+ (printf "&time_precision=%c" . timePrecChar)
+ timePrec :: String)
+ }
+ Database {databaseName} = database
+ Credentials {..} = configCreds
+
+-- | Monad transformer to batch up multiple writes of series to speed up
+-- insertions.
+newtype SeriesT m a = SeriesT (WriterT (DList Series) m a)
+ deriving
+ ( Functor, Applicative, Monad, MonadIO, MonadTrans
+ , MonadWriter (DList Series)
+ )
+
+-- | Monad transformer to batch up multiple writes of points to speed up
+-- insertions.
+newtype PointT p m a = PointT (WriterT (DList (Vector Value)) m a)
+ deriving
+ ( Functor, Applicative, Monad, MonadIO, MonadTrans
+ , MonadWriter (DList (Vector Value))
+ )
+
+runSeriesT :: Monad m => SeriesT m a -> m (a, [Series])
+runSeriesT (SeriesT w) = do
+ (a, series) <- runWriterT w
+ return (a, DL.toList series)
+
+-- | Write a single series data.
+writeSeries
+ :: (Monad m, ToSeriesData a)
+ => Text
+ -- ^ Series name
+ -> a
+ -- ^ Series data
+ -> SeriesT m ()
+writeSeries name a = tell . DL.singleton $ Series
+ { seriesName = name
+ , seriesData = toSeriesData a
+ }
+
+-- | Write a bunch of data for a single series. Columns for the points don't
+-- need to be specified because they can be inferred from the type of @a@.
+withSeries
+ :: forall m a. (Monad m, ToSeriesData a)
+ => Text
+ -- ^ Series name
+ -> PointT a m ()
+ -> SeriesT m ()
+withSeries name (PointT w) = do
+ (_, values) <- lift $ runWriterT w
+ tell $ DL.singleton Series
+ { seriesName = name
+ , seriesData = SeriesData
+ { seriesDataColumns = toSeriesColumns (Proxy :: Proxy a)
+ , seriesDataPoints = values
+ }
+ }
+
+-- | Write a data into a series.
+writePoints
+ :: (Monad m, ToSeriesData a)
+ => a
+ -> PointT a m ()
+writePoints = tell . DL.singleton . toSeriesPoints
+
+-- TODO: Delete API hasn't been implemented in InfluxDB yet
+--
+-- deleteSeries
+-- :: Config
+-- -> HC.Manager
+-- -> Series
+-- -> IO ()
+-- deleteSeries Config {..} manager =
+-- error "deleteSeries: not implemented"
+--
+-- getScheduledDeletes
+-- :: Config
+-- -> HC.Manager
+-- -> IO [ScheduledDelete]
+-- getScheduledDeletes = do
+-- error "getScheduledDeletes: not implemented"
+--
+-- addScheduledDelete
+-- :: Config
+-- -> HC.Manager
+-- -> IO ScheduledDelete
+-- addScheduledDelete =
+-- error "addScheduledDeletes: not implemented"
+--
+-- removeScheduledDelete
+-- :: Config
+-- -> HC.Manager
+-- -> ScheduledDeletes
+-- -> IO ()
+-- removeScheduledDelete =
+-- error "removeScheduledDelete: not implemented"
+
+-----------------------------------------------------------
+-- Querying Data
+
+-- | Query a specified database.
+--
+-- The query format is specified in the
+-- <http://influxdb.org/docs/query_language/ InfluxDB Query Language>.
+query
+ :: FromSeries a
+ => Config
+ -> HC.Manager
+ -> Database
+ -> Text -- ^ Query text
+ -> IO [a]
+query Config {..} manager database q = do
+ response <- httpLbsWithRetry configServerPool request manager
+ case A.decode (HC.responseBody response) of
+ Nothing -> fail $ show response
+ Just xs -> case mapM fromSeries xs of
+ Left reason -> fail reason
+ Right ys -> return ys
+ where
+ request = def
+ { HC.path = escapeString $ printf "/db/%s/series"
+ (T.unpack databaseName)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s&q=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ (T.unpack q)
+ }
+ Database {databaseName} = database
+ Credentials {..} = configCreds
+
+-- | Construct streaming output
+responseStream :: A.FromJSON a => HC.BodyReader -> IO (Stream IO a)
+responseStream body = demandPayload $ \payload ->
+ if BS.null payload
+ then return Done
+ else decode $ parseAsJson payload
+ where
+ demandPayload k = HC.brRead body >>= k
+ decode (P.Done leftover value) = case A.fromJSON value of
+ A.Success a -> return $ Yield a $ if BS.null leftover
+ then responseStream body
+ else decode $ parseAsJson leftover
+ A.Error message -> fail message
+ decode (P.Partial k) = demandPayload (decode . k)
+ decode (P.Fail _ _ message) = fail message
+ parseAsJson = P.parse A.json
+
+-- | Query a specified database like @query@ but in a streaming fashion.
+queryChunked
+ :: FromSeries a
+ => Config
+ -> HC.Manager
+ -> Database
+ -> Text -- ^ Query text
+ -> (Stream IO a -> IO b)
+ -- ^ Action to handle the resulting stream of series
+ -> IO b
+queryChunked Config {..} manager database q f =
+ withPool configServerPool request $ \request' ->
+ HC.withResponse request' manager $
+ responseStream . HC.responseBody >=> S.mapM parse >=> f
+ where
+ parse series = case fromSeries series of
+ Left reason -> fail reason
+ Right a -> return a
+ request = def
+ { HC.path = escapeString $ printf "/db/%s/series"
+ (T.unpack databaseName)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s&q=%s&chunked=true"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ (T.unpack q)
+ }
+ Database {databaseName} = database
+ Credentials {..} = configCreds
+
+-----------------------------------------------------------
+-- Administration & Security
+
+-- | List existing databases.
+listDatabases :: Config -> HC.Manager -> IO [Database]
+listDatabases Config {..} manager = do
+ response <- httpLbsWithRetry configServerPool makeRequest manager
+ case A.decode (HC.responseBody response) of
+ Nothing -> fail $ show response
+ Just xs -> return xs
+ where
+ makeRequest = def
+ { HC.path = "/db"
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Credentials {..} = configCreds
+
+-- | Create a new database. Requires cluster admin privileges.
+createDatabase :: Config -> HC.Manager -> Text -> IO Database
+createDatabase Config {..} manager name = do
+ void $ httpLbsWithRetry configServerPool makeRequest manager
+ return Database
+ { databaseName = name
+ , databaseReplicationFactor = Nothing
+ }
+ where
+ makeRequest = def
+ { HC.method = "POST"
+ , HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "name" .= name
+ ]
+ , HC.path = "/db"
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Credentials {..} = configCreds
+
+-- | Drop a database. Requires cluster admin privileges.
+dropDatabase :: Config -> HC.Manager -> Database -> IO ()
+dropDatabase Config {..} manager database =
+ void $ httpLbsWithRetry configServerPool makeRequest manager
+ where
+ makeRequest = def
+ { HC.method = "DELETE"
+ , HC.path = escapeString $ printf "/db/%s"
+ (T.unpack databaseName)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Database {databaseName} = database
+ Credentials {..} = configCreds
+
+-- | List cluster administrators.
+listClusterAdmins
+ :: Config
+ -> HC.Manager
+ -> IO [Admin]
+listClusterAdmins Config {..} manager = do
+ response <- httpLbsWithRetry configServerPool makeRequest manager
+ case A.decode (HC.responseBody response) of
+ Nothing -> fail $ show response
+ Just xs -> return xs
+ where
+ makeRequest = def
+ { HC.path = "/cluster_admins"
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Credentials {..} = configCreds
+
+-- | Add a new cluster administrator. Requires cluster admin privilege.
+addClusterAdmin
+ :: Config
+ -> HC.Manager
+ -> Text
+ -> IO Admin
+addClusterAdmin Config {..} manager name = do
+ void $ httpLbsWithRetry configServerPool makeRequest manager
+ return Admin
+ { adminUsername = name
+ }
+ where
+ makeRequest = def
+ { HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "name" .= name
+ ]
+ , HC.path = "/cluster_admins"
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Credentials {..} = configCreds
+
+-- | Update a cluster administrator's password. Requires cluster admin
+-- privilege.
+updateClusterAdminPassword
+ :: Config
+ -> HC.Manager
+ -> Admin
+ -> Text
+ -> IO ()
+updateClusterAdminPassword Config {..} manager admin password =
+ void $ httpLbsWithRetry configServerPool makeRequest manager
+ where
+ makeRequest = def
+ { HC.method = "POST"
+ , HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "password" .= password
+ ]
+ , HC.path = escapeString $ printf "/cluster_admins/%s"
+ (T.unpack adminUsername)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Admin {adminUsername} = admin
+ Credentials {..} = configCreds
+
+-- | Delete a cluster administrator. Requires cluster admin privilege.
+deleteClusterAdmin
+ :: Config
+ -> HC.Manager
+ -> Admin
+ -> IO ()
+deleteClusterAdmin Config {..} manager admin =
+ void $ httpLbsWithRetry configServerPool makeRequest manager
+ where
+ makeRequest = def
+ { HC.method = "DELETE"
+ , HC.path = escapeString $ printf "/cluster_admins/%s"
+ (T.unpack adminUsername)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Admin {adminUsername} = admin
+ Credentials {..} = configCreds
+
+-- | List database users.
+listDatabaseUsers
+ :: Config
+ -> HC.Manager
+ -> Text
+ -> IO [User]
+listDatabaseUsers Config {..} manager database = do
+ response <- httpLbsWithRetry configServerPool makeRequest manager
+ case A.decode (HC.responseBody response) of
+ Nothing -> fail $ show response
+ Just xs -> return xs
+ where
+ makeRequest = def
+ { HC.path = escapeString $ printf "/db/%s/users"
+ (T.unpack database)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Credentials {..} = configCreds
+
+-- | Add an user to the database users.
+addDatabaseUser
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> Text
+ -> IO User
+addDatabaseUser Config {..} manager database name = do
+ void $ httpLbsWithRetry configServerPool makeRequest manager
+ return User
+ { userName = name
+ }
+ where
+ makeRequest = def
+ { HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "name" .= name
+ ]
+ , HC.path = escapeString $ printf "/db/%s/users"
+ (T.unpack databaseName)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ Database {databaseName} = database
+ Credentials {..} = configCreds
+
+-- | Delete an user from the database users.
+deleteDatabaseUser
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> User
+ -> IO ()
+deleteDatabaseUser config manager database user =
+ void $ httpLbsWithRetry (configServerPool config) request manager
+ where
+ request = (makeRequestFromDatabaseUser config database user)
+ { HC.method = "DELETE"
+ }
+
+-- | Update password for the database user.
+updateDatabaseUserPassword
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> User
+ -> Text
+ -> IO ()
+updateDatabaseUserPassword config manager database user password =
+ void $ httpLbsWithRetry (configServerPool config) request manager
+ where
+ request = (makeRequestFromDatabaseUser config database user)
+ { HC.method = "POST"
+ , HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "password" .= password
+ ]
+ }
+
+-- | Give admin privilege to the user.
+grantAdminPrivilegeTo
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> User
+ -> IO ()
+grantAdminPrivilegeTo config manager database user =
+ void $ httpLbsWithRetry (configServerPool config) request manager
+ where
+ request = (makeRequestFromDatabaseUser config database user)
+ { HC.method = "POST"
+ , HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "admin" .= True
+ ]
+ }
+
+-- | Remove admin privilege from the user.
+revokeAdminPrivilegeFrom
+ :: Config
+ -> HC.Manager
+ -> Database
+ -> User
+ -> IO ()
+revokeAdminPrivilegeFrom config manager database user =
+ void $ httpLbsWithRetry (configServerPool config) request manager
+ where
+ request = (makeRequestFromDatabaseUser config database user)
+ { HC.method = "POST"
+ , HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
+ [ "admin" .= False
+ ]
+ }
+
+makeRequestFromDatabaseUser
+ :: Config
+ -> Database
+ -> User
+ -> HC.Request
+makeRequestFromDatabaseUser Config {..} database user = def
+ { HC.path = escapeString $ printf "/db/%s/users/%s"
+ (T.unpack databaseName)
+ (T.unpack userName)
+ , HC.queryString = escapeString $ printf "u=%s&p=%s"
+ (T.unpack credsUser)
+ (T.unpack credsPassword)
+ }
+ where
+ Database {databaseName} = database
+ User {userName} = user
+ Credentials {..} = configCreds
+
+-----------------------------------------------------------
+
+httpLbsWithRetry
+ :: IORef ServerPool
+ -> HC.Request
+ -> HC.Manager
+ -> IO (HC.Response BL.ByteString)
+httpLbsWithRetry pool request manager =
+ withPool pool request $ \request' ->
+ HC.httpLbs request' manager
+
+withPool
+ :: IORef ServerPool
+ -> HC.Request
+ -> (HC.Request -> IO a)
+ -> IO a
+withPool pool request f =
+ recovering defaultRetrySettings handlers $ do
+ server <- activeServer pool
+ f $ makeRequest server
+ where
+ makeRequest Server {..} = request
+ { HC.host = escapeText serverHost
+ , HC.port = serverPort
+ , HC.secure = serverSsl
+ }
+ handlers =
+ [ Handler $ \e -> case e of
+ HC.InternalIOException _ -> do
+ failover pool
+ return True
+ _ -> return False
+ ]
+
+defaultRetrySettings :: RetrySettings
+defaultRetrySettings = RetrySettings
+ { numRetries = limitedRetries 5
+ , backoff = True
+ , baseDelay = 50
+ }
+
+escapeText :: Text -> BS.ByteString
+escapeText = escapeString . T.unpack
+
+escapeString :: String -> BS.ByteString
+escapeString = BS8.pack . escapeURIString isAllowedInURI
diff --git a/src/Database/InfluxDB/Lens.hs b/src/Database/InfluxDB/Lens.hs
new file mode 100644
index 0000000..320d03a
--- /dev/null
+++ b/src/Database/InfluxDB/Lens.hs
@@ -0,0 +1,26 @@
+{-# LANGUAGE RankNTypes #-}
+module Database.InfluxDB.Lens
+ ( credentials, user, password
+ ) where
+import Control.Applicative
+import Data.Text (Text)
+
+import Database.InfluxDB.Http
+
+type Lens s t a b = Functor f => (a -> f b) -> s -> f t
+type Lens' s a = Lens s s a a
+
+credentials :: Lens' Config Credentials
+credentials f r = set <$> f (configCreds r)
+ where
+ set c = r { configCreds = c }
+
+user :: Lens' Credentials Text
+user f c = set <$> f (credsUser c)
+ where
+ set u = c { credsUser = u }
+
+password :: Lens' Credentials Text
+password f s = set <$> f (credsPassword s)
+ where
+ set p = s { credsPassword = p }
diff --git a/src/Database/InfluxDB/Stream.hs b/src/Database/InfluxDB/Stream.hs
new file mode 100644
index 0000000..e418651
--- /dev/null
+++ b/src/Database/InfluxDB/Stream.hs
@@ -0,0 +1,18 @@
+module Database.InfluxDB.Stream where
+import Prelude hiding (mapM)
+
+-- | Effectful stream
+data Stream m a
+ = Yield a (m (Stream m a))
+ -- ^ Yield a value. The stream will be continued.
+ | Done
+ -- ^ The end of the stream.
+
+-- | Map each element of a stream to a monadic action, evaluate these actions
+-- from left to right, and collect the results as a stream.
+mapM :: Monad m => (a -> m b) -> Stream m a -> m (Stream m b)
+mapM _ Done = return Done
+mapM f (Yield a mb) = do
+ a' <- f a
+ b <- mb
+ return $ Yield a' (mapM f b)
diff --git a/src/Database/InfluxDB/Types.hs b/src/Database/InfluxDB/Types.hs
new file mode 100644
index 0000000..57b280f
--- /dev/null
+++ b/src/Database/InfluxDB/Types.hs
@@ -0,0 +1,222 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE DeriveDataTypeable #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TemplateHaskell #-}
+module Database.InfluxDB.Types
+ ( -- * Series, columns and data points
+ Series(..)
+ , seriesColumns
+ , seriesPoints
+ , SeriesData(..)
+ , Column
+ , Value(..)
+
+ -- * Data types for HTTP API
+ , Credentials(..)
+ , Server(..)
+ , Database(..)
+ , ScheduledDelete(..)
+ , User(..)
+ , Admin(..)
+
+ -- * Server pool
+ , ServerPool
+ , newServerPool
+ , activeServer
+ , failover
+ ) where
+
+import Control.Applicative (empty)
+import Data.DList (DList)
+import Data.Data (Data)
+import Data.IORef
+import Data.Int (Int64)
+import Data.Sequence (Seq, ViewL(..), (|>))
+import Data.Text (Text)
+import Data.Typeable (Typeable)
+import Data.Vector (Vector)
+import qualified Data.DList as DL
+import qualified Data.Sequence as Seq
+
+import Data.Aeson ((.=), (.:))
+import Data.Aeson.TH
+import Data.Scientific
+import qualified Data.Aeson as A
+
+import Database.InfluxDB.Types.Internal (stripPrefixOptions)
+
+-----------------------------------------------------------
+-- Compatibility for older GHC
+
+#if __GLASGOW_HASKELL__ < 706
+import Control.Exception (evaluate)
+
+atomicModifyIORef' :: IORef a -> (a -> (a, b)) -> IO b
+atomicModifyIORef' ref f = do
+ b <- atomicModifyIORef ref $ \x ->
+ let (a, b) = f x
+ in (a, a `seq` b)
+ evaluate b
+#endif
+-----------------------------------------------------------
+
+-- | A series consists of name, columns and points. The columns and points are
+-- expressed in a separate type @SeriesData@.
+data Series = Series
+ { seriesName :: {-# UNPACK #-} !Text
+ -- ^ Series name
+ , seriesData :: {-# UNPACK #-} !SeriesData
+ -- ^ Columns and data points in the series
+ }
+
+-- | Convenient accessor for columns.
+seriesColumns :: Series -> Vector Column
+seriesColumns = seriesDataColumns . seriesData
+
+-- | Convenient accessor for points.
+seriesPoints :: Series -> DList (Vector Value)
+seriesPoints = seriesDataPoints . seriesData
+
+instance A.ToJSON Series where
+ toJSON Series {..} = A.object
+ [ "name" .= seriesName
+ , "columns" .= seriesDataColumns
+ , "points" .= DL.toList seriesDataPoints
+ ]
+ where
+ SeriesData {..} = seriesData
+
+instance A.FromJSON Series where
+ parseJSON (A.Object v) = do
+ name <- v .: "name"
+ columns <- v .: "columns"
+ points <- v .: "points"
+ return Series
+ { seriesName = name
+ , seriesData = SeriesData
+ { seriesDataColumns = columns
+ , seriesDataPoints = DL.fromList points
+ }
+ }
+ parseJSON _ = empty
+
+-- | @SeriesData@ consists of columns and points.
+data SeriesData = SeriesData
+ { seriesDataColumns :: Vector Column
+ , seriesDataPoints :: DList (Vector Value)
+ }
+
+type Column = Text
+
+-- | An InfluxDB value represented as a Haskell value.
+data Value
+ = Int !Int64
+ | Float !Double
+ | String !Text
+ | Bool !Bool
+ | Null
+ deriving (Eq, Show, Data, Typeable)
+
+instance A.ToJSON Value where
+ toJSON (Int n) = A.toJSON n
+ toJSON (Float d) = A.toJSON d
+ toJSON (String xs) = A.toJSON xs
+ toJSON (Bool b) = A.toJSON b
+ toJSON Null = A.Null
+
+instance A.FromJSON Value where
+ parseJSON (A.Object o) = fail $ "Unexpected object: " ++ show o
+ parseJSON (A.Array a) = fail $ "Unexpected array: " ++ show a
+ parseJSON (A.String xs) = return $ String xs
+ parseJSON (A.Bool b) = return $ Bool b
+ parseJSON A.Null = return Null
+ parseJSON (A.Number n) = return $! if base10Exponent n == 0
+ then Int $ fromIntegral $ coefficient n
+ else Float $ realToFrac n
+
+-----------------------------------------------------------
+
+-- | User credentials.
+data Credentials = Credentials
+ { credsUser :: !Text
+ , credsPassword :: !Text
+ } deriving Show
+
+-- | Server location.
+data Server = Server
+ { serverHost :: !Text
+ -- ^ Hostname or IP address
+ , serverPort :: !Int
+ , serverSsl :: !Bool
+ -- ^ SSL is enabled or not in the server side
+ } deriving Show
+
+-- | Non-empty set of server locations. The active server will always be used
+-- until any HTTP communications fail.
+data ServerPool = ServerPool
+ { serverActive :: !Server
+ -- ^ Current active server
+ , serverBackup :: !(Seq Server)
+ -- ^ The rest of the servers in the pool.
+ }
+
+-- | Database consits of name and replication factor.
+data Database = Database
+ { databaseName :: !Text
+ , databaseReplicationFactor :: !(Maybe Int)
+ } deriving Show
+
+newtype ScheduledDelete = ScheduledDelete
+ { scheduledDeleteId :: Int
+ } deriving Show
+
+-- | User
+newtype User = User
+ { userName :: Text
+ } deriving Show
+
+-- | Administrator
+newtype Admin = Admin
+ { adminUsername :: Text
+ } deriving Show
+
+
+-----------------------------------------------------------
+-- Server pool manipulation
+
+-- | Create a non-empty server pool. You must specify at least one server
+-- location to create a pool.
+newServerPool :: Server -> [Server] -> IO (IORef ServerPool)
+newServerPool active backups = newIORef ServerPool
+ { serverActive = active
+ , serverBackup = Seq.fromList backups
+ }
+
+-- | Get a server from the pool.
+activeServer :: IORef ServerPool -> IO Server
+activeServer ref = do
+ ServerPool { serverActive } <- readIORef ref
+ return serverActive
+
+-- | Move the current server to the backup pool and pick one of the backup
+-- server as the new active server. Currently the scheduler works in
+-- round-robin fashion.
+failover :: IORef ServerPool -> IO ()
+failover ref = atomicModifyIORef' ref $ \pool@ServerPool {..} ->
+ case Seq.viewl serverBackup of
+ EmptyL -> (pool, ())
+ active :< rest -> (pool', ())
+ where
+ pool' = ServerPool
+ { serverActive = active
+ , serverBackup = rest |> serverActive
+ }
+
+-----------------------------------------------------------
+-- Aeson instances
+
+deriveFromJSON (stripPrefixOptions "database") ''Database
+deriveFromJSON (stripPrefixOptions "admin") ''Admin
+deriveFromJSON (stripPrefixOptions "user") ''User
diff --git a/src/Database/InfluxDB/Types/Internal.hs b/src/Database/InfluxDB/Types/Internal.hs
new file mode 100644
index 0000000..2fbfb16
--- /dev/null
+++ b/src/Database/InfluxDB/Types/Internal.hs
@@ -0,0 +1,15 @@
+module Database.InfluxDB.Types.Internal
+ ( stripPrefixOptions
+ ) where
+import Data.Char (toLower)
+
+import Data.Aeson.TH
+
+stripPrefixOptions :: String -> Options
+stripPrefixOptions name = defaultOptions
+ { fieldLabelModifier = modifier
+ }
+ where
+ modifier xs = case drop (length name) xs of
+ [] -> error "Insufficient length of field name"
+ c:cs -> toLower c:cs