summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluispedro <>2019-07-11 06:39:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2019-07-11 06:39:00 (GMT)
commitb4b22fe2b2edbaf9f2ec9d8b6a65c0ee99a27893 (patch)
tree2fd464a105dc386622bd2e7036cb0a8f7747f4fb
version 0.1.0HEAD0.1.0master
-rwxr-xr-xAlgorithms/OutSort.hs186
-rw-r--r--Algorithms/SortMain.hs164
-rw-r--r--COPYING25
-rw-r--r--ChangeLog2
-rw-r--r--README.md30
-rwxr-xr-xSortLines.hs32
-rw-r--r--outsort.cabal56
7 files changed, 495 insertions, 0 deletions
diff --git a/Algorithms/OutSort.hs b/Algorithms/OutSort.hs
new file mode 100755
index 0000000..292e188
--- /dev/null
+++ b/Algorithms/OutSort.hs
@@ -0,0 +1,186 @@
+#!/usr/bin/env stack
+-- stack script --resolver lts-9.2 --optimize
+{-# LANGUAGE LambdaCase, OverloadedStrings, FlexibleContexts, BangPatterns, PackageImports, ScopedTypeVariables, RankNTypes #-}
+
+module Algorithms.OutSort
+ ( outsort
+ , outsortStorable
+ , isolateBySize
+ , RIO
+ ) where
+
+import "temporary" System.IO.Temp (withSystemTempDirectory)
+
+import Control.Monad
+import Control.Monad.IO.Class
+import qualified Data.Vector as V
+import qualified Control.Concurrent.Async as A
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Unsafe as BU
+import qualified Data.ByteString.Internal as BI
+import qualified Data.Vector.Storable as VS
+import qualified Data.Vector.Storable.Mutable as VSM
+import Data.Vector.Algorithms.Intro (sortByBounds, selectByBounds)
+import qualified Data.Vector.Generic.Mutable as VGM
+import qualified Data.Conduit.Combinators as CC
+import qualified Data.Conduit.Binary as CB
+import qualified Data.Conduit as C
+
+import Control.Monad.Primitive
+import Data.Conduit ((.|))
+import Data.Conduit.Algorithms.Utils (awaitJust)
+import Data.Conduit.Algorithms.Async
+import Data.Conduit.Algorithms
+import System.FilePath
+import Control.Monad.Trans.Resource
+import Foreign
+import Foreign.Marshal.Utils (copyBytes)
+import System.IO.SafeWrite (withOutputFile)
+
+-- | The monad every conduit in this module runs in: 'IO' wrapped in 'ResourceT'.
+type RIO = ResourceT IO
+
+-- | Generic external sort.
+--
+-- The input is decoded, cut into blocks by the isolating conduit, and each
+-- block is sorted in memory and encoded into its own file inside a fresh
+-- temporary directory. The block files are then decoded again, merged with
+-- 'mergeC', re-encoded and fed to the final output conduit, whose result is
+-- returned.
+outsort :: forall a b. Ord a =>
+            C.ConduitT B.ByteString a RIO () -- ^ decoder
+            -> C.ConduitT a B.ByteString RIO () -- ^ encoder
+            -> C.ConduitT a a RIO () -- ^ isolate a block
+            -> C.ConduitT () B.ByteString RIO () -- ^ initial input
+            -> C.ConduitT B.ByteString C.Void RIO b -- ^ final output
+            -> IO b
+outsort reader writer chunk input output =
+    withSystemTempDirectory "sort" $ \tdir -> do
+        blockFiles <- C.runConduitRes
+            (input .| reader .| sortedBlocks .| writePartials tdir dumpBlock)
+        C.runConduitRes (mergedBlocks blockFiles .| writer .| output)
+  where
+    -- Encode one sorted block into the file at @path@.
+    dumpBlock path block = C.runConduitRes $
+        yieldV block .| writer .| CB.sinkFileCautious path
+
+    -- Decode every block file and merge the resulting sorted streams.
+    mergedBlocks paths = mergeC (map (\p -> CB.sourceFile p .| reader) paths)
+
+    -- Repeatedly run the isolating conduit, sort what it let through and
+    -- yield it as a mutable vector; stop at the first empty block.
+    sortedBlocks = do
+        frozen <- chunk .| CC.sinkVector
+        if V.null frozen
+            then return ()
+            else do
+                block <- liftIO (V.unsafeThaw frozen)
+                liftIO (sortParallel block)
+                C.yield block
+                sortedBlocks
+
+-- | Yield every element of a mutable vector downstream, in index order.
+yieldV :: forall a b v. VGM.MVector v a => v (PrimState IO) a -> C.ConduitT b a RIO ()
+yieldV v = go 0
+  where
+    n = VGM.length v
+
+    go !ix
+        | ix >= n = return ()
+        | otherwise = do
+            x <- liftIO (VGM.read v ix)
+            C.yield x
+            go (ix + 1)
+
+-- | Sink that hands each incoming block to @writer@ in a background thread,
+-- writing block number @n@ to @tdir\/n.temp@.
+--
+-- The previous write is waited for only after the next block has been
+-- awaited, so producing a block overlaps with writing the one before it.
+-- Returns the paths of all files written, most recent first.
+writePartials :: forall a v. VGM.MVector v a => FilePath -> (FilePath -> (v (PrimState IO) a) -> IO ()) -> C.ConduitT (v (PrimState IO) a) C.Void RIO [FilePath]
+writePartials tdir writer = liftIO (A.async (return ())) >>= loop 0 []
+  where
+    loop :: Int -> [FilePath] -> A.Async () -> C.ConduitT (v (PrimState IO) a) C.Void RIO [FilePath]
+    loop ix done pending = do
+        mblock <- C.await
+        -- Let the in-flight write finish before starting another (or returning).
+        liftIO (A.wait pending)
+        case mblock of
+            Nothing -> return done
+            Just block -> do
+                job <- liftIO (A.async (writer path block))
+                loop (ix + 1) (path : done) job
+      where
+        path = tdir </> show ix <.> "temp"
+
+-- | External sort specialised to fixed-size 'Storable' elements.
+--
+-- The input bytes are reinterpreted as consecutive elements, read in blocks
+-- of @nsize@ elements, sorted in memory and dumped as raw memory into files
+-- in a fresh temporary directory. If exactly one block file was produced it
+-- is streamed straight to the output; otherwise the files are decoded,
+-- merged with 'mergeC' and re-encoded. Trailing input bytes that do not make
+-- up a whole element are dropped. The first argument is ignored; it only
+-- fixes the element type.
+outsortStorable :: forall a. (Ord a, Storable a, Show a) =>
+            a -- ^ dummy element, necessary for specifying type, can be undefined
+            -> Int -- ^ number of elements per block
+            -> C.ConduitT () B.ByteString RIO () -- ^ initial input
+            -> C.ConduitT B.ByteString C.Void RIO () -- ^ final output
+            -> IO ()
+outsortStorable _ nsize input output = withSystemTempDirectory "sort" $ \tdir -> do
+        fs <- C.runConduitRes $
+            input
+                .| partialsStorable
+                .| writePartials tdir writeStorable
+        case fs of
+            -- A single block is already fully sorted on disk.
+            [f] -> C.runConduitRes $
+                        CB.sourceFile f .| output
+            _ -> do
+                C.runConduitRes $
+                    mergeC [CB.sourceFile f .| readStorable | f <- fs]
+                        .| CC.conduitVector 256
+                        .| asyncMapC 16 encodeV
+                        .| output
+    where
+        -- Write the raw memory backing a block to the file @fp@.
+        writeStorable fp vs =
+            withOutputFile fp $ \h ->
+                VSM.unsafeWith vs $ \p ->
+                    B.hPut h =<< (BU.unsafePackCStringFinalizer (castPtr p) (nBytes * VSM.length vs) (return ()))
+
+        -- Size in bytes of one element.
+        nBytes :: Int
+        nBytes = sizeOf (undefined :: a)
+
+        -- Copy the raw memory of a vector of elements into a fresh ByteString.
+        encodeV :: VS.Vector a -> B.ByteString
+        encodeV v =
+            BI.unsafeCreate (nBytes * VS.length v) $ \pd ->
+                VS.unsafeWith v $ \ps ->
+                    copyBytes pd (castPtr ps) (nBytes * VS.length v)
+
+        -- Decode a byte stream into elements, 64 at a time, until it runs dry.
+        readStorable :: C.ConduitT B.ByteString a RIO ()
+        readStorable = do
+            vs <- decodeN 64
+            unless (VSM.null vs) $ do
+                yieldV vs
+                readStorable
+
+        -- Read up to @size@ elements from upstream into a fresh vector. When
+        -- fewer bytes are available the result is trimmed to the elements
+        -- that were completely read.
+        decodeN :: Int -> C.ConduitT B.ByteString b RIO (VSM.MVector (PrimState IO) a)
+        decodeN size = do
+            vec <- liftIO (VSM.unsafeNew size)
+            written <- CC.takeE (nBytes * size)
+                            .| decodeN' (VSM.unsafeCast vec) 0
+            -- Compare against the size of THIS buffer, not the global block
+            -- size: 'readStorable' uses a 64-element buffer, and returning
+            -- it untrimmed would expose uninitialised elements.
+            return $! if written == nBytes * size
+                        then vec
+                        else VSM.slice 0 (written `div` nBytes) vec
+            where
+                -- Copy each incoming chunk at byte offset @ix@ of @dest@ and
+                -- return the total number of bytes copied.
+                decodeN' :: (VSM.MVector (PrimState IO) Word8) -> Int -> C.ConduitT B.ByteString b RIO Int
+                decodeN' dest ix = C.await >>= \case
+                    Nothing -> return ix
+                    Just block -> do
+                        liftIO (VSM.unsafeWith dest $ \pd ->
+                            BU.unsafeUseAsCString block $ \ps ->
+                                copyBytes (pd `plusPtr` ix) ps (B.length block))
+                        decodeN' dest (ix + B.length block)
+
+        -- Cut the input into blocks of @nsize@ elements, sort each block in
+        -- place and pass it downstream; stop at the first empty block.
+        partialsStorable :: C.ConduitT B.ByteString (VSM.MVector (PrimState IO) a) RIO ()
+        partialsStorable = do
+            vs <- decodeN nsize
+            unless (VGM.null vs) $ do
+                liftIO $ sortParallel vs
+                C.yield vs
+                partialsStorable
+
+
+-- | Sort a whole mutable vector in place, delegating to 'sortPByBounds'
+-- starting at recursion depth zero.
+sortParallel :: forall a v. (Ord a, VGM.MVector v a) => v (PrimState IO) a -> IO ()
+sortParallel v = sortPByBounds initialDepth v 0 (VGM.length v)
+  where
+    initialDepth = 0 :: Int
+
+-- | Sort the index range @[start, end)@ of a mutable vector in place.
+--
+-- Ranges shorter than 8192 elements, or reached at a recursion depth above
+-- 10, are sorted directly with 'sortByBounds'. Larger ranges are first
+-- partitioned around their midpoint with 'selectByBounds' (which, per the
+-- vector-algorithms documentation, moves the smallest @k - start@ elements
+-- to the front of the range), and the two halves are then sorted
+-- concurrently.
+sortPByBounds :: (Ord e, VGM.MVector v e)
+              => Int                -- ^ current recursion depth
+              -> v (PrimState IO) e -- ^ vector to sort in place
+              -> Int                -- ^ first index of the range (inclusive)
+              -> Int                -- ^ end of the range (exclusive)
+              -> IO ()
+sortPByBounds dep v start end
+    | end - start < 8192 || dep > 10 = sortByBounds compare v start end
+    | otherwise = do
+        let k = (start + end) `div` 2
+        selectByBounds compare v (k - start) start end
+        A.concurrently_
+            (sortPByBounds (dep + 1) v start k)
+            (sortPByBounds (dep + 1) v k end)
+
+
+-- | Pass through a prefix of the stream whose accumulated size stays below
+-- @maxsize@.
+--
+-- The first element is always passed through. Every further element is
+-- passed through while the running total of @sizer@ remains strictly below
+-- @maxsize@; the first element that would reach the limit is pushed back as
+-- a leftover and the conduit finishes.
+isolateBySize :: Monad m => (a -> Int) -> Int -> C.ConduitT a a m ()
+isolateBySize sizer maxsize = awaitJust $ \first ->
+    C.yield first >> fill (sizer first)
+  where
+    fill !total = awaitJust $ \item ->
+        let total' = total + sizer item
+        in if total' >= maxsize
+            then C.leftover item
+            else C.yield item >> fill total'
diff --git a/Algorithms/SortMain.hs b/Algorithms/SortMain.hs
new file mode 100644
index 0000000..95dc955
--- /dev/null
+++ b/Algorithms/SortMain.hs
@@ -0,0 +1,164 @@
+{-# LANGUAGE LambdaCase, OverloadedStrings, FlexibleContexts, BangPatterns, PackageImports #-}
+
+module Algorithms.SortMain
+ ( sortMain
+ , sortMain'
+ , sortMainStorable
+ , mergeMain
+ , mergeMain'
+ ) where
+
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Char8 as B8
+import qualified Data.Conduit as C
+import qualified Data.Conduit.List as CL
+import qualified Data.Conduit.Binary as CB
+import qualified Data.Conduit.Algorithms as CAlg
+import Control.Concurrent (setNumCapabilities)
+import Control.Monad.IO.Class (liftIO)
+import Data.Conduit ((.|))
+import System.Console.GetOpt
+import System.Environment (getArgs)
+import Data.List (foldl')
+import Safe (atDef)
+import System.Exit (exitFailure)
+import Control.Monad.Trans.Resource (MonadResource)
+import Control.Monad.IO.Class (MonadIO)
+import Foreign
+
+import Algorithms.OutSort
+
+
+-- | Stream the contents of all given files, one after another. In verbose
+-- mode a progress line is printed to stdout before each file is read.
+readAllFiles :: (MonadIO m, MonadResource m) =>
+                    Bool -- ^ verbose
+                    -> [FilePath] -- ^ file list
+                    -> C.Source m B.ByteString
+readAllFiles verbose ifiles
+    | verbose = mapM_ announceAndRead (zip [(1 :: Int) ..] ifiles)
+    | otherwise = mapM_ CB.sourceFile ifiles
+  where
+    total = length ifiles
+
+    announceAndRead (ix, path) = do
+        liftIO . putStrLn $
+            "Reading file " ++ path ++ " (" ++ show ix ++ "/" ++ show total ++ ")"
+        CB.sourceFile path
+
+-- | Settings collected from the command line by 'parseArgs'.
+data CmdArgs = CmdArgs
+ { optIFile :: FilePath -- ^ input file: first positional argument or @-i@; empty if absent
+ , optOFile :: FilePath -- ^ output file: second positional argument or @-o@; empty if absent
+ , optIFileList :: FilePath -- ^ file whose lines name the input files (@-F@); empty if absent
+ , optVerbose :: Bool -- ^ set by @-v@
+ , nJobs :: Int -- ^ value of @-j@, passed to 'setNumCapabilities'; defaults to 1
+ } deriving (Eq, Show)
+
+-- | One recognised command line flag, as produced by 'options'.
+data CmdFlags = OutputFile FilePath -- ^ @-o@ / @--output@
+ | InputFile FilePath -- ^ @-i@ / @--input@
+ | ListFile FilePath -- ^ @-F@ / @--file-list@
+ | NJobs Int -- ^ @-j@ / @--threads@ / @--jobs@
+ | Verbose -- ^ @-v@ / @--verbose@
+ deriving (Eq, Show)
+
+-- | Descriptions of the command line options understood by 'parseArgs'.
+options :: [OptDescr CmdFlags]
+options =
+    [ Option ['v'] ["verbose"] (NoArg Verbose) "verbose mode"
+    , Option ['i'] ["input"] (ReqArg InputFile "FILE") "Input file"
+    , Option ['F'] ["file-list"] (ReqArg ListFile "FILE") "Input is a list of files"
+    , Option ['o'] ["output"] (ReqArg OutputFile "FILE") "Output file"
+    , Option ['j'] ["threads", "jobs"] (ReqArg parseNJobs "INT") "Nr threads"
+    ]
+  where
+    -- Parse the thread count. Accepts what 'read' accepts, but reports a
+    -- malformed value with a readable message instead of
+    -- "Prelude.read: no parse". The error is raised when the value is forced.
+    parseNJobs :: String -> CmdFlags
+    parseNJobs arg = case reads arg of
+        [(n, rest)] | null (words rest) -> NJobs n
+        _ -> error ("Invalid number of threads (expected an integer): " ++ show arg)
+
+
+-- | Turn raw command line arguments into 'CmdArgs'.
+--
+-- The first two positional arguments provide the default input and output
+-- file; flags are then applied left to right, so later flags override
+-- earlier ones and the positional defaults. Error messages reported by
+-- 'getOpt' are discarded.
+parseArgs :: [String] -> CmdArgs
+parseArgs argv = foldl' applyFlag defaults flags
+  where
+    (flags, positional, _errors) = getOpt Permute options argv
+
+    defaults = CmdArgs
+        { optIFile = atDef "" positional 0
+        , optOFile = atDef "" positional 1
+        , optIFileList = ""
+        , optVerbose = False
+        , nJobs = 1
+        }
+
+    applyFlag acc flag = case flag of
+        OutputFile o -> acc { optOFile = o }
+        InputFile i -> acc { optIFile = i }
+        ListFile f -> acc { optIFileList = f }
+        NJobs n -> acc { nJobs = n }
+        Verbose -> acc { optVerbose = True }
+
+-- | Work out the list of input files from the parsed options.
+--
+-- Either a single input file or a @-F@ list file (one path per line) may be
+-- given. Passing both, or neither, prints a message and exits with failure.
+extractIFiles :: CmdArgs -> IO [FilePath]
+extractIFiles opts = case (optIFile opts, optIFileList opts) of
+    -- Without this case an empty path would be returned and only fail
+    -- later, when it is opened.
+    ("", "") -> do
+        putStrLn "No input given: pass an input file or a -F file list"
+        exitFailure
+    (ifile, "") -> return [ifile]
+    ("", ffile) -> C.runConduitRes $
+                        CB.sourceFile ffile
+                        .| CB.lines
+                        .| CL.map B8.unpack
+                        .| CL.consume
+    _ -> do
+        putStrLn "Cannot pass both input file and -F argument"
+        exitFailure
+
+
+-- | Sort driver taking its command line explicitly: parses the arguments,
+-- sets the number of capabilities, resolves the input files and runs
+-- 'outsort' from those files into the output file.
+sortMain' :: Ord a =>
+            [String] -- ^ command line arguments
+            -> C.Conduit B.ByteString RIO a -- ^ decoder
+            -> C.Conduit a RIO B.ByteString -- ^ encoder
+            -> C.Conduit a RIO a -- ^ isolate a block
+            -> IO ()
+sortMain' args decoder encoder isolator = do
+    setNumCapabilities (nJobs opts)
+    ifiles <- extractIFiles opts
+    outsort decoder encoder isolator
+        (readAllFiles (optVerbose opts) ifiles)
+        (CB.sinkFileCautious (optOFile opts))
+  where
+    opts = parseArgs args
+
+
+-- | Ready-made @main@ for a sorting program: like 'sortMain'', with the
+-- arguments taken from the process command line.
+sortMain :: Ord a =>
+            C.Conduit B8.ByteString RIO a
+            -> C.Conduit a RIO B8.ByteString
+            -> C.Conduit a RIO a
+            -> IO ()
+sortMain decoder encoder isolator =
+    getArgs >>= \args -> sortMain' args decoder encoder isolator
+
+-- | Ready-made @main@ for sorting files of raw 'Storable' elements: reads
+-- the process command line, sets the number of capabilities and runs
+-- 'outsortStorable' from the input files into the output file.
+sortMainStorable :: (Storable a, Ord a, Show a) =>
+            a
+            -> Int
+            -> IO ()
+sortMainStorable dummy chunkSize = do
+    opts <- fmap parseArgs getArgs
+    setNumCapabilities (nJobs opts)
+    ifiles <- extractIFiles opts
+    outsortStorable dummy chunkSize
+        (readAllFiles (optVerbose opts) ifiles)
+        (CB.sinkFileCautious (optOFile opts))
+
+-- | Merge driver taking its command line explicitly: every input file is
+-- decoded, the decoded streams are merged with 'CAlg.mergeC', and the
+-- encoded result is written to the output file.
+mergeMain' :: Ord a =>
+            [String] -- ^ command line arguments
+            -> C.Conduit B.ByteString RIO a -- ^ decoder
+            -> C.Conduit a RIO B.ByteString -- ^ encoder
+            -> IO ()
+mergeMain' args decoder encoder = do
+    setNumCapabilities (nJobs opts)
+    ifiles <- extractIFiles opts
+    C.runConduitRes $
+        merged ifiles .| encoder .| CB.sinkFileCautious (optOFile opts)
+  where
+    opts = parseArgs args
+
+    merged paths = CAlg.mergeC (map (\f -> CB.sourceFile f .| decoder) paths)
+
+-- | Ready-made @main@ for a merging program: like 'mergeMain'', with the
+-- arguments taken from the process command line.
+mergeMain :: Ord a =>
+            C.Conduit B8.ByteString RIO a -- ^ decoder
+            -> C.Conduit a RIO B8.ByteString -- ^ encoder
+            -> IO ()
+mergeMain decoder encoder =
+    getArgs >>= \args -> mergeMain' args decoder encoder
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..867c994
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,25 @@
+Copyright (c) 2012-2017
+Luis Pedro Coelho <luis@luispedro.org>
+Paulo Monteiro
+Renato Alves <renato.alves@embl.de>
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use,
+copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
diff --git a/ChangeLog b/ChangeLog
new file mode 100644
index 0000000..cb2d1e6
--- /dev/null
+++ b/ChangeLog
@@ -0,0 +1,2 @@
+Version 0.1.0 2019-07-11 by luispedro
+ * First release
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..49a9b1d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# Outsort: generic (Haskell-based) external sorting
+
+
+Example
+
+```haskell
+ import qualified Data.Conduit.Combinators as CC
+ import qualified Data.Conduit.Binary as CB
+
+ import Algorithms.OutSort (isolateBySize)
+ import Algorithms.SortMain (sortMain)
+
+ main :: IO ()
+ main = sortMain
+ CB.lines
+ CC.unlinesAscii
+ (isolateBySize (const 1) 500000)
+```
+
+All that is needed is a decoder (`ConduitT ByteString a m ()`), an encoder
+(`ConduitT a ByteString m ()`), and a function to split the input into blocks
+(`ConduitT a a m ()`). Given these elements, the result is a programme which can
+sort arbitrarily large inputs using external memory.
+
+Licence: MIT
+
+Author: [Luis Pedro Coelho](http://luispedro.org) (email:
+[coelho@embl.de](mailto:coelho@embl.de)) (on twitter:
+[@luispedrocoelho](https://twitter.com/luispedrocoelho))
+
diff --git a/SortLines.hs b/SortLines.hs
new file mode 100755
index 0000000..e1d94ba
--- /dev/null
+++ b/SortLines.hs
@@ -0,0 +1,32 @@
+#!/usr/bin/env stack
+{- stack
+ script
+ --resolver lts-9.2
+ --optimize
+ --package vector
+ --package conduit
+ --package conduit-extra
+ --package conduit-combinators
+ --package bytestring
+ --package vector-algorithms
+ --package transformers
+ --package containers
+ --package temporary
+ --package async
+ --package exceptions
+ --package Glob
+ --package filepath
+ --package resourcet
+ --package conduit-algorithms-0.0.4.0
+-}
+import qualified Data.Conduit.Combinators as CC
+import qualified Data.Conduit.Binary as CB
+
+import Algorithms.OutSort (isolateBySize)
+import Algorithms.SortMain (sortMain)
+
+-- | Sort the lines of a file, handling them in blocks of up to 500000 lines.
+main :: IO ()
+main = sortMain CB.lines CC.unlinesAscii oneBlock
+  where
+    linesPerBlock :: Int
+    linesPerBlock = 500000
+
+    -- Every line counts as size 1, so a block is bounded by its line count.
+    oneBlock = isolateBySize (const 1) linesPerBlock
diff --git a/outsort.cabal b/outsort.cabal
new file mode 100644
index 0000000..9b2f569
--- /dev/null
+++ b/outsort.cabal
@@ -0,0 +1,56 @@
+cabal-version: 1.12
+
+-- This file has been generated from package.yaml by hpack version 0.31.1.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: efa126c76ad8be6d454b7cfea73666df65f7ade248a92914f513b850be15ac7b
+
+name: outsort
+version: 0.1.0
+synopsis: External sorting package based on Conduit
+description: External (disk-backed) sorting package based on Conduit, saving intermediate files to disk and later merging them all.
+category: Algorithms
+maintainer: Luis Pedro Coelho <luis@luispedro.org>
+license: MIT
+license-file: COPYING
+build-type: Simple
+extra-source-files:
+ README.md
+ ChangeLog
+
+executable SortLines
+ main-is: SortLines.hs
+ other-modules:
+ Algorithms.OutSort
+ Algorithms.SortMain
+ Paths_outsort
+ hs-source-dirs:
+ ./
+ ghc-options: -Wall -O2 -threaded -fwarn-tabs
+ build-depends:
+ MissingH
+ , async
+ , base >=4.7 && <5
+ , bytestring
+ , conduit
+ , conduit-algorithms
+ , conduit-combinators
+ , conduit-extra
+ , containers
+ , deepseq
+ , directory
+ , exceptions
+ , filemanip
+ , filepath
+ , primitive
+ , resourcet
+ , safe
+ , safeio
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , vector
+ , vector-algorithms
+ default-language: Haskell2010