author     YvesPares <>                      2019-10-09 10:16:00 (GMT)
committer  hdiff <hdiff@hdiff.luite.com>     2019-10-09 10:16:00 (GMT)
commit     b25c49f834fc2777416e25d067a0fea383716f08 (patch)
tree       92f4744167d7378cd01a6e5d70402ebfc102a69e
version    0.1.0.0 (HEAD, master)
-rw-r--r--  LICENSE                                   21
-rw-r--r--  examples/ExampleS3.hs                     78
-rw-r--r--  porcupine-s3.cabal                        95
-rw-r--r--  src/Control/Monad/ReaderSoup/AWS.hs       66
-rw-r--r--  src/Data/Locations/Accessors/AWS.hs      161
-rw-r--r--  src/Network/AWS/S3/TaskPipelineUtils.hs  190
6 files changed, 611 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..2b55c2c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Tweag I/O, NovaDiscovery
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/examples/ExampleS3.hs b/examples/ExampleS3.hs
new file mode 100644
index 0000000..65e2bdb
--- /dev/null
+++ b/examples/ExampleS3.hs
@@ -0,0 +1,78 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+
+-- The same example as example1 in 'porcupine-core', but with S3 access
+-- enabled by 'runPipelineTask'. Don't forget to map locations to s3 urls in the
+-- 'porcupine.yaml' generated by calling 'exampleS3 write-config-template', or
+-- else it will act exactly like example1.
+--
+-- Don't forget to enable OverloadedLabels and import Data.Locations.Accessors.AWS
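+--
+-- As a rough sketch (the exact config layout is whatever
+-- @exampleS3 write-config-template@ generates; the bucket name below is made
+-- up), the point is to replace the local paths mapped to the Inputs/User and
+-- Outputs/Analysis virtual files with URLs of the form:
+--
+-- > s3://some-bucket/Inputs/User.json
+-- > s3://some-bucket/Outputs/Analysis.json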
+
+import Data.Aeson
+import Data.DocRecord
+import qualified Data.HashMap.Strict as HM
+import qualified Data.Text as T
+import GHC.Generics
+import Porcupine
+
+import Data.Locations.Accessors.AWS
+
+
+data User = User { userName :: T.Text
+ , userSurname :: T.Text
+ , userAge :: Int }
+ deriving (Generic)
+instance FromJSON User
+
+newtype Analysis = Analysis { numLetters :: HM.HashMap Char Int }
+ deriving (Generic)
+instance ToJSON Analysis
+
+-- | How to load users
+userFile :: DataSource User
+userFile = dataSource ["Inputs", "User"]
+ (somePureDeserial JSONSerial)
+
+-- | How to write analysis
+analysisFile :: DataSink Analysis
+analysisFile = dataSink ["Outputs", "Analysis"]
+ (somePureSerial JSONSerial)
+
+-- | The simple computation we want to perform
+computeAnalysis :: User -> Analysis
+computeAnalysis (User name surname _) = Analysis $
+ HM.fromListWith (+) $ [(c,1) | c <- T.unpack name]
+ ++ [(c,1) | c <- T.unpack surname]
+
+-- | The task combining the three previous operations.
+--
+-- This task may look very opaque from the outside, having no parameters and no
+-- return value. But we will be able to reuse it over different users without
+-- having to change it at all.
+analyseOneUser :: (LogThrow m) => PTask m () ()
+analyseOneUser =
+ loadData userFile >>> arr computeAnalysis >>> writeData analysisFile
+
+mainTask :: (LogThrow m) => PTask m () ()
+mainTask =
+ -- First we get the ids of the users that we want to analyse. We need only one
+ -- field that will contain a range of values, see IndexRange. By default, this
+ -- range contains just one value, zero.
+ getOption ["Settings"] (docField @"users" (oneIndex (0::Int)) "The user ids to load")
+ -- We turn the range we read into a full lazy list:
+ >>> arr enumTRIndices
+ -- Then we just map over these ids and call analyseOneUser each time:
+ >>> parMapTask_ "userId" analyseOneUser
+
+main :: IO ()
+main = runPipelineTask (FullConfig "exampleS3" "porcupine.yaml" "porcupine-core/examples/data" ())
+ ( #aws <-- useAWS Discover
+ -- We just add #aws on top of the
+ -- baseContexts. Credentials will be discovered.
+ :& baseContexts "")
+ mainTask ()
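+
+-- A typical session could then be (hypothetical shell commands; the edit step
+-- is where the s3 mapping described at the top of this file happens):
+--
+-- > $ exampleS3 write-config-template
+-- > $ $EDITOR porcupine.yaml   # point the inputs/outputs to s3:// URLs
+-- > $ exampleS3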
diff --git a/porcupine-s3.cabal b/porcupine-s3.cabal
new file mode 100644
index 0000000..506a642
--- /dev/null
+++ b/porcupine-s3.cabal
@@ -0,0 +1,95 @@
+cabal-version: 1.12
+
+-- This file has been generated from package.yaml by hpack version 0.32.0.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: 91c80b02cb716a8eae5230965f9cb8380ba3465a9009534aad839e9284103774
+
+name: porcupine-s3
+version: 0.1.0.0
+synopsis: A location accessor for porcupine to connect to AWS S3 sources/sinks
+description: Gives a porcupine task pipeline access to AWS S3 objects (read and write). See
+ the README at <https://github.com/tweag/porcupine#README.md> and the examples
+ in the `porcupine-s3` package.
+category: Data, Arrows, Combinators, Control, AWS, Cloud
+homepage: https://github.com/tweag/porcupine#readme
+bug-reports: https://github.com/tweag/porcupine/issues
+maintainer: Yves Parès <yves.pares@tweag.io>
+copyright: 2018 EURL Tweag, NovaDiscovery
+license: MIT
+license-file: LICENSE
+build-type: Simple
+
+source-repository head
+ type: git
+ location: https://github.com/tweag/porcupine
+
+library
+ exposed-modules:
+ Control.Monad.ReaderSoup.AWS
+ Data.Locations.Accessors.AWS
+ Network.AWS.S3.TaskPipelineUtils
+ other-modules:
+ Paths_porcupine_s3
+ hs-source-dirs:
+ src
+ ghc-options: -Wall
+ build-depends:
+ amazonka
+ , amazonka-core
+ , amazonka-s3
+ , base >=4.10 && <5
+ , bytestring
+ , conduit
+ , conduit-extra
+ , directory
+ , filepath
+ , katip
+ , lens
+ , monad-control
+ , mtl
+ , porcupine-core ==0.1.*
+ , reader-soup ==0.1.*
+ , resourcet
+ , retry
+ , safe-exceptions
+ , streaming
+ , streaming-bytestring
+ , text
+ default-language: Haskell2010
+
+executable exampleS3
+ main-is: ExampleS3.hs
+ other-modules:
+ Paths_porcupine_s3
+ hs-source-dirs:
+ examples
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , amazonka
+ , amazonka-core
+ , amazonka-s3
+ , base >=4.10 && <5
+ , bytestring
+ , conduit
+ , conduit-extra
+ , directory
+ , docrecords
+ , filepath
+ , katip
+ , lens
+ , monad-control
+ , mtl
+ , porcupine-core
+ , porcupine-s3
+ , reader-soup ==0.1.*
+ , resourcet
+ , retry
+ , safe-exceptions
+ , streaming
+ , streaming-bytestring
+ , text
+ , unordered-containers
+ default-language: Haskell2010
diff --git a/src/Control/Monad/ReaderSoup/AWS.hs b/src/Control/Monad/ReaderSoup/AWS.hs
new file mode 100644
index 0000000..8dac6b6
--- /dev/null
+++ b/src/Control/Monad/ReaderSoup/AWS.hs
@@ -0,0 +1,66 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+{-# LANGUAGE UndecidableInstances #-}
+{-# OPTIONS_GHC "-fno-warn-orphans" #-}
+
+module Control.Monad.ReaderSoup.AWS
+ ( Credentials(..)
+ , Region(..)
+ , useAWS
+ , useAWSRegion
+ ) where
+
+import Control.Exception.Safe
+import Control.Lens
+import Control.Monad.Reader
+import Control.Monad.ReaderSoup
+import Control.Monad.ReaderSoup.Resource ()
+import Control.Monad.Trans.AWS
+import Katip
+import Network.AWS
+import Network.AWS.Auth (AuthError)
+
+
+type instance ContextFromName "aws" = Env
+
+instance SoupContext Env AWST where
+ toReaderT act = ReaderT $ \env -> runAWST env act
+ fromReaderT (ReaderT act) = ask >>= lift . act
+
+-- | Tries to get the credentials, but falls back to dummy credentials if they
+-- can't be found.
+getEnv :: (MonadCatch m, KatipContext m) => Credentials -> m Env
+getEnv creds = katipAddNamespace "awsContext" $ do
+ env <- try $ newEnv creds
+ case env of
+ Right x -> return x
+ Left (e :: AuthError) -> do
+ logFM DebugS $ logStr $
+ "Using dummy credentials, because there was an error when trying to get the credentials: "
+ ++ displayException e
+ newEnv (FromKeys "foo" "bar")
+
+-- | See the 'Credentials' documentation to know how the credentials can be provided.
+useAWS :: (MonadIO m, MonadCatch m, KatipContext m) => Credentials -> ContextRunner AWST m
+useAWS creds = ContextRunner $ \act -> do
+ env <- getEnv creds
+ runAWST env act
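+
+-- A minimal usage sketch (mirroring the exampleS3 executable; "myProg" is a
+-- made-up program name): the runner is plugged, under the @#aws@ label, into
+-- the record of contexts given to porcupine's 'runPipelineTask':
+--
+-- > runPipelineTask cfg (#aws <-- useAWS Discover :& baseContexts "myProg") task ()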
+
+-- | Like 'useAWS', but you set the default 'Region'
+useAWSRegion :: (MonadIO m, MonadCatch m, KatipContext m) => Credentials -> Region -> ContextRunner AWST m
+useAWSRegion creds region = ContextRunner $ \act -> do
+ env <- getEnv creds
+ let env' = env & envRegion .~ region
+ runAWST env' act
+
+instance (IsInSoup_ r ctxs "aws", IsInSoup_ r ctxs "resource") => MonadAWS (ReaderSoup_ r ctxs) where
+ liftAWS act =
+ scooping #aws $
+ hoist (picking #resource) act
diff --git a/src/Data/Locations/Accessors/AWS.hs b/src/Data/Locations/Accessors/AWS.hs
new file mode 100644
index 0000000..cb72c16
--- /dev/null
+++ b/src/Data/Locations/Accessors/AWS.hs
@@ -0,0 +1,161 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveTraversable #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC "-fno-warn-orphans" #-}
+{-# OPTIONS_GHC "-fno-warn-name-shadowing" #-}
+
+module Data.Locations.Accessors.AWS
+ ( module Control.Monad.ReaderSoup.AWS
+ , runPipelineTaskS3
+ -- * Backward-compat API
+ , selectRun
+ , runWriteLazyByte
+ , runReadLazyByte
+ , runReadLazyByte_
+ ) where
+
+import Control.Exception.Safe
+import Control.Lens
+import Control.Monad.ReaderSoup
+import Control.Monad.ReaderSoup.AWS
+import Control.Monad.ReaderSoup.Katip ()
+import Control.Monad.Trans.Resource
+import qualified Data.ByteString.Lazy as LBS
+import qualified Data.ByteString.Streaming as BSS
+import Data.Locations.Accessors
+import Data.Locations.Loc
+import Data.String
+import Network.AWS hiding (Error)
+import Network.AWS.S3
+import qualified Network.AWS.S3.TaskPipelineUtils as S3
+import System.TaskPipeline.CLI
+import System.TaskPipeline.PTask
+import System.TaskPipeline.Run
+
+
+-- | Just a compatibility overlay for code explicitly dealing with S3 URLs
+pattern S3Obj :: String -> PathWithExtension a -> URL a
+pattern S3Obj{bucketName,objectName} = RemoteFile "s3" bucketName Nothing objectName []
+
+-- | Accessing resources on S3
+instance (MonadAWS m, MonadMask m, MonadResource m)
+ => LocationAccessor m "aws" where
+ newtype GLocOf "aws" a = S (URL a)
+ deriving (Functor, Foldable, Traversable, ToJSON, TypedLocation)
+ locExists _ = return True -- TODO: Implement it
+ writeBSS (S l) = writeBSS_S3 l
+ readBSS (S l) f = readBSS_S3 l f
+ copy (S l1) (S l2) = copy_S3 l1 l2
+
+instance (MonadAWS m, MonadMask m, MonadResource m)
+ => MayProvideLocationAccessors m "aws"
+
+instance (IsLocString a) => Show (GLocOf "aws" a) where
+ show (S l) = show l
+
+instance (IsLocString a) => FromJSON (GLocOf "aws" a) where
+ parseJSON v = do
+ loc <- parseJSON v
+ case loc of
+ S3Obj{} -> return $ S loc
+ _ -> fail "Doesn't use 's3' protocol"
+
+writeBSS_S3 :: MonadAWS m => Loc -> BSS.ByteString m a -> m a
+writeBSS_S3 S3Obj { bucketName, objectName } body = do
+ let raw = objectName ^. pathWithExtensionAsRawFilePath
+ (res, r) <- S3.uploadObj (fromString bucketName) (fromString raw) body
+ case res ^. porsResponseStatus of
+ 200 -> pure ()
+ _ -> error $ "Unable to upload to the object " ++ raw ++ "."
+ return r
+writeBSS_S3 _ _ = undefined
+
+readBSS_S3
+ :: (MonadAWS m)
+ => Loc
+ -> (BSS.ByteString m () -> m b)
+ -> m b
+readBSS_S3 S3Obj{ bucketName, objectName } k = do
+ r <- S3.streamObjInto
+ (fromString bucketName)
+ (fromString $ objectName ^. pathWithExtensionAsRawFilePath)
+ k
+ case r of
+ Left e -> throw e
+ Right r -> return r
+readBSS_S3 _ _ = undefined
+
+copy_S3
+ :: (MonadResource m, MonadAWS m)
+ => Loc
+ -> Loc
+ -> m ()
+copy_S3 locFrom@(S3Obj bucket1 obj1) locTo@(S3Obj bucket2 obj2)
+ | bucket1 == bucket2 = do
+ _ <- S3.copyObj
+ (fromString bucket1)
+ (fromString $ obj1^.pathWithExtensionAsRawFilePath)
+ (fromString $ obj2^.pathWithExtensionAsRawFilePath)
+ return ()
+ | otherwise = readBSS_S3 locFrom (writeBSS_S3 locTo)
+copy_S3 _ _ = undefined
+
+-- | Just a shortcut for when you want ONLY local files and S3 support, with AWS
+-- credentials discovery. Use 'runPipelineTask' if you want to activate other
+-- location accessors.
+runPipelineTaskS3
+ :: PipelineConfigMethod o -- ^ How to configure the pipeline
+ -> Maybe Region -- ^ Change the default AWS region
+ -> PTask (ReaderSoup (("aws":::ContextFromName "aws") : BasePorcupineContexts)) () o
+ -> IO o
+runPipelineTaskS3 pcm mbRegion ptask =
+ runPipelineTask pcm ( #aws <-- case mbRegion of
+ Nothing -> useAWS Discover
+ Just reg -> useAWSRegion Discover reg
+ :& baseContexts (pcm ^. pipelineConfigMethodProgName) ) ptask ()
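+
+-- A usage sketch (the config-file path and program name are made up; mainTask
+-- is assumed to be a suitably-typed 'PTask'):
+--
+-- > main :: IO ()
+-- > main = runPipelineTaskS3 (FullConfig "myProg" "porcupine.yaml" "." ())
+-- >                          (Just Frankfurt)
+-- >                          mainTask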
+
+
+-- DEPRECATED CODE:
+
+-- * Automatically switching from Resource to AWS monad when accessing some loc
+
+-- | Run a computation or a sequence of computations that will access some
+-- locations. Selects whether to run in IO or AWS based on some Loc used as
+-- selector.
+selectRun :: Loc -- ^ A Loc to access
+ -> (forall m l. (LocationAccessor m l) => LocOf l -> m a)
+ -- ^ The action to run, either in AWS or IO
+ -> IO a
+selectRun loc f =
+ case loc of
+ LocalFile{} -> do
+ let accessorsRec = baseContexts "selectRun_Local"
+ (_,argsRec) = splitAccessorsFromArgRec accessorsRec
+ consumeSoup argsRec $ f (L loc)
+ S3Obj{} -> do
+ let accessorsRec = #aws <-- useAWS Discover
+ :& baseContexts "selectRun_AWS"
+ (_,argsRec) = splitAccessorsFromArgRec accessorsRec
+ consumeSoup argsRec $ f (S loc)
+ _ -> error "selectRun only handles local and S3 locations"
+
+-- | Just a shortcut
+runWriteLazyByte
+ :: Loc
+ -> LBS.ByteString
+ -> IO ()
+runWriteLazyByte l bs = selectRun l $ \l' -> writeLazyByte l' bs
+
+-- | Just a shortcut
+runReadLazyByte, runReadLazyByte_ :: Loc -> IO LBS.ByteString
+runReadLazyByte l = selectRun l readLazyByte
+runReadLazyByte_ = runReadLazyByte
diff --git a/src/Network/AWS/S3/TaskPipelineUtils.hs b/src/Network/AWS/S3/TaskPipelineUtils.hs
new file mode 100644
index 0000000..1048b2c
--- /dev/null
+++ b/src/Network/AWS/S3/TaskPipelineUtils.hs
@@ -0,0 +1,190 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# OPTIONS_GHC "-fno-warn-orphans" #-}
+
+module Network.AWS.S3.TaskPipelineUtils
+ ( runAll
+ , getEnv
+ , uploadObj
+ , uploadFolder
+ , streamS3Folder
+ , streamObjInto
+ , streamObjIntoExt
+ , downloadFolder
+ , copyObj
+ )
+where
+
+import Control.Exception.Safe
+import Control.Lens hiding ((:>))
+import Control.Monad (when)
+import Control.Monad.Trans.Resource
+import Control.Retry (RetryPolicyM (..), limitRetries,
+ retrying, rsIterNumber)
+import qualified Data.ByteString.Streaming as BSS
+import Data.Conduit.Binary (sinkLbs)
+import Data.String
+import Data.Text (Text)
+import qualified Data.Text as T
+import Network.AWS
+import Network.AWS.Auth (AuthError)
+import Network.AWS.Env (Env, HasEnv, environment)
+import Network.AWS.S3
+import qualified Network.AWS.S3.ListObjects as LO
+import qualified Streaming.Prelude as S
+import Streaming.TaskPipelineUtils as S
+import System.Directory (createDirectoryIfMissing)
+import System.FilePath (normalise, takeDirectory, (</>))
+
+
+runAll :: AWS b -> IO b
+runAll f = do
+ env <- getEnv True
+ runResourceT $ runAWS env f
+
+-- These instances may overlap in theory, but in practice there is probably no
+-- good reason to have two AWS.Envs in the same program, so only one side of
+-- the pair should provide one.
+instance {-# OVERLAPPABLE #-} HasEnv a => HasEnv (a `With` b) where environment = elt.environment
+instance HasEnv (a `With` Env)
+ where environment = ann.environment
+
+getEnv :: Bool -- ^ Verbose
+ -> IO Env
+getEnv verbose = do
+ -- Reads env vars AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+ env <-
+ catch (newEnv Discover) (handleException :: AuthError -> IO Env)
+ pure $ env & envRegion .~ Frankfurt
+ where
+ handleException e = do
+ when verbose $ do
+ let err = show e
+ putStrLn $ "Warning: couldn't get aws credentials, got " ++ err
+ putStrLn $ "Dummy credentials will be used instead, " ++
+ "so anything trying to access S3 " ++
+ "or any other amazon service will probably fail"
+ newEnv (FromKeys "foo" "bar")
+
+uploadObj :: (MonadAWS m)
+ => BucketName
+ -> ObjectKey
+ -> BSS.ByteString m a
+ -> m (PutObjectResponse, a)
+uploadObj buck object source = do
+ (bs :> r) <- BSS.toStrict source
+ por <- send $ putObject buck object $ toBody bs
+ return (por, r)
+
+copyObj ::
+ (MonadAWS m)
+ => BucketName
+ -> Text
+ -> ObjectKey
+ -> m CopyObjectResponse
+copyObj buck objFrom objTo = send $ copyObject buck objFrom objTo
+
+-- | Upload a whole folder to an s3 bucket
+uploadFolder :: (MonadAWS m, MonadResource m)
+ => FilePath -- ^ Local folder to copy
+ -> BucketName -- ^ Bucket to copy to
+ -> FilePath -- ^ Remote path to copy the content of the folder in
+ -> m ()
+uploadFolder srcFolder destBucket destPath =
+ streamFolderRel srcFolder
+ & S.mapM_ (\f -> do
+ let
+ objectName = destPath </> f
+ (crs,_) <- uploadObj destBucket (fromString objectName) $ BSS.readFile f
+ liftIO $ putStrLn $
+ if view porsResponseStatus crs == 200
+ then objectName ++ " uploaded."
+ else objectName ++ " upload failed.")
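+
+-- For instance, to mirror a local folder into a bucket (a sketch; bucket and
+-- paths are made up):
+--
+-- > runAll $ uploadFolder "./data" "some-bucket" "backups/data"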
+
+streamS3Folder ::
+ MonadAWS m => BucketName -> Maybe FilePath -> Stream (Of FilePath) m ()
+streamS3Folder bucketName prefix = do
+ let listCommand = LO.listObjects bucketName
+ & LO.loPrefix .~ ((fromString . normalise) <$> prefix)
+ rs <- lift $ liftAWS $ send listCommand
+ view LO.lorsContents rs
+ & S.each
+ & S.map (view oKey)
+ & S.map (\(ObjectKey k) -> T.unpack k)
+
+downloadFolder :: (MonadAWS m, MonadResource m)
+ => BucketName
+ -> Maybe FilePath -- ^ The folder to download
+ -> FilePath -- ^ The path in which to save the download
+ -> m ()
+downloadFolder srcBuck srcPath dest =
+ streamS3Folder
+ srcBuck
+ srcPath
+ & S.mapM_ (\f -> do
+ let outFile = dest </> f
+ liftIO $ createDirectoryIfMissing True $ takeDirectory outFile
+ streamObjIntoExt srcBuck (fromString f) $ BSS.writeFile outFile)
+
+streamObjInto :: (MonadAWS m)
+ => BucketName
+ -> ObjectKey
+ -> (BSS.ByteString m () -> m b)
+ -> m (Either SomeException b)
+streamObjInto srcBuck srcObj f = retry (_svcRetry s3) . try $ do
+ let g = getObject srcBuck srcObj
+ rs <- send g
+ resultingBS <- view gorsBody rs `sinkBody` sinkLbs
+ f (BSS.fromLazy resultingBS)
+
+-- |
+-- Retries the given action until it succeeds or the maximum number of
+-- attempts has been reached.
+--
+-- Amazonka has an automatic retry mechanism, except for streaming transfers,
+-- and 'getObject' is streamed (so it doesn't benefit from it).
+-- This means that we have to implement our own retry mechanism, which is
+-- a gross copy-paste of amazonka's internal mechanism.
+--
+-- Reference:
+-- https://github.com/brendanhay/amazonka/blob/248f7b2a7248222cc21cef6194cd1872ba99ac5d/amazonka/src/Network/AWS/Internal/HTTP.hs#L180-L189
+retry :: MonadIO m => Retry -> m (Either e a) -> m (Either e a)
+retry awsRetry action =
+ let
+ retryPolicy =
+ let
+ Exponential {..} = awsRetry
+ delay (rsIterNumber -> n)
+ | n >= 0 = Just $ truncate (grow n * 1000000)
+ | otherwise = Nothing
+ grow n = _retryBase * (fromIntegral _retryGrowth ^^ (n - 1))
+ in
+ limitRetries _retryAttempts <> RetryPolicyM (return . delay)
+ shouldRetry _ result =
+ case result of
+ Right _ -> pure False
+ Left _ -> pure True
+ in
+ retrying retryPolicy shouldRetry (const action)
+
+
+streamObjIntoExt :: (MonadAWS m)
+ => BucketName
+ -> ObjectKey
+ -> (BSS.ByteString m () -> m b)
+ -> m b
+streamObjIntoExt srcBuck srcObj f = do
+ streamResult <- streamObjInto srcBuck srcObj f
+ case streamResult of
+ Right x -> do
+ liftIO $ putStrLn $ show srcObj ++ " downloaded."
+ pure x
+ Left err -> do
+ liftIO $ putStrLn $ show srcObj ++ " download failed: " ++ show err
+ f mempty