summaryrefslogtreecommitdiff
path: root/NgxExport/Tools/Aggregate.hs
blob: 881e7be3bb3e80d4804f0bf9e06b0ce1b4b716f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
{-# LANGUAGE TemplateHaskell, OverloadedStrings, BangPatterns #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  NgxExport.Tools.Aggregate
-- Copyright   :  (c) Alexey Radkov 2019
-- License     :  BSD-style
--
-- Maintainer  :  alexey.radkov@gmail.com
-- Stability   :  experimental
-- Portability :  non-portable (requires Template Haskell)
--
-- An aggregate service from the more extra tools collection for
-- <http://github.com/lyokha/nginx-haskell-module nginx-haskell-module>.
--
-----------------------------------------------------------------------------


module NgxExport.Tools.Aggregate (
    -- * The typed service exporter
    -- $aggregateServiceExporter
                                  AggregateServerConf
                                 ,ngxExportAggregateService
    -- * The worker-side reporter
                                 ,reportAggregate
    -- * Re-exported data constructors from /Foreign.C/
    -- | Re-exports are needed by the exporter for marshalling in foreign calls.
                                 ,Foreign.C.Types.CInt (..)
                                 ,Foreign.C.Types.CUInt (..)
                                 ) where

import           NgxExport.Tools

import           Language.Haskell.TH
import           Network.HTTP.Client
import           Foreign.C.Types
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C8
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import           Data.IORef
import           Data.Int
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import           Data.Time.Clock.POSIX
import           Data.Aeson
import           Data.Maybe
import           Control.Monad
import           Control.Monad.IO.Class
import           Control.Arrow
import           Control.Exception
import           Control.Exception.Enclosed (handleAny)
import           System.IO.Unsafe
import           Snap.Http.Server
import           Snap.Core

-- | Storage of an aggregate service.
--
-- The first element of the stored pair is the time of the latest purge, the
-- second element maps the key of a reporter (an 'Int32' value, the PID of a
-- worker process as sent by 'reportAggregate') to the time of its latest
-- report and the latest reported data, if any.
type Aggregate a = IORef (CTime, Map Int32 (CTime, Maybe a))

-- $aggregateServiceExporter
--
-- An aggregate service collects custom typed data reported by worker processes
-- and sends this via HTTP when requested. This is an 'ignitionService' in terms
-- of module "NgxExport.Tools", which means that it starts upon the startup of
-- the worker process and runs until termination of the worker. Internally, an
-- aggregate service starts an HTTP server implemented via the [Snap
-- framework](http://snapframework.com/), which serves incoming requests from
-- worker processes (collecting data) as well as from the Nginx server's
-- clients (reporting collected data for administration purpose).
--
-- Below is a simple example.
--
-- ==== File /test_tools_extra.hs/
-- @
-- {-\# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications \#-}
-- {-\# LANGUAGE OverloadedStrings, BangPatterns \#-}
--
-- module TestToolsExtra where
--
-- import           NgxExport
-- import           NgxExport.Tools
-- import           NgxExport.Tools.Aggregate
--
-- import           Data.ByteString (ByteString)
-- import qualified Data.ByteString.Lazy.Char8 as C8L
-- import           Data.Aeson
-- import           Data.Maybe
-- import           Data.IORef
-- import           Control.Monad
-- import           System.IO.Unsafe
-- import           GHC.Generics
--
-- data Stats = Stats { bytesSent :: Int
--                    , requests :: Int
--                    , meanBytesSent :: Int
--                    } deriving Generic
-- instance FromJSON Stats
-- instance ToJSON Stats
--
-- stats :: IORef Stats
-- stats = unsafePerformIO $ newIORef $ Stats 0 0 0
-- {-\# NOINLINE stats \#-}
--
-- updateStats :: ByteString -> IO C8L.ByteString
-- __/updateStats/__ s = do
--     let cbs = 'readFromByteString' \@Int s
--     modifyIORef\' stats $ \\(Stats bs rs _) ->
--         let !nbs = bs + fromMaybe 0 cbs
--             !nrs = rs + 1
--             !nmbs = nbs \`div\` nrs
--         in Stats nbs nrs nmbs
--     return \"\"
-- 'NgxExport.ngxExportIOYY' \'updateStats
--
-- reportStats :: ByteString -> Bool -> IO C8L.ByteString
-- __/reportStats/__ = 'deferredService' $ \conf -> do
--     let port = 'readFromByteString' \@Int conf
--     when (isJust port) $ do
--         s <- readIORef stats
--         'reportAggregate' (fromJust port) (Just s) \"__/stats/__\"
--     return \"\"
-- 'ngxExportSimpleService' \'reportStats $ PersistentService $ Just $ Sec 5
--
-- 'ngxExportAggregateService' \"__/stats/__\" \'\'Stats
-- @
--
-- Here, on the bottom line, aggregate service /stats/ is declared. It expects
-- reports in JSON format from worker processes, with data of type /Stats/ which
-- includes the number of bytes sent so far, the number of client requests, and
-- the mean value of bytes sent per single request. Its own configuration
-- (a TCP port and the /purge interval/) shall be defined in the Nginx
-- configuration file. The reports from worker processes are sent from a
-- 'deferredService' /reportStats/ every 5 seconds: it merely reads data
-- collected in a global IORef /stats/ and then sends this to the aggregate
-- service using 'reportAggregate'. Handler /updateStats/ updates the /stats/
-- on every run. It accepts a /ByteString/ from Nginx, then converts it to an
-- /Int/ value and interprets this as the number of bytes sent in the current
-- request. It also increments the number of requests and calculates the mean
-- value of bytes sent in all requests to this worker so far. Notice that all
-- the parts of /stats/ are evaluated /strictly/, it is important!
--
-- ==== File /nginx.conf/
-- @
-- user                    nobody;
-- worker_processes        2;
--
-- events {
--     worker_connections  1024;
-- }
--
-- http {
--     default_type        application\/octet-stream;
--     sendfile            on;
--
--     log_format combined1 \'$remote_addr - $remote_user [$time_local] \'
--                          \'\"$request\" $status $body_bytes_sent \'
--                          \'\"$http_referer\" \"$http_user_agent\"\'
--                          \'$hs_updateStats\';
--
--     haskell load \/var\/lib\/nginx\/test_tools_extra.so;
--
--     haskell_run_service __/simpleService_aggregate_stats/__ $hs_stats
--             \'__/AggregateServerConf/__ { __/asPort/__ = 8100, __/asPurgeInterval/__ = Min 5 }\';
--
--     haskell_service_var_in_shm stats 64k \/tmp $hs_stats;
--
--     haskell_run_service __/simpleService_reportStats/__ $hs_reportStats 8100;
--
--     server {
--         listen       8010;
--         server_name  main;
--         error_log    \/tmp\/nginx-test-haskell-error.log;
--         access_log   \/tmp\/nginx-test-haskell-access.log combined1;
--
--         haskell_run __/updateStats/__ $hs_updateStats $bytes_sent;
--
--         location \/ {
--             echo Ok;
--         }
--     }
--
--     server {
--         listen       8020;
--         server_name  stat;
--
--         location \/ {
--             allow 127.0.0.1;
--             deny all;
--             proxy_pass http:\/\/127.0.0.1:8100\/get\/__/stats/__;
--         }
--     }
-- }
-- @
--
-- The aggregate service /stats/ must be referred from the Nginx configuration
-- file with prefix __/simpleService_aggregate_/__. Its configuration is typed,
-- the type is 'AggregateServerConf'. Though its only constructor
-- /AggregateServerConf/ is not exported from this module, the service is still
-- configurable from an Nginx configuration. Here, the aggregate service listens
-- on TCP port /8100/, and its /purge interval/ is 5 minutes. Notice that an
-- aggregate service must be /shared/ (here, variable /$hs_stats/ is declared as
-- shared with Nginx directive /haskell_service_var_in_shm/), otherwise it won't
-- even start because the internal HTTP servers on each worker process won't be
-- able to bind to the same TCP port. Inside the upper /server/ clause, handler
-- /updateStats/ runs on every client request. However, because Nginx variable
-- handlers are /lazy/, evaluation of /$hs_updateStats/ must be forced
-- somewhere: the log phase is a good choice for this (Nginx internal variable
-- /$bytes_sent/ has already been evaluated at this point). That's why
-- /$hs_updateStats/ (which is always empty, but has valuable side effects) is
-- put inside of the /log_format combined1/ without any risk of affecting the
-- actual formatting.
--
-- Data collected by the aggregate server can be obtained in a request to the
-- virtual server listening on TCP port /8020/. It simply proxies requests to
-- the internal aggregate server with URL /\/get\/__stats__/ where __/stats/__
-- corresponds to the /name/ of the aggregate service.
--
-- ==== A simple test
-- Because /reportStats/ is a deferred service, we won't get useful data during
-- the first 5 seconds after Nginx start.
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "1970-01-01T00:00:00Z",
-- >   {}
-- > ]
--
-- However, later we should get some useful data.
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2019-04-22T14:19:04Z",
-- >   {
-- >     "5910": [
-- >       "2019-04-22T14:19:19Z",
-- >       {
-- >         "bytesSent": 0,
-- >         "requests": 0,
-- >         "meanBytesSent": 0
-- >       }
-- >     ],
-- >     "5911": [
-- >       "2019-04-22T14:19:14Z",
-- >       {
-- >         "bytesSent": 0,
-- >         "requests": 0,
-- >         "meanBytesSent": 0
-- >       }
-- >     ]
-- >   }
-- > ]
--
-- Here we have collected stats from the two Nginx worker processes with /PIDs/
-- /5910/ and /5911/. The timestamps show when the stats were last updated.
-- The topmost timestamp shows the time of the latest /purge/ event. The data
-- itself contains only zeros because we have made no requests to the main
-- server so far. Let's run 100 simultaneous requests and look at the stats
-- (they should update within 5 seconds after running them).
--
-- > $ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done
--
-- Wait 5 seconds...
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2019-04-22T14:29:04Z",
-- >   {
-- >     "5910": [
-- >       "2019-04-22T14:31:34Z",
-- >       {
-- >         "bytesSent": 17751,
-- >         "requests": 97,
-- >         "meanBytesSent": 183
-- >       }
-- >     ],
-- >     "5911": [
-- >       "2019-04-22T14:31:31Z",
-- >       {
-- >         "bytesSent": 549,
-- >         "requests": 3,
-- >         "meanBytesSent": 183
-- >       }
-- >     ]
-- >   }
-- > ]

-- | Configuration of an aggregate service.
--
-- This type is exported because Template Haskell requires that. Though its
-- only constructor /AggregateServerConf/ is not exported, it is still reachable
-- from Nginx configuration files. Below is the definition of the constructor.
--
-- @
--     AggregateServerConf { asPort          :: Int
--                         , asPurgeInterval :: 'TimeInterval'
--                         }
-- @
--
-- The value of /asPort/ corresponds to the TCP port of the internal aggregate
-- server. The /asPurgeInterval/ is the /purge/ interval. An aggregate server
-- should sometimes purge data from worker processes which did not report for a
-- long time. For example, it makes no sense to keep data from workers that
-- have already been terminated. The inactive PIDs get checked every
-- /asPurgeInterval/, and data which correspond to PIDs with timestamps older
-- than /asPurgeInterval/ get removed.
--
-- Be aware that due to limitations of Template Haskell, this name must be
-- imported unqualified!
data AggregateServerConf =
    AggregateServerConf { asPort          :: Int
                        , asPurgeInterval :: TimeInterval
                        } deriving Read

-- Body of an aggregate service: given the storage and the name of the
-- service, builds an 'ignitionService' which runs the Snap HTTP server on the
-- port taken from the configuration and returns an empty string when the
-- server finishes.
aggregateServer :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Bool -> IO L.ByteString
aggregateServer storage name = ignitionService $ \conf -> do
    simpleHttpServe (asConfig $ asPort conf) $ asHandler storage name conf
    return ""

-- Configuration of the Snap server: it listens on the given port bound to
-- 127.0.0.1, with the access log and the error log disabled and with the
-- verbose mode switched off.
asConfig :: Int -> Config Snap a
asConfig p = configure mempty
    where configure = setPort p
                    . setBind "127.0.0.1"
                    . setAccessLog ConfigNoLog
                    . setErrorLog ConfigNoLog
                    . setVerbose False

-- Routes of the internal HTTP server: POST requests to put/<name> are passed
-- to 'receiveAggregate', GET requests to get/<name> are passed to
-- 'sendAggregate'.
asHandler :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
asHandler a u conf = route [putRoute, getRoute]
    where putRoute = ("put/" `B.append` u
                     ,Snap.Core.method POST $ receiveAggregate a conf
                     )
          getRoute = ("get/" `B.append` u
                     ,Snap.Core.method GET $ sendAggregate a
                     )

-- Handles a report posted by a worker process.
--
-- The request body (read with the limit of 65536 bytes) must be a JSON-encoded
-- pair of the reporter's PID and the optional reported data. A body which
-- cannot be decoded raises a user error with message "Unreadable aggregate!".
-- All exceptions are turned into an error response by
-- 'handleAggregateExceptions'; on success the request is finished with an
-- empty response.
--
-- The storage is updated atomically. The entry of the reporting PID gets the
-- current time and the reported data, except that a report with 'Nothing'
-- keeps the data already stored for this PID. When at least the purge interval
-- has passed since the latest purge, entries with timestamps not newer than
-- the purge interval are removed and the time of the latest purge is set to
-- the current time.
receiveAggregate :: FromJSON a =>
    Aggregate a -> AggregateServerConf -> Snap ()
receiveAggregate a conf =
    handleAggregateExceptions "Exception while receiving aggregate" $ do
        body <- readRequestBody 65536
        -- match on the decoded value rather than test it with isNothing and
        -- unwrap it later with the partial fromJust
        case decode' body of
            Nothing -> liftIO $ throwUserError "Unreadable aggregate!"
            Just (pid, v) -> liftIO $ store pid v
        finishWith emptyResponse
    where int = fromIntegral . toSec . asPurgeInterval $ conf
          store pid v = do
              !t <- ngxNow
              atomicModifyIORef' a $
                  \(t', v') ->
                      (let (!tn, f) =
                               if t - t' >= int
                                   then (t
                                        ,M.filter $ \(t'', _) -> t - t'' < int
                                        )
                                   else (t', id)
                           -- the purge runs after the insertion: the fresh
                           -- entry bears the current time
                           !vn = f $ M.alter (refresh t v) pid v'
                       in (tn, vn)
                      ,()
                      )
          -- new value of a map entry: the old data survives only when the
          -- report carries no data
          refresh t v old =
              let !new' = case old of
                              Just (_, kept) | isNothing v -> kept
                              _ -> v
              in Just (t, new')

-- Responds with the contents of the storage encoded as JSON (content type
-- application/json). All timestamps, i.e. the time of the latest purge and the
-- times of the latest reports, are converted to UTC time before encoding.
sendAggregate :: ToJSON a => Aggregate a -> Snap ()
sendAggregate a =
    handleAggregateExceptions "Exception while sending aggregate" $ do
        (purged, reports) <- liftIO $ readIORef a
        modifyResponse $ setContentType "application/json"
        writeLBS $ encode (toUTCTime purged, M.map (first toUTCTime) reports)
    where toUTCTime (CTime secs) = posixSecondsToUTCTime $ fromIntegral secs

-- Wraps a handler so that an exception caught by 'handleAny' produces a
-- response with status 500: the given message becomes the reason phrase of
-- the status and the shown exception is written into the response body.
handleAggregateExceptions :: String -> Snap () -> Snap ()
handleAggregateExceptions cmsg = handleAny respond
    where respond e = do
              modifyResponse $ setResponseStatus 500 $ toBS cmsg
              writeBS $ toBS $ show (e :: SomeException)
          toBS = T.encodeUtf8 . T.pack

-- Throws an 'IOError' built by 'userError' from the given message.
throwUserError :: String -> IO a
throwUserError msg = ioError $ userError msg

-- | Exports a simple aggregate service with specified name and the aggregate
--   type.
--
-- The name of the service may be arbitrary, but 'reportAggregate' and client
-- requests to the service must refer to exactly this name because it is a part
-- of the URL of the internal HTTP server.
--
-- The service is implemented via 'ngxExportSimpleServiceTyped' with
-- 'AggregateServerConf' as the name of its custom type. This is an
-- 'ignitionService' with an HTTP server based on the [Snap
-- framework](http://snapframework.com/) running inside. The internal HTTP
-- server collects data from worker processes on URL
-- /\/put\/__\<name_of_the_service\>__/ and reports data on URL
-- /\/get\/__\<name_of_the_service\>__/.
ngxExportAggregateService :: String       -- ^ Name of the service
                          -> Name         -- ^ Name of the aggregate type
                          -> Q [Dec]
ngxExportAggregateService f a = do
    -- declarations of the URL part, the storage and the service itself
    storageDecs <- sequence
        [sigD urlName [t|ByteString|]
        ,funD urlName [clause [] (normalB [|C8.pack f|]) []]
        ,sigD storageName [t|Aggregate $(conT a)|]
        ,funD storageName
            [clause []
                (normalB [|unsafePerformIO $ newIORef (0, M.empty)|])
                []
            ]
        ,pragInlD storageName NoInline FunLike AllPhases
        ,sigD serviceName [t|AggregateServerConf -> Bool -> IO L.ByteString|]
        ,funD serviceName
            [clause []
                (normalB
                    [|$(varE serverName) $(varE storageName) $(varE urlName)|]
                )
                []
            ]
        ]
    -- FIXME: name AggregateServerConf must be imported from the user's module
    -- unqualified (see details in NgxExport/Tools.hs, function
    -- ngxExportSimpleService')!
    serviceDecs <- ngxExportSimpleServiceTyped
        serviceName ''AggregateServerConf SingleShotService
    return $ storageDecs ++ serviceDecs
    where serverName  = 'aggregateServer
          serviceName = mkName $ "aggregate_" ++ f
          storageName = mkName $ "aggregate_storage_" ++ f
          urlName     = mkName $ "aggregate_url_" ++ f
-- | Sends data to an aggregate server.
--
-- The data is posted, paired with the PID of the current worker process, as a
-- JSON-encoded request body to the HTTP server at 127.0.0.1 on the given port.
-- If the reported data is 'Nothing' then the aggregated data won't alter, but
-- the timestamp associated with the PID of this worker process will be updated.
--
-- Every exception raised while reporting is silently discarded.
reportAggregate :: ToJSON a => Int          -- ^ Port of the aggregate server
                            -> Maybe a      -- ^ Reported data
                            -> ByteString   -- ^ Name of the aggregate server
                            -> IO ()
reportAggregate p v u = handle ignore $ do
    req <- parseRequest "POST http://127.0.0.1"
    pid <- ngxPid
    let body = encode (fromIntegral pid :: Int32, v)
        !req' = req { requestBody = RequestBodyLBS body
                    , port = p
                    , Network.HTTP.Client.path = "put/" `B.append` u
                    }
    void $ httpNoBody req' httpManager
    where ignore :: SomeException -> IO ()
          ignore = const $ return ()

-- Global HTTP connection manager used by 'reportAggregate'. It is created
-- with unsafePerformIO, hence the NOINLINE pragma: the definition must not be
-- inlined, otherwise more than one manager could be created.
{-# NOINLINE httpManager #-}
httpManager :: Manager
httpManager = unsafePerformIO (newManager defaultManagerSettings)