feat(csv-import): automagically determine csv delimiters
This commit is contained in:
parent
94a120808d
commit
3555322f2a
@ -75,6 +75,7 @@ dependencies:
|
||||
- blaze-html
|
||||
- conduit-resumablesink >=0.2
|
||||
- parsec
|
||||
- attoparsec
|
||||
- uuid
|
||||
- exceptions
|
||||
- stm
|
||||
|
||||
@ -19,6 +19,9 @@ import Import hiding (Header)
|
||||
import Data.Csv
|
||||
import Data.Csv.Conduit
|
||||
|
||||
-- import qualified Data.Csv.Util as Csv
|
||||
import qualified Data.Csv.Parser as Csv
|
||||
|
||||
import qualified Data.Conduit.List as C
|
||||
import qualified Data.Conduit.Combinators as C (sourceLazy)
|
||||
|
||||
@ -26,6 +29,11 @@ import qualified Data.Map as Map
|
||||
import qualified Data.Vector as Vector
|
||||
import qualified Data.HashMap.Strict as HashMap
|
||||
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Lazy as A
|
||||
|
||||
|
||||
deriving instance Typeable CsvParseError
|
||||
instance Exception CsvParseError
|
||||
@ -33,14 +41,71 @@ instance Exception CsvParseError
|
||||
|
||||
typeCsv, typeCsv' :: ContentType
|
||||
typeCsv = "text/csv"
|
||||
typeCsv' = "text/csv; charset=UTF-8; header=present"
|
||||
typeCsv' = BS.intercalate "; " [typeCsv, "charset=UTF-8", "header=present"]
|
||||
|
||||
extensionCsv :: Extension
|
||||
extensionCsv = fromMaybe "csv" $ listToMaybe [ ext | (ext, mime) <- Map.toList mimeMap, mime == typeCsv ]
|
||||
|
||||
|
||||
decodeCsv :: (MonadThrow m, FromNamedRecord csv) => Conduit ByteString m csv
|
||||
decodeCsv = transPipe throwExceptT $ fromNamedCsv defaultDecodeOptions
|
||||
decodeCsv :: (MonadThrow m, FromNamedRecord csv, MonadLogger m) => Conduit ByteString m csv
|
||||
decodeCsv = transPipe throwExceptT $ do
|
||||
testBuffer <- accumTestBuffer LBS.empty
|
||||
mapM_ leftover $ LBS.toChunks testBuffer
|
||||
|
||||
let decodeOptions = guessDecodeOptions testBuffer
|
||||
$logInfoS "decodeCsv" [st|Guessed Csv.DecodeOptions from buffer of size #{tshow (LBS.length testBuffer)}/#{tshow testBufferSize}: #{tshow decodeOptions}|]
|
||||
|
||||
fromNamedCsv decodeOptions
|
||||
where
|
||||
testBufferSize = 4096
|
||||
accumTestBuffer acc
|
||||
| LBS.length acc >= testBufferSize = return acc
|
||||
| otherwise = do
|
||||
frag <- await
|
||||
case frag of
|
||||
Nothing -> return acc
|
||||
Just frag' -> accumTestBuffer (acc <> LBS.fromStrict frag')
|
||||
|
||||
guessDecodeOptions testBuffer
|
||||
| Just firstDQuote <- doubleQuote `LBS.elemIndex` testBuffer
|
||||
= if
|
||||
| firstDQuote /= 0
|
||||
-> Csv.DecodeOptions $ testBuffer `LBS.index` pred firstDQuote
|
||||
| A.Done unused _ <- A.parse quotedField testBuffer
|
||||
-> case A.parse endOfLine unused of
|
||||
A.Fail _ _ _
|
||||
| Just (nextChar, _) <- LBS.uncons unused
|
||||
-> defaultDecodeOptions { Csv.decDelimiter = nextChar }
|
||||
_other -> guessDecodeOptions $ LBS.take firstDQuote testBuffer <> unused
|
||||
| otherwise
|
||||
-> defaultDecodeOptions -- Parsing of something, which should be a quoted field, failed; bail now
|
||||
| A.Done _ ls <- A.parse (A.many1 $ A.manyTill A.anyWord8 endOfLine) testBuffer
|
||||
, (h:hs) <- filter (not . Map.null) $ map (fmap getSum . Map.unionsWith mappend . map (flip Map.singleton $ Sum 1)) ls
|
||||
, Just equals <- fromNullable $ Map.filterWithKey (\c n -> all ((== Just n) . Map.lookup c) hs) h
|
||||
, let maxH = maximum equals
|
||||
, [d] <- filter ((== Just maxH) . flip Map.lookup (toNullable equals)) . Map.keys $ toNullable equals
|
||||
= defaultDecodeOptions { Csv.decDelimiter = d }
|
||||
| otherwise
|
||||
= defaultDecodeOptions
|
||||
|
||||
|
||||
quotedField :: A.Parser () -- We don't care about the return value
|
||||
quotedField = void . Csv.field $ Csv.decDelimiter defaultDecodeOptions -- We can use comma as a separator, because we know that the field we're trying to parse is quoted and so does not rely on the delimiter
|
||||
|
||||
|
||||
endOfLine :: A.Parser ()
|
||||
endOfLine = asum
|
||||
[ void $ A.word8 newline
|
||||
, mapM_ (void . A.word8) [cr, newline]
|
||||
, void $ A.word8 cr
|
||||
]
|
||||
|
||||
doubleQuote, newline, cr :: Word8
|
||||
doubleQuote = 34
|
||||
newline = 10
|
||||
cr = 13
|
||||
|
||||
|
||||
|
||||
encodeCsv :: ( ToNamedRecord csv
|
||||
, DefaultOrdered csv
|
||||
@ -70,6 +135,7 @@ respondCsvDB src = respondSourceDB typeCsv' $ src .| encodeCsv .| awaitForever s
|
||||
|
||||
fileSourceCsv :: ( FromNamedRecord csv
|
||||
, MonadResource m
|
||||
, MonadLogger m
|
||||
)
|
||||
=> FileInfo
|
||||
-> Source m csv
|
||||
|
||||
Loading…
Reference in New Issue
Block a user