summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrickBrisbin <>2018-12-05 20:23:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-12-05 20:23:00 (GMT)
commit814993ed32690ea821d6d8899544e3f521ed06e4 (patch)
treed2bae17f0c11b2447741fb5d759dccd53dd4c93e
version 1.0.0.0HEAD1.0.0.0master
-rw-r--r--CHANGELOG.md7
-rw-r--r--LICENSE19
-rw-r--r--README.lhs175
-rw-r--r--README.md175
-rw-r--r--examples/consumer/Main.hs25
-rw-r--r--examples/producer/Main.hs26
-rw-r--r--faktory.cabal152
-rw-r--r--library/Faktory/Client.hs218
-rw-r--r--library/Faktory/Connection.hs92
-rw-r--r--library/Faktory/Job.hs124
-rw-r--r--library/Faktory/Prelude.hs23
-rw-r--r--library/Faktory/Protocol.hs78
-rw-r--r--library/Faktory/Settings.hs67
-rw-r--r--library/Faktory/Worker.hs102
-rw-r--r--tests/Faktory/ConnectionSpec.hs108
-rw-r--r--tests/FaktorySpec.hs46
-rw-r--r--tests/Spec.hs2
17 files changed, 1439 insertions, 0 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..005cf85
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,7 @@
+## [*Unreleased*](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.0.0.0...master)
+
+None
+
+## [v1.0.0.0](https://github.com/frontrowed/faktory_worker_haskell/tree/v1.0.0.0)
+
+Initial release.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..58143ac
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2018 Freckle Education <engineering@freckle.com>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/README.lhs b/README.lhs
new file mode 100644
index 0000000..2adc617
--- /dev/null
+++ b/README.lhs
@@ -0,0 +1,175 @@
+# faktory\_worker\_haskell
+
+[![CircleCI](https://circleci.com/gh/frontrowed/faktory_worker_haskell.svg?style=svg)](https://circleci.com/gh/frontrowed/faktory_worker_haskell)
+
+Haskell client and worker process for the Faktory background job server.
+
+Architecture overview from [Ruby client README](https://github.com/contribsys/faktory_worker_ruby#readme):
+
+```
+ +--------------------+
+ | |
+ | Faktory |
+ | Server |
+ +---------->>>>| +>>>>--------+
+ | | | |
+ | | | |
+ | +--------------------+ |
++-----------------+ +-------------------+
+| | | |
+| Client | | Worker |
+| pushes | | pulls |
+| jobs | | jobs |
+| | | |
+| | | |
++-----------------+ +-------------------+
+```
+
+- Client - an API any process can use to push jobs to the Faktory server.
+- Worker - a process that pulls jobs from Faktory and executes them.
+- Server - the Faktory daemon which stores background jobs in queues to be
+ processed by Workers.
+
+This package contains only the client and worker parts. The server part is
+[here](https://github.com/contribsys/faktory/)
+
+## Installation
+
+TODO.
+
+## Documentation
+
+See the [wiki](//github.com/contribsys/faktory_worker_ruby/wiki) for more
+details.
+
+## Usage
+
+<!--
+```haskell
+import Data.Aeson
+import Prelude
+import Faktory.Client
+import Faktory.Job
+import Faktory.Settings
+import Faktory.Worker
+import GHC.Generics
+
+{- Don't actually run anything -}
+main :: IO ()
+main = if True then pure () else (workerMain >> clientMain)
+workerMain :: IO ()
+clientMain :: IO ()
+```
+-->
+
+### Job
+
+Any value can be a "Job" that is pushed and pulled to and from Faktory via its
+`ToJSON` and `FromJSON` instances:
+
+```haskell
+newtype MyJob = MyJob
+ { myJobMessage :: String
+ }
+ deriving (Generic)
+
+instance ToJSON MyJob
+instance FromJSON MyJob
+```
+
+### Worker
+
+```haskell
+workerMain = do
+ settings <- envSettings
+
+ runWorker settings $ \job ->
+ -- Process your Job here
+ putStrLn $ myJobMessage job
+
+ -- If any exception is thrown, the job will be marked as Failed in Faktory
+ -- and retried. Note: you will not otherwise hear about any such exceptions,
+ -- unless you catch-and-rethrow them yourself.
+```
+
+### Client
+
+```haskell
+clientMain = do
+ settings <- envSettings
+ client <- newClient settings Nothing -- N.B. A WorkerId is not necessary if
+ -- only pushing Jobs.
+
+ jobId <- perform mempty client $ MyJob "Hello world"
+
+ print jobId
+
+ closeClient client
+```
+
+### Configuration
+
+When using `envSettings`, the following variables will be used:
+
+- `FAKTORY_QUEUE`: the name of the queue to consume from. This is Worker-only,
+ for `perform`, a non-default Queue should be given by the `queue` option
+- `FAKTORY_PROVIDER`: the name of another environment variable where the
+ connection string can be found. Defaults to `FAKTORY_URL`.
+- `FAKTORY_URL` (or whatever you named in `FAKTORY_PROVIDER`): connection string
+ to the Faktory server. Format is `tcp(+tls)://(:password@)host:port`. Defaults
+ to `tcp://localhost:4719`.
+
+## Examples
+
+See the [examples](./examples). To run them:
+
+1. Run a local Faktory server
+
+ ```console
+ docker run --rm \
+ --publish 7419:7419 \
+ --publish 7420:7420 \
+ contribsys/faktory
+ ```
+
+1. Run the consumer example
+
+ ```console
+ % stack exec faktory-example-consumer
+ Starting consumer loop
+ ```
+
+ (Assumes you've built the project.)
+
+1. Submit a Job through the producer example
+
+ ```console
+ % stack exec faktory-example-producer hello world
+ Pushed job: "ljcjlbexbgun"
+ ```
+
+ *NOTE*: if you submit "BOOM" as a Job, the processing loop will raise an
+ exception, so you can see how a Failed Job looks in Faktory.
+
+1. See that your Job was processed back in the consumer
+
+ ```console
+ % stack exec faktory-example-consumer
+ Starting consumer loop
+ hello world
+ ```
+
+## Development & Tests
+
+```console
+stack build --dependencies-only --test --no-run-tests
+stack build --pedantic --test --no-run-tests
+stack build --pedantic --test
+```
+
+*NOTE*: `FactorySpec` requires a local Faktory server is running, and it will
+flush all Jobs from this server as part of running the tests.
+
+---
+
+[CHANGELOG](./CHANGELOG.md) | [LICENSE](./LICENSE)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..2adc617
--- /dev/null
+++ b/README.md
@@ -0,0 +1,175 @@
+# faktory\_worker\_haskell
+
+[![CircleCI](https://circleci.com/gh/frontrowed/faktory_worker_haskell.svg?style=svg)](https://circleci.com/gh/frontrowed/faktory_worker_haskell)
+
+Haskell client and worker process for the Faktory background job server.
+
+Architecture overview from [Ruby client README](https://github.com/contribsys/faktory_worker_ruby#readme):
+
+```
+ +--------------------+
+ | |
+ | Faktory |
+ | Server |
+ +---------->>>>| +>>>>--------+
+ | | | |
+ | | | |
+ | +--------------------+ |
++-----------------+ +-------------------+
+| | | |
+| Client | | Worker |
+| pushes | | pulls |
+| jobs | | jobs |
+| | | |
+| | | |
++-----------------+ +-------------------+
+```
+
+- Client - an API any process can use to push jobs to the Faktory server.
+- Worker - a process that pulls jobs from Faktory and executes them.
+- Server - the Faktory daemon which stores background jobs in queues to be
+ processed by Workers.
+
+This package contains only the client and worker parts. The server part is
+[here](https://github.com/contribsys/faktory/)
+
+## Installation
+
+TODO.
+
+## Documentation
+
+See the [wiki](//github.com/contribsys/faktory_worker_ruby/wiki) for more
+details.
+
+## Usage
+
+<!--
+```haskell
+import Data.Aeson
+import Prelude
+import Faktory.Client
+import Faktory.Job
+import Faktory.Settings
+import Faktory.Worker
+import GHC.Generics
+
+{- Don't actually run anything -}
+main :: IO ()
+main = if True then pure () else (workerMain >> clientMain)
+workerMain :: IO ()
+clientMain :: IO ()
+```
+-->
+
+### Job
+
+Any value can be a "Job" that is pushed and pulled to and from Faktory via its
+`ToJSON` and `FromJSON` instances:
+
+```haskell
+newtype MyJob = MyJob
+ { myJobMessage :: String
+ }
+ deriving (Generic)
+
+instance ToJSON MyJob
+instance FromJSON MyJob
+```
+
+### Worker
+
+```haskell
+workerMain = do
+ settings <- envSettings
+
+ runWorker settings $ \job ->
+ -- Process your Job here
+ putStrLn $ myJobMessage job
+
+ -- If any exception is thrown, the job will be marked as Failed in Faktory
+ -- and retried. Note: you will not otherwise hear about any such exceptions,
+ -- unless you catch-and-rethrow them yourself.
+```
+
+### Client
+
+```haskell
+clientMain = do
+ settings <- envSettings
+ client <- newClient settings Nothing -- N.B. A WorkerId is not necessary if
+ -- only pushing Jobs.
+
+ jobId <- perform mempty client $ MyJob "Hello world"
+
+ print jobId
+
+ closeClient client
+```
+
+### Configuration
+
+When using `envSettings`, the following variables will be used:
+
+- `FAKTORY_QUEUE`: the name of the queue to consume from. This is Worker-only,
+ for `perform`, a non-default Queue should be given by the `queue` option
+- `FAKTORY_PROVIDER`: the name of another environment variable where the
+ connection string can be found. Defaults to `FAKTORY_URL`.
+- `FAKTORY_URL` (or whatever you named in `FAKTORY_PROVIDER`): connection string
+ to the Faktory server. Format is `tcp(+tls)://(:password@)host:port`. Defaults
+ to `tcp://localhost:4719`.
+
+## Examples
+
+See the [examples](./examples). To run them:
+
+1. Run a local Faktory server
+
+ ```console
+ docker run --rm \
+ --publish 7419:7419 \
+ --publish 7420:7420 \
+ contribsys/faktory
+ ```
+
+1. Run the consumer example
+
+ ```console
+ % stack exec faktory-example-consumer
+ Starting consumer loop
+ ```
+
+ (Assumes you've built the project.)
+
+1. Submit a Job through the producer example
+
+ ```console
+ % stack exec faktory-example-producer hello world
+ Pushed job: "ljcjlbexbgun"
+ ```
+
+ *NOTE*: if you submit "BOOM" as a Job, the processing loop will raise an
+ exception, so you can see how a Failed Job looks in Faktory.
+
+1. See that your Job was processed back in the consumer
+
+ ```console
+ % stack exec faktory-example-consumer
+ Starting consumer loop
+ hello world
+ ```
+
+## Development & Tests
+
+```console
+stack build --dependencies-only --test --no-run-tests
+stack build --pedantic --test --no-run-tests
+stack build --pedantic --test
+```
+
+*NOTE*: `FactorySpec` requires a local Faktory server is running, and it will
+flush all Jobs from this server as part of running the tests.
+
+---
+
+[CHANGELOG](./CHANGELOG.md) | [LICENSE](./LICENSE)
diff --git a/examples/consumer/Main.hs b/examples/consumer/Main.hs
new file mode 100644
index 0000000..7b83da4
--- /dev/null
+++ b/examples/consumer/Main.hs
@@ -0,0 +1,25 @@
+module Main (main) where
+
+import Prelude
+
+import Control.Exception.Safe
+import Data.Aeson
+import Faktory.Settings
+import Faktory.Worker
+import GHC.Generics
+
+-- | Must match examples/producer
+newtype Job = Job { jobMessage :: String }
+ deriving Generic
+instance FromJSON Job
+
+main :: IO ()
+main = do
+ putStrLn "Starting consumer loop"
+ settings <- envSettings
+ runWorker settings $ \job -> do
+ let message = jobMessage job
+
+ if message == "BOOM"
+ then throwString "Producer exception: BOOM"
+ else putStrLn message
diff --git a/examples/producer/Main.hs b/examples/producer/Main.hs
new file mode 100644
index 0000000..4b337e5
--- /dev/null
+++ b/examples/producer/Main.hs
@@ -0,0 +1,26 @@
+module Main (main) where
+
+import Prelude
+
+import Control.Exception.Safe
+import Data.Aeson
+import Data.Semigroup ((<>))
+import Faktory.Client
+import Faktory.Job (perform)
+import Faktory.Settings
+import GHC.Generics
+import System.Environment (getArgs)
+
+-- | Must match examples/consumer
+newtype Job = Job { jobMessage :: String }
+ deriving Generic
+instance ToJSON Job
+
+main :: IO ()
+main = do
+ settings <- envSettings
+ bracket (newClient settings Nothing) closeClient $ \client -> do
+ args <- getArgs
+ jobId <- perform mempty client Job {jobMessage = unwords args}
+
+ putStrLn $ "Pushed job: " <> show jobId
diff --git a/faktory.cabal b/faktory.cabal
new file mode 100644
index 0000000..c6ca280
--- /dev/null
+++ b/faktory.cabal
@@ -0,0 +1,152 @@
+cabal-version: 1.18
+
+-- This file has been generated from package.yaml by hpack version 0.31.1.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: 5b58d182a42e13721957b2b5e0234d010b79698c8ed5197f94a5a14d5114964c
+
+name: faktory
+version: 1.0.0.0
+synopsis: Faktory Worker for Haskell
+description: Haskell client and worker process for the Faktory background job server.
+ .
+ == Architecture overview
+ .
+ @
+ | +--------------------+
+ | | |
+ | | Faktory |
+ | | Server |
+ | +---------->>>>| +>>>>--------+
+ | | | | |
+ | | | | |
+ | | +--------------------+ |
+ | +-----------------+ +-------------------+
+ | | | | |
+ | | Client | | Worker |
+ | | pushes | | pulls |
+ | | jobs | | jobs |
+ | | | | |
+ | | | | |
+ | +-----------------+ +-------------------+
+ @
+ .
+ * `Client` - an API any process can use to push jobs to the Faktory server.
+ * `Worker` - a process that pulls jobs from Faktory and executes them.
+ * `Server` - the Faktory daemon which stores background jobs in queues to be processed by Workers.
+ .
+ This package contains only the `Client` and `Worker`.
+category: Network
+homepage: https://github.com/frontrowed/faktory_worker_haskell#readme
+bug-reports: https://github.com/frontrowed/faktory_worker_haskell/issues
+author: Freckle Engineering
+maintainer: engineering@freckle.com
+copyright: 2018 Freckle Education
+license: MIT
+license-file: LICENSE
+tested-with: GHC==8.4.3
+build-type: Simple
+extra-doc-files:
+ CHANGELOG.md
+ README.md
+
+source-repository head
+ type: git
+ location: https://github.com/frontrowed/faktory_worker_haskell
+
+library
+ exposed-modules:
+ Faktory.Client
+ Faktory.Connection
+ Faktory.Job
+ Faktory.Prelude
+ Faktory.Protocol
+ Faktory.Settings
+ Faktory.Worker
+ other-modules:
+ Paths_faktory
+ hs-source-dirs:
+ library
+ default-extensions: BangPatterns DeriveAnyClass DeriveFoldable DeriveFunctor DeriveGeneric DeriveLift DeriveTraversable DerivingStrategies FlexibleContexts FlexibleInstances GADTs GeneralizedNewtypeDeriving LambdaCase MultiParamTypeClasses NoImplicitPrelude NoMonomorphismRestriction OverloadedStrings RankNTypes RecordWildCards ScopedTypeVariables StandaloneDeriving TypeApplications TypeFamilies
+ ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-missed-specialisations -Wno-all-missed-specialisations -Wno-unsafe -Wno-safe -Wno-missing-local-signatures -Wno-monomorphism-restriction -Wno-missing-import-lists
+ build-depends:
+ aeson >=1.3 && <2
+ , aeson-casing >=0.1 && <1
+ , base >=4.11 && <5
+ , bytestring >=0.1 && <1
+ , connection >=0.2 && <1
+ , cryptonite >=0.2 && <1
+ , megaparsec >=6.5 && <7
+ , memory >=0.1 && <1
+ , network >=2.6 && <3
+ , random >=1.1 && <2
+ , safe-exceptions >=0.1 && <1
+ , scanner >=0.2 && <1
+ , text >=1.2 && <2
+ , time >=1.8 && <2
+ , unix >=2.7 && <3
+ default-language: Haskell2010
+
+executable faktory-example-consumer
+ main-is: Main.hs
+ other-modules:
+ Paths_faktory
+ hs-source-dirs:
+ examples/consumer
+ default-extensions: BangPatterns DeriveAnyClass DeriveFoldable DeriveFunctor DeriveGeneric DeriveLift DeriveTraversable DerivingStrategies FlexibleContexts FlexibleInstances GADTs GeneralizedNewtypeDeriving LambdaCase MultiParamTypeClasses NoImplicitPrelude NoMonomorphismRestriction OverloadedStrings RankNTypes RecordWildCards ScopedTypeVariables StandaloneDeriving TypeApplications TypeFamilies
+ ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-missed-specialisations -Wno-all-missed-specialisations -Wno-unsafe -Wno-safe -Wno-missing-local-signatures -Wno-monomorphism-restriction -Wno-missing-import-lists
+ build-depends:
+ aeson
+ , base >=4.11 && <5
+ , faktory
+ , safe-exceptions
+ default-language: Haskell2010
+
+executable faktory-example-producer
+ main-is: Main.hs
+ other-modules:
+ Paths_faktory
+ hs-source-dirs:
+ examples/producer
+ default-extensions: BangPatterns DeriveAnyClass DeriveFoldable DeriveFunctor DeriveGeneric DeriveLift DeriveTraversable DerivingStrategies FlexibleContexts FlexibleInstances GADTs GeneralizedNewtypeDeriving LambdaCase MultiParamTypeClasses NoImplicitPrelude NoMonomorphismRestriction OverloadedStrings RankNTypes RecordWildCards ScopedTypeVariables StandaloneDeriving TypeApplications TypeFamilies
+ ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-missed-specialisations -Wno-all-missed-specialisations -Wno-unsafe -Wno-safe -Wno-missing-local-signatures -Wno-monomorphism-restriction -Wno-missing-import-lists
+ build-depends:
+ aeson
+ , base >=4.11 && <5
+ , faktory
+ , safe-exceptions
+ default-language: Haskell2010
+
+test-suite hspec
+ type: exitcode-stdio-1.0
+ main-is: Spec.hs
+ other-modules:
+ Faktory.ConnectionSpec
+ FaktorySpec
+ Paths_faktory
+ hs-source-dirs:
+ tests
+ default-extensions: BangPatterns DeriveAnyClass DeriveFoldable DeriveFunctor DeriveGeneric DeriveLift DeriveTraversable DerivingStrategies FlexibleContexts FlexibleInstances GADTs GeneralizedNewtypeDeriving LambdaCase MultiParamTypeClasses NoImplicitPrelude NoMonomorphismRestriction OverloadedStrings RankNTypes RecordWildCards ScopedTypeVariables StandaloneDeriving TypeApplications TypeFamilies
+ ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-missed-specialisations -Wno-all-missed-specialisations -Wno-unsafe -Wno-safe -Wno-missing-local-signatures -Wno-monomorphism-restriction -Wno-missing-import-lists -rtsopts
+ build-depends:
+ base >=4.11 && <5
+ , faktory
+ , hspec
+ default-language: Haskell2010
+
+test-suite readme
+ type: exitcode-stdio-1.0
+ main-is: README.lhs
+ other-modules:
+ Paths_faktory
+ hs-source-dirs:
+ ./.
+ default-extensions: BangPatterns DeriveAnyClass DeriveFoldable DeriveFunctor DeriveGeneric DeriveLift DeriveTraversable DerivingStrategies FlexibleContexts FlexibleInstances GADTs GeneralizedNewtypeDeriving LambdaCase MultiParamTypeClasses NoImplicitPrelude NoMonomorphismRestriction OverloadedStrings RankNTypes RecordWildCards ScopedTypeVariables StandaloneDeriving TypeApplications TypeFamilies
+ ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-missed-specialisations -Wno-all-missed-specialisations -Wno-unsafe -Wno-safe -Wno-missing-local-signatures -Wno-monomorphism-restriction -Wno-missing-import-lists -pgmL markdown-unlit
+ build-depends:
+ aeson
+ , base
+ , faktory
+ , markdown-unlit
+ default-language: Haskell2010
diff --git a/library/Faktory/Client.hs b/library/Faktory/Client.hs
new file mode 100644
index 0000000..b734d99
--- /dev/null
+++ b/library/Faktory/Client.hs
@@ -0,0 +1,218 @@
+module Faktory.Client
+ (
+ -- * Client operations
+ Client
+ , newClient
+ , closeClient
+
+ -- * High-level Job operations
+ , pushJob
+ , flush
+
+ -- * High-level Client API
+ , command_
+ , commandOK
+ , commandJSON
+ ) where
+
+import Faktory.Prelude
+
+import Control.Concurrent.MVar
+import Crypto.Hash (Digest, SHA256(..), hashWith)
+import Data.Aeson
+import Data.ByteArray (ByteArrayAccess)
+import Data.ByteString.Lazy (ByteString, fromStrict)
+import qualified Data.ByteString.Lazy.Char8 as BSL8
+import Data.Text (Text)
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as T
+import Faktory.Connection (connect)
+import Faktory.Protocol
+import Faktory.Settings
+import GHC.Stack
+import Network.Connection
+import Network.Socket (HostName)
+import System.Posix.Process (getProcessID)
+
+data Client = Client
+ { clientConnection :: MVar Connection
+ , clientSettings :: Settings
+ }
+
+-- | <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#initial-handshake>
+data HiPayload = HiPayload
+ { hiVersion :: Int
+ , hiNonce :: Maybe Text
+ , hiIterations :: Maybe Int
+ }
+
+instance FromJSON HiPayload where
+ parseJSON = withObject "HiPayload" $ \o ->
+ HiPayload
+ <$> o .: "v"
+ <*> o .:? "s"
+ <*> o .:? "i"
+
+data HelloPayload = HelloPayload
+ { helloWorkerId :: Maybe WorkerId
+ , helloHostname :: HostName
+ , helloProcessId :: Integer -- TODO: Orphan ToJSON ProcessID
+ , helloLabels :: [Text]
+ , helloVersion :: Int
+ , helloPasswordHash :: Maybe Text
+ }
+
+instance ToJSON HelloPayload where
+ toJSON HelloPayload{..} =
+ object
+ [ "wid" .= helloWorkerId
+ , "hostname" .= helloHostname
+ , "pid" .= helloProcessId
+ , "labels" .= helloLabels
+ , "v" .= helloVersion
+ , "pwdhash" .= helloPasswordHash
+ ]
+ toEncoding HelloPayload{..} =
+ pairs $ mconcat
+ [ "wid" .= helloWorkerId
+ , "hostname" .= helloHostname
+ , "pid" .= helloProcessId
+ , "labels" .= helloLabels
+ , "v" .= helloVersion
+ , "pwdhash" .= helloPasswordHash
+ ]
+
+-- | Open a new @'Client'@ connection with the given @'Settings'@
+newClient :: HasCallStack => Settings -> Maybe WorkerId -> IO Client
+newClient settings@Settings {..} mWorkerId =
+ bracketOnError (connect settingsConnection) connectionClose $ \conn -> do
+ client <- Client <$> newMVar conn <*> pure settings
+
+ greeting <-
+ fromJustThrows "Unexpected end of HI message" =<< recvUnsafe settings conn
+ stripped <-
+ fromJustThrows ("Missing HI prefix: " <> show greeting)
+ $ BSL8.stripPrefix "HI" greeting
+ HiPayload {..} <-
+ fromJustThrows ("Failed to parse HI payload: " <> show stripped)
+ $ decode stripped
+
+ when (hiVersion > expectedProtocolVersion) $ settingsLogError $ concat
+ [ "Server's protocol version "
+ , show hiVersion
+ , " higher than client's expected protocol version "
+ , show expectedProtocolVersion
+ ]
+
+ let
+ mPassword = connectionInfoPassword settingsConnection
+ mHashedPassword = hashPassword <$> hiNonce <*> hiIterations <*> mPassword
+
+ helloPayload <-
+ HelloPayload mWorkerId (show . fst $ connectionID conn)
+ <$> (toInteger <$> getProcessID)
+ <*> pure ["haskell"]
+ <*> pure expectedProtocolVersion
+ <*> pure mHashedPassword
+
+ commandOK client "HELLO" [encode helloPayload]
+ pure client
+ where fromJustThrows message = maybe (throwString message) pure
+
+-- | Close a @'Client'@
+closeClient :: Client -> IO ()
+closeClient Client {..} = withMVar clientConnection $ \conn -> do
+ sendUnsafe clientSettings conn "END" []
+ connectionClose conn
+
+-- | Push a Job to the Server
+pushJob :: (HasCallStack, ToJSON a) => Client -> a -> IO ()
+pushJob client job = commandOK client "PUSH" [encode job]
+
+-- | Clear all job data in the Faktory server
+--
+-- Use with caution!
+--
+flush :: HasCallStack => Client -> IO ()
+flush client = commandOK client "FLUSH" []
+
+-- | Send a command, read and discard the response
+command_ :: Client -> ByteString -> [ByteString] -> IO ()
+command_ Client {..} cmd args = withMVar clientConnection $ \conn -> do
+ sendUnsafe clientSettings conn cmd args
+ void $ recvUnsafe clientSettings conn
+
+-- | Send a command, assert the response is @OK@
+commandOK :: HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
+commandOK Client {..} cmd args = withMVar clientConnection $ \conn -> do
+ sendUnsafe clientSettings conn cmd args
+ response <- recvUnsafe clientSettings conn
+ unless (response == Just "OK") $ throwString "Server not OK"
+
+-- | Send a command, parse the response as JSON
+commandJSON
+ :: FromJSON a
+ => Client
+ -> ByteString
+ -> [ByteString]
+ -> IO (Either String (Maybe a))
+commandJSON Client {..} cmd args = withMVar clientConnection $ \conn -> do
+ sendUnsafe clientSettings conn cmd args
+ mByteString <- recvUnsafe clientSettings conn
+ pure $ traverse eitherDecode mByteString
+
+-- | Send a command to the Server socket
+--
+-- Do not use outside of @'withMVar'@, this is not threadsafe.
+--
+sendUnsafe :: Settings -> Connection -> ByteString -> [ByteString] -> IO ()
+sendUnsafe Settings {..} conn cmd args = do
+ let bs = BSL8.unwords (cmd : args)
+ settingsLogDebug $ "> " <> show bs
+ void . connectionPut conn . BSL8.toStrict $ bs <> "\n"
+
+-- | Receive data from the Server socket
+--
+-- Do not use outside of @'withMVar'@, this is not threadsafe.
+--
+recvUnsafe :: Settings -> Connection -> IO (Maybe ByteString)
+recvUnsafe Settings {..} conn = do
+ eByteString <- readReply $ connectionGet conn 4096
+ settingsLogDebug $ "< " <> show eByteString
+
+ case eByteString of
+ Left err -> do
+ settingsLogError err
+ pure Nothing
+ Right mByteString -> pure $ fromStrict <$> mByteString
+
+-- | Iteratively apply a function @n@ times
+--
+-- This is like @iterate f s !! n@ but strict in @s@
+--
+times :: Int -> (s -> s) -> s -> s
+times n f !s
+ | n <= 0 = s
+ | otherwise = times (n - 1) f (f s)
+
+-- | Hash password using provided @nonce@ for @n@ iterations
+hashPassword :: Text -> Int -> String -> Text
+hashPassword nonce n password =
+ T.pack
+ . show
+ . times (n - 1) hash
+ . hash
+ . T.encodeUtf8
+ $ T.pack password
+ <> nonce
+ where
+ -- Note that we use hash at two different types above.
+ --
+ -- 1. hash :: ByteString -> Digest SHA256
+ -- 2. hash :: Digest SHA256 -> Digest SHA256
+ hash :: (ByteArrayAccess b) => b -> Digest SHA256
+ hash = hashWith SHA256
+
+-- | Protocol version the client expects
+expectedProtocolVersion :: Int
+expectedProtocolVersion = 2
diff --git a/library/Faktory/Connection.hs b/library/Faktory/Connection.hs
new file mode 100644
index 0000000..54d0690
--- /dev/null
+++ b/library/Faktory/Connection.hs
@@ -0,0 +1,92 @@
+module Faktory.Connection
+ ( ConnectionInfo(..)
+ , defaultConnectionInfo
+ , envConnectionInfo
+ , connect
+ ) where
+
+import Faktory.Prelude
+
+import Data.Maybe (fromMaybe)
+import Data.Void
+import Network.Connection
+import Network.Socket (HostName, PortNumber)
+import System.Environment (lookupEnv)
+import Text.Megaparsec
+import Text.Megaparsec.Char
+
+data ConnectionInfo = ConnectionInfo
+ { connectionInfoTls :: Bool
+ , connectionInfoPassword :: Maybe String
+ , connectionInfoHostName :: HostName
+ , connectionInfoPort :: PortNumber
+ }
+ deriving (Eq, Show)
+
+defaultConnectionInfo :: ConnectionInfo
+defaultConnectionInfo = ConnectionInfo
+ { connectionInfoTls = False
+ , connectionInfoPassword = Nothing
+ , connectionInfoHostName = "localhost"
+ , connectionInfoPort = 7419
+ }
+
+-- | Parse a @'Connection'@ from environment variables
+--
+-- > FAKTORY_PROVIDER=FAKTORY_URL
+-- > FAKTORY_URL=tcp://:my-password@localhost:7419
+--
+-- Supported format is @tcp(+tls):\/\/(:password@)host:port@.
+--
+-- See <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#url-configuration>.
+--
+envConnectionInfo :: IO ConnectionInfo
+envConnectionInfo = do
+ providerString <- fromMaybe "FAKTORY_URL" <$> lookupEnv "FAKTORY_PROVIDER"
+ provider <- parseThrow parseProvider "FAKTORY_PROVIDER" providerString
+ connectionString <- fromMaybe "tcp://localhost:7419" <$> lookupEnv provider
+ parseThrow parseConnection provider connectionString
+
+-- | Connect to the given @'Connection'@ as a @'Socket'@
+connect :: ConnectionInfo -> IO Connection
+connect ConnectionInfo {..} = bracketOnError open connectionClose pure
+ where
+ open = do
+ ctx <- initConnectionContext
+ connectTo ctx $ ConnectionParams
+ { connectionHostname = connectionInfoHostName
+ , connectionPort = connectionInfoPort
+ , connectionUseSecure = if connectionInfoTls
+ then Just TLSSettingsSimple
+ { settingDisableCertificateValidation = False
+ , settingDisableSession = False
+ , settingUseServerName = False
+ }
+ else Nothing
+ , connectionUseSocks = Nothing
+ }
+
+type Parser = Parsec Void String
+
+parseThrow :: Parser a -> String -> String -> IO a
+parseThrow parser name value = either err pure $ parse parser name value
+ where
+ err ex = throwIO . userError $ unlines
+ [ ""
+ , "\"" <> value <> "\" is an invalid value for " <> name <> ":"
+ , parseErrorPretty ex
+ ]
+
+parseProvider :: Parser String
+parseProvider =
+ some (upperChar <|> char '_') <?> "an environment variable name"
+
+parseConnection :: Parser ConnectionInfo
+parseConnection = go <?> "tcp(+tls)://(:<password>@)<host>:<port>"
+ where
+ go =
+ ConnectionInfo
+ <$> (False <$ string "tcp://" <|> True <$ string "tcp+tls://")
+ <*> optional (char ':' *> manyTill anyChar (char '@'))
+ <*> manyTill anyChar (char ':')
+ <*> (read <$> some digitChar)
diff --git a/library/Faktory/Job.hs b/library/Faktory/Job.hs
new file mode 100644
index 0000000..eea3375
--- /dev/null
+++ b/library/Faktory/Job.hs
@@ -0,0 +1,124 @@
+module Faktory.Job
+ ( Job
+ , JobId
+ , perform
+ , retry
+ , once
+ , queue
+ , jobtype
+ , at
+ , in_
+ , newJob
+ , jobJid
+ , jobArg
+ ) where
+
+import Faktory.Prelude
+
+import Data.Aeson
+import Data.Aeson.Casing
+import Data.List.NonEmpty (NonEmpty)
+import qualified Data.List.NonEmpty as NE
+import Data.Time
+import Faktory.Client (Client, pushJob)
+import Faktory.Settings (Queue)
+import GHC.Generics
+import GHC.Stack
+import System.Random
+
+data Job arg = Job
+ { jobJid :: JobId
+ , jobJobtype :: String
+ , jobArgs :: NonEmpty arg
+ -- ^ Faktory needs to serialize args as a list, but we like a single-argument
+ -- interface so that's what we expose. See @'jobArg'@.
+ , jobRetry :: Maybe Int
+ , jobQueue :: Maybe Queue
+ , jobAt :: Maybe UTCTime
+ }
+ deriving Generic
+
+-- | Individual changes to a @'Job'@ to be 'perform'ed
+data JobUpdate
+ = SetRetry Int
+ | SetQueue Queue
+ | SetJobtype String
+ | SetAt UTCTime
+ | SetIn NominalDiffTime
+
+newtype JobOptions = JobOptions [JobUpdate]
+ deriving newtype (Semigroup, Monoid)
+
+-- | Perform a Job with the given options
+--
+-- @
+-- 'perform' 'mempty' SomeJob
+-- 'perform' ('queue' "SomeQueue") SomeJob
+-- 'perform' 'once' SomeJob
+-- 'perform' ('at' someTime <> 'once') SomeJob
+-- 'perform' ('in_' 10 <> 'once') SomeJob
+-- 'perform' ('in_' 10 <> 'retry' 3) SomeJob
+-- @
+--
+perform :: (HasCallStack, ToJSON arg) => JobOptions -> Client -> arg -> IO JobId
+perform options client arg = do
+ job <- applyOptions options =<< newJob arg
+ jobJid job <$ pushJob client job
+
+applyOptions :: JobOptions -> Job arg -> IO (Job arg)
+applyOptions (JobOptions patches) = go patches
+ where
+ go [] job = pure job
+ go (set : sets) job = case set of
+ SetRetry n -> go sets $ job { jobRetry = Just n }
+ SetQueue q -> go sets $ job { jobQueue = Just q }
+ SetJobtype jt -> go sets $ job { jobJobtype = jt }
+ SetAt time -> go sets $ job { jobAt = Just time }
+ SetIn diff -> do
+ now <- getCurrentTime
+ go sets $ job { jobAt = Just $ addUTCTime diff now }
+
+retry :: Int -> JobOptions
+retry n = JobOptions [SetRetry n]
+
+-- | Equivalent to @'retry' (-1)@: no retries, and move to Dead on failure
+once :: JobOptions
+once = retry (-1)
+
+queue :: Queue -> JobOptions
+queue q = JobOptions [SetQueue q]
+
+jobtype :: String -> JobOptions
+jobtype jt = JobOptions [SetJobtype jt]
+
+at :: UTCTime -> JobOptions
+at t = JobOptions [SetAt t]
+
+in_ :: NominalDiffTime -> JobOptions
+in_ i = JobOptions [SetIn i]
+
+newJob :: arg -> IO (Job arg)
+newJob arg = do
+ -- Ruby uses 12 random hex
+ jobId <- take 12 . randomRs ('a', 'z') <$> newStdGen
+
+ pure Job
+ { jobJid = jobId
+ , jobJobtype = "Default"
+ , jobArgs = pure arg
+ , jobRetry = Nothing
+ , jobQueue = Nothing
+ , jobAt = Nothing
+ }
+
+jobArg :: Job arg -> arg
+jobArg Job {..} = NE.head jobArgs
+
+instance ToJSON args => ToJSON (Job args) where
+ toJSON = genericToJSON $ aesonPrefix snakeCase
+ toEncoding = genericToEncoding $ aesonPrefix snakeCase
+
+instance FromJSON args => FromJSON (Job args) where
+ parseJSON = genericParseJSON $ aesonPrefix snakeCase
+
+type JobId = String
diff --git a/library/Faktory/Prelude.hs b/library/Faktory/Prelude.hs
new file mode 100644
index 0000000..cd1b394
--- /dev/null
+++ b/library/Faktory/Prelude.hs
@@ -0,0 +1,23 @@
+module Faktory.Prelude
+ ( module X
+ , module Faktory.Prelude
+ )
+where
+
+import Prelude as X
+
+import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay)
+import Control.Exception.Safe as X
+import Control.Monad as X
+import Data.Foldable as X
+import Data.Semigroup as X ((<>))
+import Data.Text as X (Text, pack, unpack)
+import Data.Traversable as X
+
+threadDelaySeconds :: Int -> IO ()
+threadDelaySeconds n = threadDelay $ n * 1000000
+
+forkIOWithThrowToParent :: IO () -> IO ThreadId
+forkIOWithThrowToParent action = do
+ parent <- myThreadId
+ forkIO $ action `X.catchAny` \err -> throwTo parent err
diff --git a/library/Faktory/Protocol.hs b/library/Faktory/Protocol.hs
new file mode 100644
index 0000000..7f4f714
--- /dev/null
+++ b/library/Faktory/Protocol.hs
@@ -0,0 +1,78 @@
+-- | Modified version of @"Database.Redis.Protocol"@
+--
+-- <https://github.com/informatikr/hedis/blob/master/src/Database/Redis/Protocol.hs>
+--
+-- Faktory takes a lot of inspiration from Redis, so the connection and
+-- protocol-related code translated well with minor simplifications.
+--
+module Faktory.Protocol
+ ( readReply
+ , Reply(..)
+ , reply
+ ) where
+
+import Prelude hiding (error)
+
+import Data.ByteString (ByteString)
+import qualified Data.ByteString.Char8 as BS8
+import qualified Data.Text.Encoding as T
+import qualified Data.Text.Read as T
+import Scanner (Scanner)
+import qualified Scanner
+
+data Reply
+ = SingleLine ByteString
+ | Error ByteString
+ | Bulk (Maybe ByteString)
+
+readReply :: IO ByteString -> IO (Either String (Maybe ByteString))
+readReply getMore = fromScanResult <$> Scanner.scanWith getMore reply BS8.empty
+ where
+ fromScanResult = \case
+ Scanner.Fail _ msg -> Left msg
+ Scanner.More _ -> Left "Unexpected More"
+ Scanner.Done _ (SingleLine bs) -> Right $ Just bs
+ Scanner.Done _ (Error bs) -> Left $ BS8.unpack bs
+ Scanner.Done _ (Bulk mByteString) -> Right mByteString
+
+{-# INLINE reply #-}
+reply :: Scanner Reply
+reply = do
+ c <- Scanner.anyChar8
+ case c of
+ '+' -> string
+ '-' -> error
+ '$' -> bulk
+ _ -> fail "Unknown reply type"
+
+{-# INLINE string #-}
+string :: Scanner Reply
+string = SingleLine <$> line
+
+{-# INLINE error #-}
+error :: Scanner Reply
+error = Error <$> line
+
+{-# INLINE bulk #-}
+bulk :: Scanner Reply
+bulk = Bulk <$> do
+ len <- integral
+ if len < 0 then return Nothing else Just <$> Scanner.take len <* eol
+
+{-# INLINE integral #-}
+integral :: Integral i => Scanner i
+integral = do
+ str <- line
+ case T.signed T.decimal (T.decodeUtf8 str) of
+ Left err -> fail (show err)
+ Right (l, _) -> return l
+
+{-# INLINE line #-}
+line :: Scanner ByteString
+line = Scanner.takeWhileChar8 (/= '\r') <* eol
+
+{-# INLINE eol #-}
+eol :: Scanner ()
+eol = do
+ Scanner.char8 '\r'
+ Scanner.char8 '\n'
diff --git a/library/Faktory/Settings.hs b/library/Faktory/Settings.hs
new file mode 100644
index 0000000..04b9a45
--- /dev/null
+++ b/library/Faktory/Settings.hs
@@ -0,0 +1,67 @@
+module Faktory.Settings
+ ( Settings(..)
+ , ConnectionInfo(..)
+ , defaultSettings
+ , envSettings
+ , Queue(..)
+ , queueArg
+ , defaultQueue
+ , WorkerId
+ , randomWorkerId
+ ) where
+
+import Faktory.Prelude
+
+import Data.Aeson
+import Data.ByteString.Lazy (ByteString, fromStrict)
+import Data.String
+import Data.Text.Encoding (encodeUtf8)
+import Faktory.Connection
+import System.Environment (lookupEnv)
+import System.IO (hPutStrLn, stderr)
+import System.Random
+
+data Settings = Settings
+ { settingsQueue :: Queue
+ , settingsConnection :: ConnectionInfo
+ , settingsLogDebug :: String -> IO ()
+ , settingsLogError :: String -> IO ()
+ , settingsWorkerIdleDelay :: Int
+ }
+
+defaultSettings :: Settings
+defaultSettings = Settings
+ { settingsQueue = defaultQueue
+ , settingsConnection = defaultConnectionInfo
+ , settingsLogDebug = \_msg -> pure ()
+ , settingsLogError = hPutStrLn stderr . ("[ERROR]: " <>)
+ , settingsWorkerIdleDelay = 1
+ }
+
+-- | Defaults, but read @'Connection'@ from the environment
+--
+-- See @'envConnection'@
+--
+envSettings :: IO Settings
+envSettings = do
+ mQueue <- lookupEnv "FAKTORY_QUEUE"
+ connection <- envConnectionInfo
+ pure defaultSettings
+ { settingsQueue = maybe defaultQueue (Queue . pack) mQueue
+ , settingsConnection = connection
+ }
+
+newtype Queue = Queue Text
+ deriving newtype (IsString, FromJSON, ToJSON)
+
+queueArg :: Queue -> ByteString
+queueArg (Queue q) = fromStrict $ encodeUtf8 q
+
+defaultQueue :: Queue
+defaultQueue = "default"
+
+newtype WorkerId = WorkerId String
+ deriving newtype (FromJSON, ToJSON)
+
+randomWorkerId :: IO WorkerId
+randomWorkerId = WorkerId . take 8 . randomRs ('a', 'z') <$> newStdGen
diff --git a/library/Faktory/Worker.hs b/library/Faktory/Worker.hs
new file mode 100644
index 0000000..d69dfb8
--- /dev/null
+++ b/library/Faktory/Worker.hs
@@ -0,0 +1,102 @@
+-- | High-level interface for a Worker
+--
+-- Runs forever, @FETCH@-ing Jobs from the given Queue and handing each to your
+-- processing function.
+--
+module Faktory.Worker
+ ( WorkerHalt(..)
+ , runWorker
+ )
+where
+
+import Faktory.Prelude
+
+import Control.Concurrent (killThread)
+import Data.Aeson
+import Data.Aeson.Casing
+import qualified Data.Text as T
+import Faktory.Client
+import Faktory.Job (Job, JobId, jobArg, jobJid)
+import Faktory.Settings
+import GHC.Generics
+import GHC.Stack
+
+-- | If processing functions @'throw'@ this, @'runWorker'@ will exit
+data WorkerHalt = WorkerHalt
+ deriving (Eq, Show, Exception)
+
+newtype BeatPayload = BeatPayload
+ { _bpWid :: WorkerId
+ }
+ deriving Generic
+
+instance ToJSON BeatPayload where
+ toJSON = genericToJSON $ aesonPrefix snakeCase
+ toEncoding = genericToEncoding $ aesonPrefix snakeCase
+
+newtype AckPayload = AckPayload
+ { _apJid :: JobId
+ }
+ deriving Generic
+
+instance ToJSON AckPayload where
+ toJSON = genericToJSON $ aesonPrefix snakeCase
+ toEncoding = genericToEncoding $ aesonPrefix snakeCase
+
+data FailPayload = FailPayload
+ { _fpMessage :: Text
+ , _fpErrtype :: String
+ , _fpJid :: JobId
+ , _fpBacktrace :: [String]
+ }
+ deriving Generic
+
+instance ToJSON FailPayload where
+ toJSON = genericToJSON $ aesonPrefix snakeCase
+ toEncoding = genericToEncoding $ aesonPrefix snakeCase
+
+runWorker :: FromJSON args => Settings -> (args -> IO ()) -> IO ()
+runWorker settings f = do
+ workerId <- randomWorkerId
+ client <- newClient settings $ Just workerId
+ beatThreadId <- forkIOWithThrowToParent $ forever $ heartBeat client workerId
+
+ forever (processorLoop client settings f)
+ `catch` (\(_ex :: WorkerHalt) -> pure ())
+ `finally` (killThread beatThreadId >> closeClient client)
+
+processorLoop :: FromJSON arg => Client -> Settings -> (arg -> IO ()) -> IO ()
+processorLoop client settings f = do
+ let
+ processAndAck job = do
+ f $ jobArg job
+ ackJob client job
+
+ emJob <- fetchJob client $ settingsQueue settings
+
+ case emJob of
+ Left err -> settingsLogError settings $ "Invalid Job: " <> err
+ Right Nothing -> threadDelaySeconds $ settingsWorkerIdleDelay settings
+ Right (Just job) ->
+ processAndAck job
+ `catches` [ Handler $ \(ex :: WorkerHalt) -> throw ex
+ , Handler $ \(ex :: SomeException) ->
+ failJob client job $ T.pack $ show ex
+ ]
+
+-- | <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#heartbeat>
+heartBeat :: Client -> WorkerId -> IO ()
+heartBeat client workerId = do
+ threadDelaySeconds 25
+ command_ client "BEAT" [encode $ BeatPayload workerId]
+
+fetchJob
+ :: FromJSON args => Client -> Queue -> IO (Either String (Maybe (Job args)))
+fetchJob client queue = commandJSON client "FETCH" [queueArg queue]
+
+ackJob :: HasCallStack => Client -> Job args -> IO ()
+ackJob client job = commandOK client "ACK" [encode $ AckPayload $ jobJid job]
+
+failJob :: HasCallStack => Client -> Job args -> Text -> IO ()
+failJob client job message =
+ commandOK client "FAIL" [encode $ FailPayload message "" (jobJid job) []]
diff --git a/tests/Faktory/ConnectionSpec.hs b/tests/Faktory/ConnectionSpec.hs
new file mode 100644
index 0000000..04997fb
--- /dev/null
+++ b/tests/Faktory/ConnectionSpec.hs
@@ -0,0 +1,108 @@
+module Faktory.ConnectionSpec
+ ( spec
+ ) where
+
+import Faktory.Prelude
+
+import Data.List (isInfixOf)
+import Faktory.Connection
+import System.Environment
+import Test.Hspec
+
+spec :: Spec
+spec = do
+ describe "envConnectionInfo" $ do
+ it "returns default settings in an empty environment" $ do
+ let env = [("FAKTORY_PROVIDER", Nothing), ("FAKTORY_URL", Nothing)]
+
+ withEnvironment env $ do
+ connection <- envConnectionInfo
+ connection `shouldBe` defaultConnectionInfo
+
+ it "parses the provided URL" $ do
+ let
+ env =
+ [("FAKTORY_PROVIDER", Nothing), ("FAKTORY_URL", Just "tcp://foo:123")]
+
+ withEnvironment env $ do
+ ConnectionInfo {..} <- envConnectionInfo
+ connectionInfoTls `shouldBe` False
+ connectionInfoPassword `shouldBe` Nothing
+ connectionInfoHostName `shouldBe` "foo"
+ connectionInfoPort `shouldBe` 123
+
+ it "parses tls and password" $ do
+ let
+ env =
+ [ ("FAKTORY_PROVIDER", Nothing)
+ , ("FAKTORY_URL", Just "tcp+tls://:foo@bar:123")
+ ]
+
+ withEnvironment env $ do
+ ConnectionInfo {..} <- envConnectionInfo
+ connectionInfoTls `shouldBe` True
+ connectionInfoPassword `shouldBe` Just "foo"
+ connectionInfoHostName `shouldBe` "bar"
+ connectionInfoPort `shouldBe` 123
+
+ it "follows _PROVIDER to find _URL" $ do
+ let
+ env =
+ [ ("FAKTORY_PROVIDER", Just "OTHER_URL")
+ , ("OTHER_URL", Just "tcp://foo:123")
+ ]
+
+ withEnvironment env $ do
+ ConnectionInfo {..} <- envConnectionInfo
+ connectionInfoHostName `shouldBe` "foo"
+ connectionInfoPort `shouldBe` 123
+
+ it "throws nice errors for invalid PROVIDER" $ do
+ let
+ env =
+ [ ("FAKTORY_PROVIDER", Just "flippity-$flop")
+ , ("FAKTORY_URL", Nothing)
+ ]
+
+ withEnvironment env envConnectionInfo
+ `shouldThrowMessage` "expecting an environment variable name"
+
+ it "throws nice errors for invalid _URL" $ do
+ let
+ env =
+ [ ("FAKTORY_PROVIDER", Nothing)
+ , ("FAKTORY_URL", Just "http://foo:123")
+ ]
+
+ withEnvironment env envConnectionInfo
+ `shouldThrowMessage` "expecting tcp(+tls)://(:<password>@)<host>:<port>"
+
+ it "throws nice errors for missing _PROVIDER" $ do
+ pendingWith "This makes implementation more complicated"
+
+ let
+ env =
+ [("FAKTORY_PROVIDER", Just "MISSING_URL"), ("MISSING_URL", Nothing)]
+
+ withEnvironment env envConnectionInfo `shouldThrowMessage` "..."
+
+-- | Override ENV with the given values, and restore them after
+--
+-- Values are @'Maybe'@ so that @'Nothing'@ can be used to unset values during
+-- override and/or restoration. Note: this is probably not thread-safe.
+--
+withEnvironment :: [(String, Maybe String)] -> IO a -> IO a
+withEnvironment env act = bracket
+ (traverse readAndReset env)
+ (traverse_ readAndReset)
+ (const act)
+ where
+ readAndReset :: (String, Maybe String) -> IO (String, Maybe String)
+ readAndReset (variable, mNewValue) = do
+ mOriginalValue <- lookupEnv variable
+ maybe (unsetEnv variable) (setEnv variable) mNewValue
+ pure (variable, mOriginalValue)
+
+shouldThrowMessage :: IO a -> String -> Expectation
+shouldThrowMessage act msg =
+ act `shouldThrow` \ex -> msg `isInfixOf` show @SomeException ex
diff --git a/tests/FaktorySpec.hs b/tests/FaktorySpec.hs
new file mode 100644
index 0000000..8f80349
--- /dev/null
+++ b/tests/FaktorySpec.hs
@@ -0,0 +1,46 @@
+module FaktorySpec
+ ( spec
+ ) where
+
+import Faktory.Prelude
+
+import Control.Concurrent.MVar
+import Faktory.Client
+import Faktory.Job
+import Faktory.Settings
+import Faktory.Worker
+import Test.Hspec
+
+spec :: Spec
+spec = describe "Faktory" $ do
+ it "can push and process jobs" $ do
+ settings <- envSettings
+ bracket (newClient settings Nothing) closeClient $ \client -> do
+ void $ flush client
+ void $ perform @Text mempty client "a"
+ void $ perform @Text mempty client "b"
+ void $ perform @Text mempty client "HALT"
+
+ processedJobs <- newMVar ([] :: [Text])
+ runWorker settings $ \job -> do
+ modifyMVar_ processedJobs $ pure . (job :)
+ when (job == "HALT") $ throw WorkerHalt
+
+ jobs <- readMVar processedJobs
+ jobs `shouldMatchList` ["a", "b", "HALT"]
+
+ it "can push jobs with optional attributes" $ do
+ settings <- envSettings
+ bracket (newClient settings Nothing) closeClient $ \client -> do
+ void $ flush client
+ void $ perform @Text once client "a"
+ void $ perform @Text (retry 0) client "b"
+ void $ perform @Text mempty client "HALT"
+
+ processedJobs <- newMVar ([] :: [Text])
+ runWorker settings $ \job -> do
+ modifyMVar_ processedJobs $ pure . (job :)
+ when (job == "HALT") $ throw WorkerHalt
+
+ jobs <- readMVar processedJobs
+ jobs `shouldMatchList` ["a", "b", "HALT"]
diff --git a/tests/Spec.hs b/tests/Spec.hs
new file mode 100644
index 0000000..d19954e
--- /dev/null
+++ b/tests/Spec.hs
@@ -0,0 +1,2 @@
+{-# OPTIONS_GHC -Wno-missing-export-lists #-}
+{-# OPTIONS_GHC -F -pgmF hspec-discover #-}