summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cleaner.hs42
-rw-r--r--Mailer.hs79
-rw-r--r--amazon-emailer.cabal18
3 files changed, 103 insertions, 36 deletions
diff --git a/Cleaner.hs b/Cleaner.hs
new file mode 100644
index 0000000..dd1287a
--- /dev/null
+++ b/Cleaner.hs
@@ -0,0 +1,42 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+import Control.Concurrent (threadDelay)
+import Control.Exception (bracket)
+
+import System.Exit (exitWith, ExitCode(..))
+import System.Environment (getArgs)
+import Data.Configurator (load, Worth(..), require, lookupDefault)
+
+import Data.String (fromString)
+
+import Database.PostgreSQL.Simple
+
+main :: IO ()
+main = do
+ args <- getArgs
+ if length args /= 1
+ then putStrLn "config file argument required" >> exitWith (ExitFailure 1)
+ else do
+ config <- load [Required (head args)]
+ host <- lookupDefault "127.0.0.1" config "host"
+ port <- lookupDefault 5432 config "port"
+ user <- require config "user"
+ pass <- require config "pass"
+ db <- require config "db"
+ cols <- require config "columns"
+ bracket (connect $ defaultConnectInfo { connectHost = host,
+ connectPort = port,
+ connectUser = user,
+ connectPassword = pass,
+ connectDatabase = db })
+ close
+ (runQueue cols)
+
+runQueue :: String -> Connection -> IO ()
+runQueue cols c = do
+ withTransaction c $ do
+ execute_ c (fromString $ "INSERT INTO amazon_email_archive SELECT " ++ cols ++ " FROM amazon_email_queue WHERE sent_at IS NOT NULL")
+ execute_ c "DELETE FROM amazon_email_queue WHERE sent_at IS NOT NULL"
+
+ threadDelay 60000000 -- clean once per minute
+ runQueue cols c
diff --git a/Mailer.hs b/Mailer.hs
index 91fd517..85dab77 100644
--- a/Mailer.hs
+++ b/Mailer.hs
@@ -1,7 +1,7 @@
{-# LANGUAGE OverloadedStrings #-}
import Control.Applicative
-import Control.Monad (forM, replicateM)
+import Control.Monad (forM, replicateM_)
import Control.Monad.Trans.Resource (runResourceT)
import Control.Concurrent (threadDelay)
import Control.Exception (bracket)
@@ -10,30 +10,45 @@ import Data.Text (Text)
import qualified Data.Text.Lazy as LT
import qualified Data.Text.Lazy.Encoding as LT
import qualified Data.Text.Encoding as T
+import Data.ByteString (ByteString)
import Data.Time.Clock (UTCTime, getCurrentTime)
+import System.Exit (exitWith, ExitCode(..))
+import System.Environment (getArgs)
+import Data.Configurator (load, Worth(..), require, lookupDefault)
+
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.FromRow
import Network.HTTP.Conduit (Manager(..), newManager, closeManager, conduitManagerSettings)
import Network.Mail.Mime (Mail(..), Address(..), Part(..), Encoding (QuotedPrintableText))
import Network.Mail.Mime.SES
-import Config (configHost, configUser, configPass,
- configDB, configAccessKey,
- configSecretKey, configLimit)
-
main :: IO ()
-main =
- bracket ((,) <$> (connect $ defaultConnectInfo { connectHost = configHost,
- connectUser = configUser,
- connectPassword = configPass,
- connectDatabase = configDB })
+main = do
+ args <- getArgs
+ if length args /= 1
+ then putStrLn "config file argument required" >> exitWith (ExitFailure 1)
+ else do
+ config <- load [Required (head args)]
+ host <- lookupDefault "127.0.0.1" config "host"
+ port <- lookupDefault 5432 config "port"
+ user <- require config "user"
+ pass <- require config "pass"
+ db <- require config "db"
+ accessKey <- require config "ses-access-key"
+ secretKey <- require config "ses-secret-key"
+ limit <- require config "limit"
+ bracket ((,) <$> (connect $ defaultConnectInfo { connectHost = host,
+ connectPort = port,
+ connectUser = user,
+ connectPassword = pass,
+ connectDatabase = db })
<*> (newManager conduitManagerSettings))
(\(c,m) -> close c >> closeManager m)
- (\(c,m) -> runQueue c m)
+ (\(c,m) -> runQueue limit accessKey secretKey c m)
-data AmEmail = AmEmail { aId :: Int, aTo :: Text, aToName :: Maybe Text, aFrom :: Text,
- aFromName :: Text, aSubject :: Text, aBody :: Text, aHtml :: Bool }
+data AmEmail = AmEmail { aId :: Int, aTo :: Text, aToName :: Maybe Text, aFrom :: Text,
+ aFromName :: Text, aSubject :: Text, aBody :: Text, aHtml :: Bool }
deriving (Show, Eq)
instance FromRow AmEmail where
@@ -41,10 +56,10 @@ instance FromRow AmEmail where
<*> field <*> field <*> field <*> field
-runQueue :: Connection -> Manager -> IO ()
-runQueue c m = do
+runQueue :: Int -> ByteString -> ByteString -> Connection -> Manager -> IO ()
+runQueue limit access secret c m = do
now <- getCurrentTime
- replicateM configLimit $ do
+ replicateM_ limit $ do
-- NOTE(dbp 2013-12-12): This query has all the magic in it: it grabs an email
-- off the queue, marks it as processing, and returns it. The nested query is to
-- do the limiting, and the seeming redundant 'and processing = false' is b/c
@@ -52,35 +67,35 @@ runQueue c m = do
-- result would just be to have no message found even if one existed in database.
email <- query_ c "update amazon_email_queue set processing = true where id = (select id from amazon_email_queue where sent_at is null and failed_count < 3 and processing = false order by date asc limit 1) and processing = false returning id, to_addr, to_name, from_addr, from_name, subject, body, html"
sent <- EX.catch
- (sendEmails m email)
- (\e -> do putStrLn $ show (e::EX.SomeException)
- execute c "update amazon_email_queue set failed_count = failed_count + 1, processing = false where id = ?"
+ (sendEmails access secret m email)
+ (\e -> do print (e::EX.SomeException)
+ execute c "update amazon_email_queue set failed_count = failed_count + 1, processing = false where id = ?"
(Only (aId $ head email))
return [])
- mapM (\i -> execute c "update amazon_email_queue set sent_at = ?, processing = false where id = ?"
+ mapM (\i -> execute c "update amazon_email_queue set sent_at = ?, processing = false where id = ?"
(now, i))
sent
-
+
threadDelay 1000000 -- note, this is a conservative processing of the queue, as we don't include
-- the time that is spent actually sending the emails.
- runQueue c m
+ runQueue limit access secret c m
-sendEmails :: Manager -> [AmEmail] -> IO [Int]
-sendEmails m es =
+sendEmails :: ByteString -> ByteString -> Manager -> [AmEmail] -> IO [Int]
+sendEmails access secret m es =
forM es (\e -> do
- runResourceT (renderSendMailSES m (mkSES e) (mkMail e))
+ runResourceT (renderSendMailSES m (mkSES access secret e) (mkMail e))
return (aId e))
-mkSES :: AmEmail -> SES
-mkSES (AmEmail _ to _ from _ _ _ _) = SES (T.encodeUtf8 from)
- [T.encodeUtf8 to]
- configAccessKey
- configSecretKey
+mkSES :: ByteString -> ByteString -> AmEmail -> SES
+mkSES access secret (AmEmail _ to _ from _ _ _ _) = SES (T.encodeUtf8 from)
+ [T.encodeUtf8 to]
+ access
+ secret
mkMail :: AmEmail -> Mail
-mkMail (AmEmail _ to tname from fname subj body html) =
+mkMail (AmEmail _ to tname from fname subj body html) =
Mail (Address (Just fname) from)
- [(Address tname to)]
+ [Address tname to]
[] -- CC
[] -- BCC
[ ("Subject", subj) ]
diff --git a/amazon-emailer.cabal b/amazon-emailer.cabal
index 10459a7..48bf459 100644
--- a/amazon-emailer.cabal
+++ b/amazon-emailer.cabal
@@ -1,5 +1,5 @@
name: amazon-emailer
-version: 0.3.0.1
+version: 0.4.0.0
synopsis: A queue daemon for Amazon's SES with a PostgreSQL table as a queue.
description: This application checks every second for messages in a queue table,
if there exist some that haven't been sent, it grabs
@@ -20,12 +20,22 @@ executable amazon-emailer
-- other-modules:
Build-depends:
base == 4.*,
- resourcet == 0.4.*,
+ resourcet == 1.*,
lifted-base == 0.2.*,
text == 0.11.*,
bytestring >= 0.9.1 && < 0.11,
postgresql-simple == 0.3.*,
- http-conduit == 2.*,
+ http-conduit == 2.1.*,
mime-mail == 0.4.*,
mime-mail-ses == 0.2.*,
- time >= 1.1 && < 1.5 \ No newline at end of file
+ time >= 1.1 && < 1.5,
+ configurator == 0.2.*
+
+executable amazon-emailer-queue-cleaner
+ main-is: Cleaner.hs
+ -- other-modules:
+ Build-depends:
+ base == 4.*,
+ lifted-base == 0.2.*,
+ postgresql-simple == 0.3.*,
+ configurator == 0.2.* \ No newline at end of file