summaryrefslogtreecommitdiff
path: root/src/Data/CSV/Enumerator.hs
blob: 63ef42f169ba440cbf15aa04ea2db7a83e126ef6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
{-# LANGUAGE OverloadedStrings, BangPatterns #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE TypeSynonymInstances #-}

module Data.CSV.Enumerator 
  ( 
   -- * CSV Data types
    Row   -- Simply @[ByteString]@
  , Field   -- Simply @ByteString@
  , MapRow  

  , CSVeable(..)

  , ParsedRow(..) 

  -- * CSV Settings
  , CSVSettings(..)
  , defCSVSettings

  -- * Reading / Writing CSV Files
  , readCSVFile
  , writeCSVFile
  , appendCSVFile

  -- * Folding Over CSV Files 
  -- | These enumerators generalize the map* family of functions with a running accumulator.
  , CSVAction
  , funToIter
  , funToIterIO

  -- * Primitive Iteratees
  , collectRows
  , outputRowIter
  , outputRowsIter

  -- * Other Utilities
  , outputRow
  , outputRows
  , outputColumns
  , writeHeaders
  )

where

import Control.Applicative hiding (many)
import Control.Exception (bracket, SomeException)
import Control.Monad (mzero, mplus, foldM, when)
import Control.Monad.IO.Class (liftIO, MonadIO)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Data.ByteString.Char8 (ByteString)
import Data.ByteString.Internal (c2w)
import qualified Data.Map as M
import System.Directory
import System.IO
import System.PosixCompat.Files (getFileStatus, fileSize)

import Data.Attoparsec as P hiding (take)
import qualified Data.Attoparsec.Char8 as C8
-- import Data.Attoparsec.Enum
import Data.Attoparsec.Enumerator
import qualified Data.Enumerator as E
import Data.Enumerator (($$), yield, continue)
import Data.Enumerator.Binary (enumFile)
import Data.Word (Word8)
import Safe (headMay)

import Data.CSV.Enumerator.Types

class CSVeable r where

  -- | Serialise one row into a strict 'B.ByteString' according to the given
  -- settings.
  rowToStr :: CSVSettings -> r -> B.ByteString

  -- | Header row for a list of rows, when the row type can provide one.
  fileHeaders :: [r] -> Maybe Row

  -- | The raw iteratee; it can be run against any Enumerator stream of
  -- 'B.ByteString' chunks.
  iterCSV :: CSVSettings
          -> CSVAction r a
          -> a
          -> E.Iteratee B.ByteString IO a

  -- | Iteratee step that pushes parsed rows into the given file. The
  -- accumulator pairs an optional output handle with an 'Int' counter.
  fileSink :: CSVSettings
           -> FilePath
           -> (Maybe Handle, Int)
           -> ParsedRow r
           -> E.Iteratee B.ByteString IO (Maybe Handle, Int)

  -- | Open the CSV file and fold over it.  For the MapRow instance,
  -- processing starts on row 2 because the first row supplies the column
  -- headers.
  foldCSVFile :: FilePath -- ^ File to open as a CSV file
              -> CSVSettings -- ^ CSV settings to use on the input file
              -> CSVAction r a -- ^ Fold action
              -> a  -- ^ Initial accumulator
              -> IO (Either SomeException a) -- ^ Error or the resulting accumulator

  -- | Read a CSV file, expand every row through the given function and
  -- write the resulting rows to a new file.
  --
  -- The default implementation folds over the input with 'foldCSVFile' and
  -- hands every produced row to 'fileSink'.
  mapCSVFile :: FilePath         -- ^ Input file
             -> CSVSettings      -- ^ CSV Settings
             -> (r -> [r])       -- ^ A function to map a row onto rows
             -> FilePath         -- ^ Output file
             -> IO (Either SomeException Int)    -- ^ Number of rows processed 
  mapCSVFile fi s f fo =
    fmap (fmap snd) (foldCSVFile fi s step (Nothing, 0))
    where
      -- A successfully parsed row is expanded via @f@ and every result is
      -- sunk in turn; anything else (parse failure, EOF) goes to the sink
      -- unchanged.
      step !acc (ParsedRow (Just !r)) = foldM emit acc (f r)
      step !acc other = fileSink s fo acc other

      emit !acc !r = fileSink s fo acc (ParsedRow (Just r))


  ----------------------------------------------------------------------------
  -- | Like 'mapCSVFile', but reads several input files and pours all
  -- results into a single output file.
  mapCSVFiles :: [FilePath]       -- ^ Input files
              -> CSVSettings      -- ^ CSV Settings
              -> (r -> [r])       -- ^ A function to map a row onto rows
              -> FilePath         -- ^ Output file
              -> IO (Either SomeException Int)    -- ^ Number of rows processed 

------------------------------------------------------------------------------
-- | 'Row' instance for 'CSVeable'
instance CSVeable Row where
  -- Render the fields joined by the output separator. When an output quote
  -- character is configured, every field is wrapped in it and embedded
  -- quote characters are doubled.
  rowToStr s !r = B.intercalate sep (map wrapField r)
    where
      sep = B.pack [c2w (csvOutputColSep s)]
      wrapField !f = case csvOutputQuoteChar s of
        Just !q -> B8.snoc (B8.cons q (escape q f)) q
        Nothing -> f
      escape q = B8.intercalate (B8.pack [q, q]) . B8.split q

  -- Plain rows carry no header information.
  fileHeaders _ = Nothing


  -- Parse rows one at a time, feeding each to the action; once the stream
  -- is exhausted the action is called a final time with 'EOF'.
  iterCSV csvs f acc = loop acc
    where
      loop !acc' = do
        eof <- E.isEOF
        if eof
          then f acc' EOF
          else rowParser csvs >>= f acc' . ParsedRow >>= loop


  foldCSVFile fp csvs f acc = E.run (enumFile fp $$ iterCSV csvs f acc)


  -- The output file is opened lazily (in 'WriteMode') when the first
  -- successfully parsed row arrives and is closed on 'EOF'. The counter is
  -- incremented once per row written.
  fileSink csvs fo = sink
    where
      sink :: (Maybe Handle, Int)
           -> ParsedRow Row
           -> E.Iteratee B.ByteString IO (Maybe Handle, Int)

      sink acc@(oh, i) EOF = case oh of
        Just oh' -> liftIO (hClose oh') >> yield (Nothing, i) E.EOF
        Nothing -> yield acc E.EOF

      -- Rows that failed to parse are skipped.
      sink acc (ParsedRow Nothing) = yield acc (E.Chunks [])

      sink (Nothing, !i) r = do
        oh <- liftIO $ openFile fo WriteMode
        sink (Just oh, i) r

      sink (Just oh, !i) (ParsedRow (Just r)) = do
        outputRowIter csvs oh r
        yield (Just oh, i + 1) (E.Chunks [])


  -- Process the input files in order, stopping at the first error. The
  -- running count is threaded from one file to the next.
  mapCSVFiles fis s f fo = foldM stepFile (Right 0) fis
    where
      stepFile :: Either SomeException Int
               -> FilePath
               -> IO (Either SomeException Int)
      stepFile (Left e) _ = return (Left e)
      stepFile (Right i) fi =
        fmap (fmap snd) (foldCSVFile fi s iter (Nothing, i))

      iter :: (Maybe Handle, Int)
           -> ParsedRow Row
           -> E.Iteratee B.ByteString IO (Maybe Handle, Int)
      iter acc@(oh, i) EOF = case oh of
        Just oh' -> liftIO (hClose oh') >> yield (Nothing, i) E.EOF
        Nothing -> yield acc E.EOF
      iter acc (ParsedRow Nothing) = return acc
      -- The output is opened in 'AppendMode' so that every input file adds
      -- to the same output file.
      iter (Nothing, !i) row@(ParsedRow (Just _)) = do
        oh <- liftIO $ openFile fo AppendMode
        iter (Just oh, i) row
      -- The counter advances once per input row, however many rows @f@
      -- produces for it.
      iter (Just oh, !i) (ParsedRow (Just r)) = do
        outputRowsIter s oh (f r)
        return (Just oh, i + 1)



------------------------------------------------------------------------------
-- 'MapRow' instance for 'CSVeable'
instance CSVeable MapRow where
  -- A map row is rendered through the 'Row' instance from its values, which
  -- 'M.elems' returns in ascending key order.
  rowToStr s = rowToStr s . M.elems

  -- Headers are the keys of the first row, if there is one.
  fileHeaders rs = fmap M.keys (headMay rs)

  -- The first successfully parsed row supplies the column headers; every
  -- later row is zipped with them into a map before being handed to the
  -- action. The action sees 'EOF' exactly once, when the stream ends.
  iterCSV csvs f !acc = loop ([], acc)
    where
      loop (headers, !acc') = do
        eof <- E.isEOF
        if eof
          then f acc' EOF
          else step headers acc'

      -- Headers not yet known: consume the next row as the header row. If
      -- that row fails to parse, keep looking instead of crashing on a
      -- partial pattern match.
      step [] !acc' = do
        mhs <- rowParser csvs
        case mhs of
          Just hs -> loop (hs, acc')
          Nothing -> loop ([], acc')

      -- Headers known: parse a data row and pass it on as a map.
      step headers !acc' = do
        fs <- rowParser csvs
        a <- f acc' (ParsedRow (fmap (M.fromList . zip headers) fs))
        loop (headers, a)

  foldCSVFile fp csvs f !acc = E.run (enumFile fp $$ iterCSV csvs f acc)


  -- The output file is opened (in 'WriteMode') when the first parsed row
  -- arrives; that row's keys are written as the header line. The handle is
  -- closed on 'EOF'.
  fileSink s fo = mapIter
    where
      mapIter :: (Maybe Handle, Int) 
              -> ParsedRow MapRow 
              -> E.Iteratee B.ByteString IO (Maybe Handle, Int)
      mapIter acc@(oh, !i) EOF = case oh of
        Just oh' -> liftIO (hClose oh') >> yield (Nothing, i) E.EOF
        Nothing -> yield acc E.EOF
      mapIter !acc (ParsedRow Nothing) = return acc
      mapIter (Nothing, !i) row@(ParsedRow (Just !r)) = do
        oh <- liftIO $ do
          oh' <- openFile fo WriteMode
          outputRow s oh' (M.keys r)
          return oh'
        mapIter (Just oh, i) row
      mapIter (Just oh, !i) (ParsedRow (Just !r)) = do
        outputRowIter s oh r
        return (Just oh, i + 1)


  -- Process the input files in order, stopping at the first error. Every
  -- input row is tagged with a "FromFile" column before @f@ is applied.
  mapCSVFiles fis s f fo = foldM stepFile (Right 0) fis
    where
      stepFile (Left e) _ = return (Left e)
      stepFile (Right i) fi =
        fmap (fmap snd) (foldCSVFile fi s (iter fi) (Nothing, i))

      addFileSource fi r = M.insert "FromFile" (B8.pack fi) r

      iter :: FilePath
           -> (Maybe Handle, Int) 
           -> ParsedRow MapRow 
           -> E.Iteratee B.ByteString IO (Maybe Handle, Int)
      iter _ acc@(oh, i) EOF = case oh of
        Just oh' -> liftIO (hClose oh') >> yield (Nothing, i) E.EOF
        Nothing -> yield acc E.EOF
      iter _ acc (ParsedRow Nothing) = return acc
      iter fi (Nothing, !i) row@(ParsedRow (Just r)) =
        -- Derive the header line from the same rows that will be written,
        -- so that header and data columns always agree.
        case f (addFileSource fi r) of
          [] -> return (Nothing, i) -- the fn did not return any rows at all!
          (x:_) -> do
            oh <- liftIO $ do
              exist <- doesFileExist fo
              oh' <- openFile fo AppendMode
              -- Only a freshly created output file gets a header line.
              when (not exist) $ outputRow s oh' (M.keys x)
              return oh'
            iter fi (Just oh, i) row
      iter fi (Just oh, !i) (ParsedRow (Just r)) = do
        outputRowsIter s oh (f (addFileSource fi r))
        return (Just oh, i + 1)


------------------------------------------------------------------------------
readCSVFile :: (CSVeable r) => CSVSettings  -- ^ CSV settings
            -> FilePath   -- ^ FilePath
            -> IO (Either SomeException [r])  -- ^ Collected data
-- 'collectRows' prepends each row to the accumulator, so the collected list
-- is reversed to restore file order.
readCSVFile s fp = fmap (fmap reverse) (foldCSVFile fp s collectRows [])


------------------------------------------------------------------------------
-- | Write the given rows to a file, replacing any previous contents. A
-- header line is written first when the row type provides one (see
-- 'fileHeaders'). The handle is closed even if writing fails.
writeCSVFile :: (CSVeable r) => CSVSettings   -- ^ CSV settings
             -> FilePath  -- ^ Target file path
             -> [r]   -- ^ Data to be output
             -> IO Int  -- ^ Number of rows written
writeCSVFile s fp rs = bracket (openFile fp WriteMode) hClose writeAll
  where
    writeAll h = writeHeaders s h rs >> foldM (writeOne h) 0 rs
    -- Force the running count at every step so no thunk chain builds up
    -- while writing large inputs.
    writeOne h n r = do
      outputRow s h r
      return $! n + 1

------------------------------------------------------------------------------
-- | Append the given rows to a file, creating it when necessary. A header
-- line (see 'fileHeaders') is written only when the file did not exist or
-- was empty before the call. The handle is closed even if writing fails.
appendCSVFile :: (CSVeable r) => CSVSettings   -- ^ CSV settings
             -> FilePath  -- ^ Target file path
             -> [r]   -- ^ Data to be output
             -> IO Int  -- ^ Number of rows written
appendCSVFile s fp rs = bracket chkOpen (hClose . snd) doOutput
  where
    doOutput (needHeaders, h) = do
      when needHeaders (writeHeaders s h rs)
      foldM (writeOne h) 0 rs

    -- Force the running count at every step so no thunk chain builds up.
    writeOne h n r = do
      outputRow s h r
      return $! n + 1

    -- Decide whether headers are needed before opening the file, because
    -- opening in 'AppendMode' creates a missing file.
    chkOpen = do
      exists <- doesFileExist fp
      needHeaders <-
        if exists
          then fmap ((<= 0) . fileSize) (getFileStatus fp)
          else return True
      h <- openFile fp AppendMode
      return (needHeaders, h)

------------------------------------------------------------------------------
-- | Serialise the given row with 'rowToStr' and write it, followed by a
-- newline, to the given handle.
outputRow :: CSVeable r => CSVSettings -> Handle -> r -> IO ()
-- Uses the Char8 export of 'hPutStrLn'; the one in "Data.ByteString" is
-- deprecated.
outputRow s oh r = B8.hPutStrLn oh (rowToStr s r)


-- | Write every row of the list to the handle, in order, via 'outputRow'.
outputRows :: CSVeable r => CSVSettings -> Handle -> [r] -> IO ()
outputRows s oh rows = sequence_ [outputRow s oh row | row <- rows]


-- | Restrict or pad the given 'MapRow' so that it holds exactly the listed
-- columns, then write it to the given 'Handle'.
--
-- Useful for filtering columns, or for combining files whose column sets
-- differ.
--
-- Columns that are absent from the row are written as empty fields.
outputColumns :: CSVSettings -> Handle -> [ByteString] -> MapRow -> IO ()
outputColumns s h cs r = outputRow s h selected
  where
    selected = M.fromList [(c, M.findWithDefault "" c r) | c <- cs]


-- | Write the header line for the given rows to the handle; does nothing
-- when 'fileHeaders' yields no headers.
writeHeaders :: CSVeable r => CSVSettings -> Handle -> [r] -> IO ()
writeHeaders s h rs = maybe (return ()) (outputRow s h) (fileHeaders rs)


-- | 'outputRow' lifted into the 'E.Iteratee' monad.
outputRowIter :: CSVeable r => CSVSettings -> Handle -> r -> E.Iteratee B.ByteString IO ()
outputRowIter s oh r = liftIO (outputRow s oh r)


-- | Write every row of the list, in order, via 'outputRowIter'.
outputRowsIter :: CSVeable r => CSVSettings -> Handle -> [r] -> E.Iteratee B.ByteString IO ()
outputRowsIter s oh = sequence_ . map (outputRowIter s oh)


------------------------------------------------------------------------------
-- | A datatype that signals the parsing status to the user-developed
-- iteratee.
--
-- We need this because some iteratees do interleaved IO (such as outputting to
-- a file via a handle inside the accumulator) and some final actions may need
-- to be taken upon encountering EOF (such as closing the interleaved handle).
--
-- @ParsedRow (Just r)@ carries a successfully parsed row, @ParsedRow Nothing@
-- marks a row that could not be parsed, and 'EOF' marks the end of input.
--
-- Use this datatype when developing iteratees for use with the fold* family
-- of functions (Row enumerators).
--
-- The type deliberately carries no class context: datatype contexts are a
-- deprecated language feature and give pattern matches no extra information.
data ParsedRow r = ParsedRow (Maybe r) | EOF


------------------------------------------------------------------------------
-- | A fold step over CSV rows: given the current accumulator and the next
-- 'ParsedRow' (a parsed row, a parse failure, or 'EOF'), produce the updated
-- accumulator inside an 'E.Iteratee' over 'B.ByteString' chunks in 'IO'.
--
-- You would implement one of these to use with the 'foldCSVFile' function;
-- 'funToIter' and 'funToIterIO' build one from a plain function.
type CSVAction r a = a -> ParsedRow r -> E.Iteratee B.ByteString IO a


------------------------------------------------------------------------------
-- | Turn a fold step that runs in 'IO' into a 'CSVAction'.
--
-- Handy when you would rather not deal with Iteratees while writing your
-- fold functions. The accumulator is forced before and after each step.
funToIterIO :: (CSVeable r) => (a -> ParsedRow r -> IO a) -> CSVAction r a
funToIterIO f = step
  where
    step !acc row = case row of
      EOF -> finish E.EOF
      _ -> finish (E.Chunks [])
      where
        -- Run the user's step, then yield its result together with the
        -- given leftover stream.
        finish leftover = do
          !acc' <- liftIO (f acc row)
          yield acc' leftover


------------------------------------------------------------------------------
-- | Turn a pure fold step into a 'CSVAction'.
--
-- Handy when you would rather not deal with Iteratees while writing your
-- fold functions.
funToIter :: (CSVeable r) => (a -> ParsedRow r -> a) -> CSVAction r a
funToIter f = step
  where
    step !acc row = case row of
      EOF -> yield (f acc EOF) E.EOF
      _ -> yield (f acc row) (E.Chunks [])


------------------------------------------------------------------------------
-- | Accumulate every successfully parsed row into a list, most recent row
-- first. This gives up the incremental nature of this library.
collectRows :: CSVeable r => CSVAction r [r]
collectRows acc parsed = case parsed of
  ParsedRow (Just r) ->
    let acc' = r : acc
    in acc' `seq` yield acc' (E.Chunks [])
  -- EOF and unparseable rows leave the accumulator untouched.
  _ -> yield acc (E.Chunks [])

-- * Parsers

-- | Iteratee that parses the next row from the stream. A parse error is
-- reported on stdout and turned into 'Nothing' instead of being propagated.
rowParser :: (Monad m, MonadIO m) => CSVSettings -> E.Iteratee B.ByteString m (Maybe Row)
rowParser csvs = iterParser (row csvs) `E.catchError` report
  where
    report err = do
      liftIO (putStrLn ("Error in parsing: " ++ show err))
      return Nothing
      
-- | Parse one line: a well-formed CSV row if possible, otherwise fall back
-- to 'badrow'.
row :: CSVSettings -> Parser (Maybe Row)
row = (<|> badrow) . csvrow

-- | Consume the remainder of the current line (through its terminator or the
-- end of input) and report it as unparseable.
badrow :: Parser (Maybe Row)
badrow = do
  _ <- P.takeWhile (not . C8.isEndOfLine)
  C8.endOfLine <|> C8.endOfInput
  return Nothing

-- | Parse a well-formed row: fields separated by the configured separator
-- and terminated by an end of line or the end of input.
csvrow :: CSVSettings -> Parser (Maybe Row)
csvrow c = Just <$> (fields <* terminator)
  where
    fields = (quoted <|> field c) `sepBy` C8.char (csvSep c)
    terminator = C8.endOfLine <|> P.endOfInput
    -- Quoted fields are only attempted when a quote character is configured.
    quoted = maybe mzero (try . quotedField) (csvQuoteChar c)

-- | Parse an unquoted field: the (possibly empty) run of bytes accepted by
-- 'isFieldChar'.
field :: CSVSettings -> Parser Field
field settings = plain <?> "Parsing a regular field"
  where
    plain = P.takeWhile (isFieldChar settings)

-- | Whether a byte may appear inside an unquoted field: anything except the
-- column separator, a newline, a carriage return and, when one is
-- configured, the quote character.
--
-- The delimiters are compared literally. A character-class string is not
-- used because its @a-b@ range syntax would misread a @-@ separator that
-- follows the quote character.
isFieldChar :: CSVSettings -> Word8 -> Bool
isFieldChar s = (`notElem` stops)
  where
    delimiters = maybe id (:) (csvQuoteChar s) (csvSep s : "\n\r")
    -- Characters outside the single-byte range can never equal an input
    -- byte, so drop them rather than let 'c2w' truncate them.
    stops = map c2w (filter (<= '\255') delimiters)

-- | Parse a field enclosed in the given quote character. Inside the field a
-- doubled quote character stands for one literal quote.
quotedField :: Char -> Parser Field
quotedField c = do
    _ <- C8.char c <?> "Quote start"
    body <- many (notWord8 q <|> escapedQuote)
    _ <- C8.char c <?> "Quote end"
    return (B.pack body)
  where
    q = c2w c
    -- Two consecutive quote bytes collapse into a single one.
    escapedQuote = string (B.pack [q, q]) *> return q