summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlyokha <>2019-04-23 10:37:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2019-04-23 10:37:00 (GMT)
commit27b0a2dfbb41aefa0d5a48f2b24af12f2f533fb8 (patch)
tree9814493af66ef0a977fa46c441efa07c6ddc8396
version 0.1.0.00.1.0.0
-rw-r--r--Changelog.md4
-rw-r--r--LICENSE25
-rw-r--r--NgxExport/Tools/Aggregate.hs448
-rw-r--r--Setup.hs3
-rw-r--r--ngx-export-tools-extra.cabal35
5 files changed, 515 insertions, 0 deletions
diff --git a/Changelog.md b/Changelog.md
new file mode 100644
index 0000000..ca402b9
--- /dev/null
+++ b/Changelog.md
@@ -0,0 +1,4 @@
+### 0.1.0.0
+
+- Initial version.
+
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..2990346
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,25 @@
+The following license covers this documentation, and the source code, except
+where otherwise indicated.
+
+Copyright 2019, Alexey Radkov. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS "AS IS" AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/NgxExport/Tools/Aggregate.hs b/NgxExport/Tools/Aggregate.hs
new file mode 100644
index 0000000..881e7be
--- /dev/null
+++ b/NgxExport/Tools/Aggregate.hs
@@ -0,0 +1,448 @@
+{-# LANGUAGE TemplateHaskell, OverloadedStrings, BangPatterns #-}
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : NgxExport.Tools.Aggregate
+-- Copyright : (c) Alexey Radkov 2019
+-- License : BSD-style
+--
+-- Maintainer : alexey.radkov@gmail.com
+-- Stability : experimental
+-- Portability : non-portable (requires Template Haskell)
+--
+-- An aggregate service from the more extra tools collection for
+-- <http://github.com/lyokha/nginx-haskell-module nginx-haskell-module>.
+--
+-----------------------------------------------------------------------------
+
+
+module NgxExport.Tools.Aggregate (
+ -- * The typed service exporter
+ -- $aggregateServiceExporter
+ AggregateServerConf
+ ,ngxExportAggregateService
+ -- * The worker-side reporter
+ ,reportAggregate
+ -- * Re-exported data constructors from /Foreign.C/
+ -- | Re-exports are needed by the exporter for marshalling in foreign calls.
+ ,Foreign.C.Types.CInt (..)
+ ,Foreign.C.Types.CUInt (..)
+ ) where
+
+import NgxExport.Tools
+
+import Language.Haskell.TH
+import Network.HTTP.Client
+import Foreign.C.Types
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Char8 as C8
+import Data.ByteString (ByteString)
+import qualified Data.ByteString.Lazy as L
+import Data.Map.Strict (Map)
+import qualified Data.Map.Strict as M
+import Data.IORef
+import Data.Int
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as T
+import Data.Time.Clock.POSIX
+import Data.Aeson
+import Data.Maybe
+import Control.Monad
+import Control.Monad.IO.Class
+import Control.Arrow
+import Control.Exception
+import Control.Exception.Enclosed (handleAny)
+import System.IO.Unsafe
+import Snap.Http.Server
+import Snap.Core
+
+type Aggregate a = IORef (CTime, Map Int32 (CTime, Maybe a))
+
+-- $aggregateServiceExporter
+--
+-- An aggregate service collects custom typed data reported by worker processes
+-- and sends this via HTTP when requested. This is an 'ignitionService' in terms
+-- of module "NgxExport.Tools", which means that it starts upon the startup of
+-- the worker process and runs until termination of the worker. Internally, an
+-- aggregate service starts an HTTP server implemented via the [Snap
+-- framework](http://snapframework.com/), which serves incoming requests from
+-- worker processes (collecting data) as well as from the Nginx server's
+-- clients (reporting collected data for administration purpose).
+--
+-- Below is a simple example.
+--
+-- ==== File /test_tools_extra.hs/
+-- @
+-- {-\# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications \#-}
+-- {-\# LANGUAGE OverloadedStrings, BangPatterns \#-}
+--
+-- module TestToolsExtra where
+--
+-- import NgxExport
+-- import NgxExport.Tools
+-- import NgxExport.Tools.Aggregate
+--
+-- import Data.ByteString (ByteString)
+-- import qualified Data.ByteString.Lazy.Char8 as C8L
+-- import Data.Aeson
+-- import Data.Maybe
+-- import Data.IORef
+-- import Control.Monad
+-- import System.IO.Unsafe
+-- import GHC.Generics
+--
+-- data Stats = Stats { bytesSent :: Int
+-- , requests :: Int
+-- , meanBytesSent :: Int
+-- } deriving Generic
+-- instance FromJSON Stats
+-- instance ToJSON Stats
+--
+-- stats :: IORef Stats
+-- stats = unsafePerformIO $ newIORef $ Stats 0 0 0
+-- {-\# NOINLINE stats \#-}
+--
+-- updateStats :: ByteString -> IO C8L.ByteString
+-- __/updateStats/__ s = do
+-- let cbs = 'readFromByteString' \@Int s
+-- modifyIORef\' stats $ \\(Stats bs rs _) ->
+-- let !nbs = bs + fromMaybe 0 cbs
+-- !nrs = rs + 1
+-- !nmbs = nbs \`div\` nrs
+-- in Stats nbs nrs nmbs
+-- return \"\"
+-- 'NgxExport.ngxExportIOYY' \'updateStats
+--
+-- reportStats :: ByteString -> Bool -> IO C8L.ByteString
+-- __/reportStats/__ = 'deferredService' $ \conf -> do
+-- let port = 'readFromByteString' \@Int conf
+-- when (isJust port) $ do
+-- s <- readIORef stats
+-- 'reportAggregate' (fromJust port) (Just s) \"__/stats/__\"
+-- return \"\"
+-- 'ngxExportSimpleService' \'reportStats $ PersistentService $ Just $ Sec 5
+--
+-- 'ngxExportAggregateService' \"__/stats/__\" \'\'Stats
+-- @
+--
+-- Here, on the bottom line, aggregate service /stats/ is declared. It expects
+-- from worker processes reports in JSON format with data of type /Stats/ which
+-- includes the number of bytes sent so far, the number of client requests, and
+-- the mean value of bytes sent per a single request. Its own configuration
+-- (a TCP port and the /purge interval/) shall be defined in the Nginx
+-- configuration file. The reports from worker processes are sent from a
+-- 'deferredService' /reportStats/ every 5 seconds: it merely reads data
+-- collected in a global IORef /stats/ and then sends this to the aggregate
+-- service using 'reportAggregate'. Handler /updateStats/ updates the /stats/
+-- on every run. It accepts a /ByteString/ from Nginx, then converts it to an
+-- /Int/ value and interprets this as the number of bytes sent in the current
+-- request. It also increments the number or requests and calculates the mean
+-- value of bytes sent in all requests to this worker so far. Notice that all
+-- the parts of /stats/ are evaluated /strictly/, it is important!
+--
+-- ==== File /nginx.conf/
+-- @
+-- user nobody;
+-- worker_processes 2;
+--
+-- events {
+-- worker_connections 1024;
+-- }
+--
+-- http {
+-- default_type application\/octet-stream;
+-- sendfile on;
+--
+-- log_format combined1 \'$remote_addr - $remote_user [$time_local] \'
+-- \'\"$request\" $status $body_bytes_sent \'
+-- \'\"$http_referer\" \"$http_user_agent\"\'
+-- \'$hs_updateStats\';
+--
+-- haskell load \/var\/lib\/nginx\/test_tools_extra.so;
+--
+-- haskell_run_service __/simpleService_aggregate_stats/__ $hs_stats
+-- \'__/AggregateServerConf/__ { __/asPort/__ = 8100, __/asPurgeInterval/__ = Min 5 }\';
+--
+-- haskell_service_var_in_shm stats 64k \/tmp $hs_stats;
+--
+-- haskell_run_service __/simpleService_reportStats/__ $hs_reportStats 8100;
+--
+-- server {
+-- listen 8010;
+-- server_name main;
+-- error_log \/tmp\/nginx-test-haskell-error.log;
+-- access_log \/tmp\/nginx-test-haskell-access.log combined1;
+--
+-- haskell_run __/updateStats/__ $hs_updateStats $bytes_sent;
+--
+-- location \/ {
+-- echo Ok;
+-- }
+-- }
+--
+-- server {
+-- listen 8020;
+-- server_name stat;
+--
+-- location \/ {
+-- allow 127.0.0.1;
+-- deny all;
+-- proxy_pass http:\/\/127.0.0.1:8100\/get\/__/stats/__;
+-- }
+-- }
+-- }
+-- @
+--
+-- The aggregate service /stats/ must be referred from the Nginx configuration
+-- file with prefix __/simpleService_aggregate_/__. Its configuration is typed,
+-- the type is 'AggregateServerConf'. Though its only constructor
+-- /AggregateServerConf/ is not exported from this module, the service is still
+-- configurable from an Nginx configuration. Here, the aggregate service listens
+-- on TCP port /8100/, and its /purge interval/ is 5 minutes. Notice that an
+-- aggregate service must be /shared/ (here, variable /$hs_stats/ is declared as
+-- shared with Nginx directive /haskell_service_var_in_shm/), otherwise it won't
+-- even start because the internal HTTP servers on each worker process won't be
+-- able to bind to the same TCP port. Inside the upper /server/ clause, handler
+-- /updateStats/ runs on every client request. However, as soon as Nginx
+-- variable handlers are /lazy/, evaluation of /$hs_updateStats/ must be forced
+-- somewhere: the log phase is a good choice for this (Nginx internal variable
+-- /$bytes_sent/ has already been evaluated at this point). That's why
+-- /$hs_updateStats/ (which is always empty, but has valuable side effects) is
+-- put inside of the /log_format combined1/ without any risk of affecting the
+-- actual formatting.
+--
+-- Data collected by the aggregate server can be obtained in a request to the
+-- virtual server listening on TCP port /8020/. It simply proxies requests to
+-- the internal aggregate server with URL /\/get\/__stats__/ where __/stats/__
+-- corresponds to the /name/ of the aggregate service.
+--
+-- ==== A simple test
+-- As far as /reportStats/ is a deferred service, we won't get useful data in 5
+-- seconds after Nginx start.
+--
+-- > $ curl 'http://127.0.0.1:8020/' | jq
+-- > [
+-- > "1970-01-01T00:00:00Z",
+-- > {}
+-- > ]
+--
+-- However, later we should get some useful data.
+--
+-- > $ curl 'http://127.0.0.1:8020/' | jq
+-- > [
+-- > "2019-04-22T14:19:04Z",
+-- > {
+-- > "5910": [
+-- > "2019-04-22T14:19:19Z",
+-- > {
+-- > "bytesSent": 0,
+-- > "requests": 0,
+-- > "meanBytesSent": 0
+-- > }
+-- > ],
+-- > "5911": [
+-- > "2019-04-22T14:19:14Z",
+-- > {
+-- > "bytesSent": 0,
+-- > "requests": 0,
+-- > "meanBytesSent": 0
+-- > }
+-- > ]
+-- > }
+-- > ]
+--
+-- Here we have collected stats from the two Nginx worker processes with /PIDs/
+-- /5910/ and /5911/. The timestamps show when the stats was updated the last
+-- time. The topmost timestamp shows the time of the latest /purge/ event. The
+-- data itself have only zeros as soon we have made no request to the main
+-- server so far. Let's run 100 simultaneous requests and look at the stats (it
+-- should update at worst in 5 seconds after running them).
+--
+-- > $ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done
+--
+-- Wait 5 seconds...
+--
+-- > $ curl 'http://127.0.0.1:8020/' | jq
+-- > [
+-- > "2019-04-22T14:29:04Z",
+-- > {
+-- > "5910": [
+-- > "2019-04-22T14:31:34Z",
+-- > {
+-- > "bytesSent": 17751,
+-- > "requests": 97,
+-- > "meanBytesSent": 183
+-- > }
+-- > ],
+-- > "5911": [
+-- > "2019-04-22T14:31:31Z",
+-- > {
+-- > "bytesSent": 549,
+-- > "requests": 3,
+-- > "meanBytesSent": 183
+-- > }
+-- > ]
+-- > }
+-- > ]
+
+-- | Configuration of an aggregate service.
+--
+-- This type is exported because Template Haskell requires that. Though its
+-- only constructor /AggregateServerConf/ is not exported, it is still reachable
+-- from Nginx configuration files. Below is definition of the constructor.
+--
+-- @
+-- AggregateServerConf { asPort :: Int
+-- , asPurgeInterval :: 'TimeInterval'
+-- }
+-- @
+--
+-- The value of /asPort/ corresponds to the TCP port of the internal aggregate
+-- server. The /asPurgeInterval/ is the /purge/ interval. An aggregate server
+-- should sometimes purge data from worker processes which did not report for a
+-- long time. For example, it makes no sense to keep data from workers that
+-- have already been terminated. The inactive PIDs get checked every
+-- /asPurgeInterval/, and data which correspond to PIDs with timestamps older
+-- than /asPurgeInterval/ get removed.
+--
+-- Be aware that due to limitations of Template Haskell, this name must be
+-- imported unqualified!
+data AggregateServerConf =
+ AggregateServerConf { asPort :: Int
+ , asPurgeInterval :: TimeInterval
+ } deriving Read
+
+aggregateServer :: (FromJSON a, ToJSON a) =>
+ Aggregate a -> ByteString -> AggregateServerConf -> Bool -> IO L.ByteString
+aggregateServer a u = ignitionService $ \conf ->
+ simpleHttpServe (asConfig $ asPort conf) (asHandler a u conf) >> return ""
+
+asConfig :: Int -> Config Snap a
+asConfig p = setPort p
+ $ setBind "127.0.0.1"
+ $ setAccessLog ConfigNoLog
+ $ setErrorLog ConfigNoLog
+ $ setVerbose False mempty
+
+asHandler :: (FromJSON a, ToJSON a) =>
+ Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
+asHandler a u conf =
+ route [(B.append "put/" u, Snap.Core.method POST $ receiveAggregate a conf)
+ ,(B.append "get/" u, Snap.Core.method GET $ sendAggregate a)
+ ]
+
+receiveAggregate :: FromJSON a =>
+ Aggregate a -> AggregateServerConf -> Snap ()
+receiveAggregate a conf =
+ handleAggregateExceptions "Exception while receiving aggregate" $ do
+ !s <- decode' <$> readRequestBody 65536
+ when (isNothing s) $ liftIO $ throwUserError "Unreadable aggregate!"
+ liftIO $ do
+ let (pid, v) = fromJust s
+ int = fromIntegral . toSec . asPurgeInterval $ conf
+ !t <- ngxNow
+ atomicModifyIORef' a $
+ \(t', v') ->
+ (let (!tn, f) =
+ if t - t' >= int
+ then (t, M.filter $ \(t'', _) -> t - t'' < int)
+ else (t', id)
+ !vn = f $ M.alter
+ (\old ->
+ let !new' =
+ if isNothing old || isJust v
+ then v
+ else snd $ fromJust old
+ in Just (t, new')
+ ) pid v'
+ in (tn, vn)
+ ,()
+ )
+ finishWith emptyResponse
+
+sendAggregate :: ToJSON a => Aggregate a -> Snap ()
+sendAggregate a =
+ handleAggregateExceptions "Exception while sending aggregate" $ do
+ s <- liftIO $ readIORef a
+ modifyResponse $ setContentType "application/json"
+ writeLBS $ encode $ (toUTCTime *** M.map (first toUTCTime)) s
+ where toUTCTime (CTime t) = posixSecondsToUTCTime $ fromIntegral t
+
+handleAggregateExceptions :: String -> Snap () -> Snap ()
+handleAggregateExceptions cmsg = handleAny $ \e ->
+ writeErrorResponse 500 $ show (e :: SomeException)
+ where writeErrorResponse c msg = do
+ modifyResponse $ setResponseStatus c $ T.encodeUtf8 $ T.pack cmsg
+ writeBS $ T.encodeUtf8 $ T.pack msg
+
+throwUserError :: String -> IO a
+throwUserError = ioError . userError
+
+-- | Exports a simple aggregate service with specified name and the aggregate
+-- type.
+--
+-- The name of the service can be chosen arbitrarily, however it must be
+-- exactly referred from 'reportAggregate' and client requests to the service
+-- because the URL of the internal HTTP server contains this.
+--
+-- The service is implemented via 'ngxExportSimpleServiceTyped' with
+-- 'AggregateServerConf' as the name of its custom type. This is an
+-- 'ignitionService' with an HTTP server based on the [Snap
+-- framework](http://snapframework.com/) running inside. The internal HTTP
+-- server collects data from worker processes on URL
+-- /\/put\/__\<name_of_the_service\>__/ and reports data on URL
+-- /\/get\/__\<name_of_the_service\>__/.
+ngxExportAggregateService :: String -- ^ Name of the service
+ -> Name -- ^ Name of the aggregate type
+ -> Q [Dec]
+ngxExportAggregateService f a = do
+ let nameF = 'aggregateServer
+ fName = mkName $ "aggregate_" ++ f
+ sName = mkName $ "aggregate_storage_" ++ f
+ uName = mkName $ "aggregate_url_" ++ f
+ concat <$> sequence
+ [sequence
+ [sigD uName [t|ByteString|]
+ ,funD uName [clause [] (normalB [|C8.pack f|]) []]
+ ,sigD sName [t|Aggregate $(conT a)|]
+ ,funD sName
+ [clause []
+ (normalB [|unsafePerformIO $ newIORef (0, M.empty)|])
+ []
+ ]
+ ,pragInlD sName NoInline FunLike AllPhases
+ ,sigD fName [t|AggregateServerConf -> Bool -> IO L.ByteString|]
+ ,funD fName
+ [clause []
+ (normalB [|$(varE nameF) $(varE sName) $(varE uName)|])
+ []
+ ]
+ ]
+ -- FIXME: name AggregateServerConf must be imported from the user's
+ -- module unqualified (see details in NgxExport/Tools.hs, function
+ -- ngxExportSimpleService')!
+ ,ngxExportSimpleServiceTyped
+ fName ''AggregateServerConf SingleShotService
+ ]
+-- | Reports data to an aggregate server.
+--
+-- If reported data is 'Nothing' then the aggregated data won't alter, but the
+-- timestamp associated with the PID of this worker process will be updated.
+reportAggregate :: ToJSON a => Int -- ^ Port of the aggregate server
+ -> Maybe a -- ^ Reported data
+ -> ByteString -- ^ Name of the aggregate server
+ -> IO ()
+reportAggregate p v u =
+ handle (const $ return () :: SomeException -> IO ()) $ do
+ req <- parseRequest "POST http://127.0.0.1"
+ pid <- fromIntegral <$> ngxPid :: IO Int32
+ let !req' = req { requestBody = RequestBodyLBS $ encode (pid, v)
+ , port = p
+ , Network.HTTP.Client.path = B.append "put/" u
+ }
+ void $ httpNoBody req' httpManager
+
+httpManager :: Manager
+httpManager = unsafePerformIO $ newManager defaultManagerSettings
+{-# NOINLINE httpManager #-}
+
diff --git a/Setup.hs b/Setup.hs
new file mode 100644
index 0000000..200a2e5
--- /dev/null
+++ b/Setup.hs
@@ -0,0 +1,3 @@
+import Distribution.Simple
+main = defaultMain
+
diff --git a/ngx-export-tools-extra.cabal b/ngx-export-tools-extra.cabal
new file mode 100644
index 0000000..3654c57
--- /dev/null
+++ b/ngx-export-tools-extra.cabal
@@ -0,0 +1,35 @@
+name: ngx-export-tools-extra
+version: 0.1.0.0
+synopsis: More extra tools for Nginx haskell module
+description: More extra tools for
+ <http://github.com/lyokha/nginx-haskell-module Nginx haskell module>.
+homepage: http://github.com/lyokha/ngx-export-tools-extra
+license: BSD3
+license-file: LICENSE
+extra-source-files: Changelog.md
+author: Alexey Radkov <alexey.radkov@gmail.com>
+maintainer: Alexey Radkov <alexey.radkov@gmail.com>
+stability: experimental
+copyright: 2019 Alexey Radkov
+category: Network
+build-type: Simple
+cabal-version: >= 1.8
+
+library
+ build-depends: base >= 4.8 && < 5
+ , template-haskell >= 2.11.0.0
+ , bytestring >= 0.10.0.0
+ , ngx-export-tools >= 0.4.5.0
+ , aeson >= 1.0.0.0
+ , http-client
+ , containers
+ , enclosed-exceptions
+ , snap-core
+ , snap-server
+ , text
+ , time
+
+ exposed-modules: NgxExport.Tools.Aggregate
+
+ ghc-options: -Wall
+