summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoralexeyraga <>2017-12-07 02:01:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2017-12-07 02:01:00 (GMT)
commitfe16d44039ffcdbb77c25a67b9dbeeea064234b6 (patch)
tree07e5ce3de058f64b17fe96280753b6f6e6de97a0
parent13b1df23dfda6539d58ee4e4776763274a44f8ed (diff)
version 2.2.02.2.0
-rw-r--r--example/ProducerExample.hs35
-rw-r--r--hw-kafka-client.cabal4
-rw-r--r--src/Kafka/Internal/RdKafka.chs9
-rw-r--r--src/Kafka/Producer.hs15
-rw-r--r--src/Kafka/Producer/Callbacks.hs28
-rw-r--r--src/Kafka/Producer/ProducerProperties.hs14
6 files changed, 80 insertions, 25 deletions
diff --git a/example/ProducerExample.hs b/example/ProducerExample.hs
index f343bfc..a8586f4 100644
--- a/example/ProducerExample.hs
+++ b/example/ProducerExample.hs
@@ -3,14 +3,17 @@
module ProducerExample
where
-import Control.Monad (forM_)
-import Data.ByteString (ByteString)
+import Control.Monad (forM_)
+import Data.ByteString (ByteString)
+import Data.ByteString.Char8 (pack)
import Data.Monoid
import Kafka.Producer
-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
+ <> sendTimeout (Timeout 10000)
+ <> setCallback (deliveryErrorsCallback print)
<> logLevel KafkaLogDebug
-- Topic to send messages to
@@ -33,17 +36,29 @@ runProducerExample = do
sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
- err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer") )
+ putStrLn "Producer is ready, send your messages!"
+ msg1 <- getLine
+
+ err1 <- produceMessage prod (mkMessage Nothing (Just $ pack msg1))
forM_ err1 print
- err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
+ putStrLn "One more time!"
+ msg2 <- getLine
+
+ err2 <- produceMessage prod (mkMessage (Just "key") (Just $ pack msg2))
forM_ err2 print
- errs <- produceMessageBatch prod
- [ mkMessage (Just "b-1") (Just "batch-1")
- , mkMessage (Just "b-2") (Just "batch-2")
- , mkMessage Nothing (Just "batch-3")
- ]
+ putStrLn "And the last one..."
+ msg3 <- getLine
+ err3 <- produceMessage prod (mkMessage (Just "key3") (Just $ pack msg3))
+
+ -- errs <- produceMessageBatch prod
+ -- [ mkMessage (Just "b-1") (Just "batch-1")
+ -- , mkMessage (Just "b-2") (Just "batch-2")
+ -- , mkMessage Nothing (Just "batch-3")
+ -- ]
+
+ -- forM_ errs (print . snd)
- forM_ errs (print . snd)
+ putStrLn "Thank you."
return $ Right ()
diff --git a/hw-kafka-client.cabal b/hw-kafka-client.cabal
index 7b6ac8c..ed930e7 100644
--- a/hw-kafka-client.cabal
+++ b/hw-kafka-client.cabal
@@ -1,5 +1,5 @@
name: hw-kafka-client
-version: 2.1.3
+version: 2.2.0
homepage: https://github.com/haskell-works/hw-kafka-client
bug-reports: https://github.com/haskell-works/hw-kafka-client/issues
license: MIT
@@ -46,6 +46,7 @@ executable kafka-client-example
, transformers
, unix
, hw-kafka-client
+ ghc-options: -threaded -rtsopts
if flag(examples)
buildable: True
@@ -84,6 +85,7 @@ library
Kafka.Internal.Setup
Kafka.Internal.Shared
Kafka.Producer.Convert
+ Kafka.Producer.Callbacks
hs-source-dirs: src
default-language: Haskell2010
ghc-options: -Wall
diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs
index 2bf290a..0ca305c 100644
--- a/src/Kafka/Internal/RdKafka.chs
+++ b/src/Kafka/Internal/RdKafka.chs
@@ -296,17 +296,18 @@ rdKafkaConfSetRebalanceCb conf cb = do
return ()
---- Delivery Callback
-type DeliveryCallback = Ptr RdKafkaT -> Ptr RdKafkaMessageT -> Word8Ptr -> IO ()
+type DeliveryCallback' = Ptr RdKafkaT -> Ptr RdKafkaMessageT -> Word8Ptr -> IO ()
+type DeliveryCallback = Ptr RdKafkaT -> Ptr RdKafkaMessageT -> IO ()
foreign import ccall safe "wrapper"
- mkDeliveryCallback :: DeliveryCallback -> IO (FunPtr DeliveryCallback)
+ mkDeliveryCallback :: DeliveryCallback' -> IO (FunPtr DeliveryCallback')
foreign import ccall safe "rd_kafka.h rd_kafka_conf_set_dr_msg_cb"
- rdKafkaConfSetDrMsgCb' :: Ptr RdKafkaConfT -> FunPtr DeliveryCallback -> IO ()
+ rdKafkaConfSetDrMsgCb' :: Ptr RdKafkaConfT -> FunPtr DeliveryCallback' -> IO ()
rdKafkaConfSetDrMsgCb :: RdKafkaConfTPtr -> DeliveryCallback -> IO ()
rdKafkaConfSetDrMsgCb conf cb = do
- cb' <- mkDeliveryCallback cb
+ cb' <- mkDeliveryCallback (\k m _ -> cb k m)
withForeignPtr conf $ \c -> rdKafkaConfSetDrMsgCb' c cb'
return ()
diff --git a/src/Kafka/Producer.hs b/src/Kafka/Producer.hs
index d40ca46..e97e467 100644
--- a/src/Kafka/Producer.hs
+++ b/src/Kafka/Producer.hs
@@ -26,6 +26,7 @@ import Kafka.Internal.CancellationToken as CToken
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
+import Kafka.Producer.Callbacks
import Kafka.Producer.Convert
import Kafka.Producer.Types
@@ -55,7 +56,7 @@ runProducer props f =
-- A newly created producer must be closed with 'closeProducer' function.
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer pps = liftIO $ do
- kc@(KafkaConf kc' ct) <- kafkaConf (KafkaProps $ M.toList (ppKafkaProps pps))
+ kc@(KafkaConf kc' _) <- kafkaConf (KafkaProps $ M.toList (ppKafkaProps pps))
tc <- topicConf (TopicProps $ M.toList (ppTopicProps pps))
-- set callbacks
@@ -67,7 +68,7 @@ newProducer pps = liftIO $ do
Right kafka -> do
forM_ (ppLogLevel pps) (rdKafkaSetLogLevel kafka . fromEnum)
let prod = KafkaProducer (Kafka kafka) kc tc
- runEventLoop prod ct (Just $ Timeout 100) >> return (Right prod)
+ return (Right prod)
-- | Sends a single message.
-- Since librdkafka is backed by a queue, this function can return before messages are sent. See
@@ -76,7 +77,8 @@ produceMessage :: MonadIO m
=> KafkaProducer
-> ProducerRecord
-> m (Maybe KafkaError)
-produceMessage (KafkaProducer (Kafka k) _ (TopicConf tc)) m = liftIO $
+produceMessage kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) m = liftIO $ do
+ pollEvents kp (Just $ Timeout 0) -- fire callbacks if any exist (handle delivery reports)
bracket (mkTopic $ prTopic m) clTopic withTopic
where
mkTopic (TopicName tn) = newUnmanagedRdKafkaTopicT k tn (Just tc)
@@ -103,7 +105,8 @@ produceMessageBatch :: MonadIO m
-> m [(ProducerRecord, KafkaError)]
-- ^ An empty list when the operation is successful,
-- otherwise a list of "failed" messages with corresponsing errors.
-produceMessageBatch (KafkaProducer (Kafka k) _ (TopicConf tc)) messages = liftIO $
+produceMessageBatch kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) messages = liftIO $ do
+ pollEvents kp (Just $ Timeout 0) -- fire callbacks if any exist (handle delivery reports)
concat <$> forM (mkBatches messages) sendBatch
where
mkSortKey = prTopic &&& prPartition
@@ -158,7 +161,9 @@ flushProducer :: MonadIO m => KafkaProducer -> m ()
flushProducer kp = liftIO $ do
pollEvents kp (Just $ Timeout 100)
l <- outboundQueueLength (kpKafkaPtr kp)
- unless (l == 0) $ flushProducer kp
+ if (l == 0)
+ then pollEvents kp (Just $ Timeout 0) -- to be sure that all the delivery reports are fired
+ else flushProducer kp
------------------------------------------------------------------------------------
diff --git a/src/Kafka/Producer/Callbacks.hs b/src/Kafka/Producer/Callbacks.hs
new file mode 100644
index 0000000..3f53387
--- /dev/null
+++ b/src/Kafka/Producer/Callbacks.hs
@@ -0,0 +1,28 @@
+module Kafka.Producer.Callbacks
+( deliveryErrorsCallback
+, module X
+)
+where
+
+import Foreign
+import Foreign.C.Error
+import Kafka.Callbacks as X
+import Kafka.Internal.RdKafka
+import Kafka.Internal.Setup
+import Kafka.Internal.Shared
+import Kafka.Types
+
+-- | Sets the callback for delivery errors.
+-- The callback is only called in case of errors.
+deliveryErrorsCallback :: (KafkaError -> IO ()) -> KafkaConf -> IO ()
+deliveryErrorsCallback callback (KafkaConf conf _) = rdKafkaConfSetDrMsgCb conf realCb
+ where
+ realCb :: t -> Ptr RdKafkaMessageT -> IO ()
+ realCb _ mptr =
+ if mptr == nullPtr
+ then getErrno >>= (callback . kafkaRespErr)
+ else do
+ s <- peek mptr
+ if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
+ then (callback . KafkaResponseError $ err'RdKafkaMessageT s)
+ else pure ()
diff --git a/src/Kafka/Producer/ProducerProperties.hs b/src/Kafka/Producer/ProducerProperties.hs
index d152174..9a57a94 100644
--- a/src/Kafka/Producer/ProducerProperties.hs
+++ b/src/Kafka/Producer/ProducerProperties.hs
@@ -1,15 +1,15 @@
module Kafka.Producer.ProducerProperties
( module Kafka.Producer.ProducerProperties
-, module Kafka.Callbacks
+, module Kafka.Producer.Callbacks
)
where
import Control.Monad
-import qualified Data.List as L
-import Data.Map (Map)
-import qualified Data.Map as M
-import Kafka.Callbacks
+import qualified Data.List as L
+import Data.Map (Map)
+import qualified Data.Map as M
import Kafka.Internal.Setup
+import Kafka.Producer.Callbacks
import Kafka.Types
-- | Properties to create 'KafkaProducer'.
@@ -54,6 +54,10 @@ topicCompression :: KafkaCompressionCodec -> ProducerProperties
topicCompression c =
extraTopicProps $ M.singleton "compression.codec" (kafkaCompressionCodecToString c)
+sendTimeout :: Timeout -> ProducerProperties
+sendTimeout (Timeout t) =
+ extraTopicProps $ M.singleton "message.timeout.ms" (show t)
+
-- | Any configuration options that are supported by /librdkafka/.
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md here>
extraProps :: Map String String -> ProducerProperties