summaryrefslogtreecommitdiff
path: root/src/Network/AWS/Wolf/Act.hs
blob: 4356d8fb1e4d29f18b8356036b2c943cfd18d2e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}

-- | SWF Actor logic.
--
module Network.AWS.Wolf.Act
  ( act
  , actMain
  ) where

import Data.Aeson
import Data.Time
import Network.AWS.Wolf.Ctx
import Network.AWS.Wolf.File
import Network.AWS.Wolf.Prelude
import Network.AWS.Wolf.SWF
import Network.AWS.Wolf.Types
import System.Directory
import System.Exit
import System.Process

-- | S3 copy call.
--
cp :: MonadIO m => [FilePath] -> m ()
cp = liftIO . callProcess "aws" . (["s3", "cp", "--quiet", "--recursive"] <>)

-- | Key to download and upload objects from.
--
key :: MonadAmazonStore c m => m FilePath
key = do
  b <- view cBucket <$> view ccConf
  p <- view ascPrefix
  pure $ "s3:/" -/- textToString b -/- textToString p

-- | Download artifacts to the store input directory.
--
download :: MonadAmazonStore c m => FilePath -> [FilePath] -> m ()
download dir includes = do
  traceInfo "download" [ "dir" .= dir, "includes" .= includes ]
  let includes' = bool ([ "--exclude", "*" ] <> interleave (repeat "--include") includes) mempty $ null includes
  k <- key
  cp $ includes' <> [ k, dir ]

-- | Upload artifacts from the store output directory.
--
upload :: MonadAmazonStore c m => FilePath -> m ()
upload dir = do
  traceInfo "upload" [ "dir" .= dir ]
  k <- key
  cp [ dir, k ]

-- | callCommand wrapper that maybe returns an exception.
--
callCommand' :: MonadControl m => String -> m (Maybe SomeException)
callCommand' command =
  handle (pure . Just) $ do
    liftIO $ callCommand command
    pure Nothing

-- | Run command and maybe returns an exception.
--
run :: MonadStatsCtx c m => String -> m (Maybe SomeException)
run command =
  preCtx [ "command" .= command ] $ do
    traceInfo "begin" mempty
    e <- callCommand' command
    traceInfo "end" [ "exception" .= (displayException <$> e) ]
    pure e

-- | Check if quiesce file is present.
--
check :: MonadIO m => Maybe FilePath -> m Bool
check = maybe (pure False) (liftIO . doesFileExist)

-- | Actor logic - poll for work, download artifacts, run command, upload artifacts.
--
act :: MonadConf c m => Text -> Bool -> Bool -> [FilePath] -> String -> m ()
act queue nocopy local includes command =
  preConfCtx [ "label" .= LabelAct ] $
    runAmazonWorkCtx queue $ do
      traceInfo "poll" mempty
      t0 <- liftIO getCurrentTime
      (token, uid, input) <- pollActivity
      t1 <- liftIO getCurrentTime
      statsIncrement "wolf.act.poll.count" [ "queue" =. queue ]
      statsHistogram "wolf.act.poll.elapsed" (realToFrac (diffUTCTime t1 t0) :: Double) [ "queue" =. queue ]
      maybe_ token $ \token' ->
        maybe_ uid $ \uid' ->
          withCurrentWorkDirectory uid' nocopy local $ \wd ->
            runAmazonStoreCtx uid' $ do
              traceInfo "start" [ "dir" .= wd ]
              t2  <- liftIO getCurrentTime
              dd  <- dataDirectory wd
              sd  <- storeDirectory wd
              isd <- inputDirectory sd
              osd <- outputDirectory sd
              msd <- metaDirectory sd
              writeJson (dd </> "control.json") (Control uid')
              writeText (dd </> "input.json") input
              writeText (msd </> (textToString queue <> "_input.json")) input
              download isd includes
              e <- run command
              upload osd
              output <- readText (dd </> "output.json")
              writeText (msd </> (textToString queue <> "_output.json")) output
              maybe (completeActivity token' output) (const $ failActivity token') e
              t3 <- liftIO getCurrentTime
              traceInfo "finish" [ "dir" .= wd ]
              let status = textFromString $ maybe "complete" (const "fail") e
              statsIncrement "wolf.act.activity.count" [ "queue" =. queue, "status" =. status ]
              statsHistogram "wolf.act.activity.elapsed" (realToFrac (diffUTCTime t3 t2) :: Double) [ "queue" =. queue ]

-- | Run actor from main with config file.
--
actMain :: MonadControl m => FilePath -> Maybe FilePath -> Maybe Text -> Maybe Text -> Maybe Text -> Text -> Int -> Bool -> Bool -> [FilePath] -> String -> m ()
actMain cf quiesce domain bucket prefix queue num nocopy local includes command =
  runCtx $ runTop $ do
    conf <- readYaml cf
    let conf' = override cPrefix prefix $ override cBucket bucket $ override cDomain domain conf
    runConfCtx conf' $
      runConcurrent $ replicate num $ forever $ do
        ok <- check quiesce
        when ok $
          liftIO exitSuccess
        act queue nocopy local includes command