From 72bf08129cc91b898b8a44034418077d65e57e17 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Fri, 8 Mar 2019 15:54:36 -0800 Subject: [PATCH] Add support for S3Select API (#108) --- docs/API.md | 55 ++++- examples/SelectObject.hs | 50 ++++ minio-hs.cabal | 19 +- src/Network/Minio.hs | 4 + src/Network/Minio/Data.hs | 237 ++++++++++++++++++- src/Network/Minio/Errors.hs | 1 + src/Network/Minio/SelectAPI.hs | 302 ++++++++++++++++++++++++ src/Network/Minio/XmlGenerator.hs | 79 ++++++- src/Network/Minio/XmlParser.hs | 12 + stack.yaml | 2 +- test/Network/Minio/XmlGenerator/Test.hs | 46 ++++ test/Network/Minio/XmlParser/Test.hs | 27 ++- 12 files changed, 818 insertions(+), 16 deletions(-) create mode 100755 examples/SelectObject.hs create mode 100644 src/Network/Minio/SelectAPI.hs diff --git a/docs/API.md b/docs/API.md index ce9affe..dd3b8cd 100644 --- a/docs/API.md +++ b/docs/API.md @@ -28,7 +28,7 @@ awsCI { connectAccesskey = "your-access-key" |[`listObjects`](#listObjects)|[`fPutObject`](#fPutObject)|| |[`listObjectsV1`](#listObjectsV1)|[`copyObject`](#copyObject)|| |[`listIncompleteUploads`](#listIncompleteUploads)|[`removeObject`](#removeObject)|| -|[`bucketExists`](#bucketExists)||| +|[`bucketExists`](#bucketExists)|[`selectObjectContent`](#selectObjectContent)|| ## 1. 
Connecting and running operations on the storage service @@ -743,6 +743,59 @@ main = do Right _ -> putStrLn "Removed incomplete upload successfully" ``` + +### selectObjectContent :: Bucket -> Object -> SelectRequest -> Minio (ConduitT () EventMessage Minio ()) +Queries the content of an object using an SQL Select expression and returns the response as a stream of event messages + +__Parameters__ + +In the expression `selectObjectContent bucketName objectName selReq` +the parameters are: + +|Param |Type |Description | +|:---|:---| :---| +| `bucketName` | _Bucket_ (alias for `Text`) | Name of the bucket | +| `objectName` | _Object_ (alias for `Text`) | Name of the object | +| `selReq` | _SelectRequest_ | Select request parameters | + +__SelectRequest record__ + +This record is created using `selectRequest`. Please refer to the Haddocks for further information. + +__Return Value__ + +The return value can be used to read individual `EventMessage`s in the response. Please refer to the Haddocks for further information. + +|Return type | Description | +|:---|:---| +| _Minio (C.ConduitT () EventMessage Minio ())_ | A Conduit source of `EventMessage` values. | + +__Example__ + +```haskell +{-# Language OverloadedStrings #-} +import Network.Minio + +import qualified Conduit as C + +main :: IO () +main = do + let + bucket = "mybucket" + object = "myobject" + + res <- runMinio minioPlayCI $ do + let sr = selectRequest "Select * from s3object" + defaultCsvInput defaultCsvOutput + res <- selectObjectContent bucket object sr + C.runConduit $ res C..| getPayloadBytes C..| C.stdoutC + + case res of + Left _ -> putStrLn "Failed!" + Right _ -> putStrLn "Success!" +``` + + ### bucketExists :: Bucket -> Minio Bool Checks if a bucket exists. diff --git a/examples/SelectObject.hs b/examples/SelectObject.hs new file mode 100755 index 0000000..6243cd1 --- /dev/null +++ b/examples/SelectObject.hs @@ -0,0 +1,50 @@ +#!/usr/bin/env stack +-- stack --resolver lts-13.1 runghc --package minio-hs + +-- +-- Minio Haskell SDK, (C) 2019 Minio, Inc. 
+-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +{-# LANGUAGE OverloadedStrings #-} +import Network.Minio + +import qualified Conduit as C +import Control.Monad (when) +import qualified Data.ByteString.Lazy as LB + +import Prelude + +main :: IO () +main = do + let bucket = "selectbucket" + object = "1.csv" + content = "Name,Place,Temperature\n" + <> "James,San Jose,76\n" + <> "Alicia,San Leandro,88\n" + <> "Mark,San Carlos,90\n" + + res <- runMinio minioPlayCI $ do + + exists <- bucketExists bucket + when (not exists) $ + makeBucket bucket Nothing + + C.liftIO $ putStrLn "Uploading csv object" + putObject bucket object (C.sourceLazy content) Nothing defaultPutObjectOptions + + let sr = selectRequest "Select * from s3object" defaultCsvInput defaultCsvOutput + res <- selectObjectContent bucket object sr + C.runConduit $ res C..| getPayloadBytes C..| C.stdoutC + print res diff --git a/minio-hs.cabal b/minio-hs.cabal index e65ce14..5eb1018 100644 --- a/minio-hs.cabal +++ b/minio-hs.cabal @@ -41,6 +41,7 @@ library , Network.Minio.ListOps , Network.Minio.PresignedOperations , Network.Minio.PutObject + , Network.Minio.SelectAPI , Network.Minio.Sign.V4 , Network.Minio.Utils , Network.Minio.XmlGenerator @@ -50,6 +51,7 @@ library , protolude >= 0.2 && < 0.3 , aeson >= 1.2 , base64-bytestring >= 1.0 + , binary >= 0.8.5.0 , bytestring >= 0.10 , case-insensitive >= 1.2 , conduit >= 1.3 @@ -57,6 +59,7 @@ library , containers >= 0.5 , cryptonite >= 0.25 , 
cryptonite-conduit >= 0.2 + , digest >= 0.0.1 , directory , filepath >= 1.4 , http-client >= 0.5 @@ -64,12 +67,14 @@ library , http-types >= 0.12 , ini , memory >= 0.14 + , raw-strings-qq >= 1 , resourcet >= 1.2 , text >= 1.2 , time >= 1.8 , transformers >= 0.5 , unliftio >= 0.2 , unliftio-core >= 0.1 + , unordered-containers >= 0.2 , xml-conduit >= 1.8 default-language: Haskell2010 default-extensions: BangPatterns @@ -120,6 +125,7 @@ test-suite minio-hs-live-server-test , Network.Minio.PresignedOperations , Network.Minio.PutObject , Network.Minio.S3API + , Network.Minio.SelectAPI , Network.Minio.Sign.V4 , Network.Minio.TestHelpers , Network.Minio.Utils @@ -131,11 +137,12 @@ test-suite minio-hs-live-server-test , Network.Minio.XmlParser.Test , Network.Minio.JsonParser , Network.Minio.JsonParser.Test - build-depends: base + build-depends: base >= 4.7 && < 5 , minio-hs , protolude >= 0.1.6 , aeson , base64-bytestring + , binary , bytestring , case-insensitive , conduit @@ -143,6 +150,7 @@ test-suite minio-hs-live-server-test , containers , cryptonite , cryptonite-conduit + , digest , directory , filepath , http-client @@ -151,6 +159,7 @@ test-suite minio-hs-live-server-test , ini , memory , QuickCheck + , raw-strings-qq >= 1 , resourcet , tasty , tasty-hunit @@ -162,6 +171,7 @@ test-suite minio-hs-live-server-test , transformers , unliftio , unliftio-core + , unordered-containers , xml-conduit if !flag(live-test) buildable: False @@ -170,11 +180,12 @@ test-suite minio-hs-test type: exitcode-stdio-1.0 hs-source-dirs: test, src main-is: Spec.hs - build-depends: base + build-depends: base >= 4.7 && < 5 , minio-hs , protolude >= 0.1.6 , aeson , base64-bytestring + , binary , bytestring , case-insensitive , conduit @@ -183,6 +194,7 @@ test-suite minio-hs-test , cryptonite , cryptonite-conduit , filepath + , digest , directory , http-client , http-conduit @@ -190,6 +202,7 @@ test-suite minio-hs-test , ini , memory , QuickCheck + , raw-strings-qq >= 1 , resourcet , tasty , 
tasty-hunit @@ -201,6 +214,7 @@ test-suite minio-hs-test , transformers , unliftio , unliftio-core + , unordered-containers , xml-conduit ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N default-language: Haskell2010 @@ -230,6 +244,7 @@ test-suite minio-hs-test , Network.Minio.PresignedOperations , Network.Minio.PutObject , Network.Minio.S3API + , Network.Minio.SelectAPI , Network.Minio.Sign.V4 , Network.Minio.TestHelpers , Network.Minio.Utils diff --git a/src/Network/Minio.hs b/src/Network/Minio.hs index 0cc6632..209a7db 100644 --- a/src/Network/Minio.hs +++ b/src/Network/Minio.hs @@ -158,6 +158,9 @@ module Network.Minio , removeObject , removeIncompleteUpload + -- ** Select Object Content with SQL + , module Network.Minio.SelectAPI + -- * Presigned Operations ------------------------- , UrlExpiry @@ -207,6 +210,7 @@ import Network.Minio.Errors import Network.Minio.ListOps import Network.Minio.PutObject import Network.Minio.S3API +import Network.Minio.SelectAPI import Network.Minio.Utils -- | Lists buckets. diff --git a/src/Network/Minio/Data.hs b/src/Network/Minio/Data.hs index 349adbf..8e1e825 100644 --- a/src/Network/Minio/Data.hs +++ b/src/Network/Minio/Data.hs @@ -14,6 +14,7 @@ -- limitations under the License. 
-- +{-# LANGUAGE CPP #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE TypeFamilies #-} module Network.Minio.Data where @@ -25,6 +26,7 @@ import Control.Monad.IO.Unlift (MonadUnliftIO, UnliftIO (..), import Control.Monad.Trans.Resource import qualified Data.ByteString as B import Data.CaseInsensitive (mk) +import qualified Data.HashMap.Strict as H import qualified Data.Ini as Ini import qualified Data.Map as Map import Data.String (IsString (..)) @@ -511,6 +513,237 @@ data Notification = Notification defaultNotification :: Notification defaultNotification = Notification [] [] [] + +-------------------------------------------------------------------------- +-- Select API Related Types +-------------------------------------------------------------------------- + +-- | SelectRequest represents the Select API call. Use the +-- `selectRequest` function to create a value of this type. +data SelectRequest = SelectRequest + { srExpression :: Text + , srExpressionType :: ExpressionType + , srInputSerialization :: InputSerialization + , srOutputSerialization :: OutputSerialization + , srRequestProgressEnabled :: Maybe Bool + } deriving (Eq, Show) + +data ExpressionType = SQL + deriving (Eq, Show) + +-- | InputSerialization represents format information of the input +-- object being queried. Use one of the smart constructors such as +-- `defaultCsvInput` as a starting value, and add compression info +-- using `setInputCompressionType` +data InputSerialization = InputSerialization + { isCompressionType :: Maybe CompressionType + , isFormatInfo :: InputFormatInfo + } deriving (Eq, Show) + +data CompressionType = CompressionTypeNone + | CompressionTypeGzip + | CompressionTypeBzip2 + deriving (Eq, Show) + +data InputFormatInfo = InputFormatCSV CSVInputProp + | InputFormatJSON JSONInputProp + | InputFormatParquet + deriving (Eq, Show) + +-- | defaultCsvInput returns InputSerialization with default CSV +-- format, and without any compression setting. 
+defaultCsvInput :: InputSerialization +defaultCsvInput = InputSerialization Nothing (InputFormatCSV defaultCSVProp) + +-- | linesJsonInput returns InputSerialization with JSON line based +-- format with no compression setting. +linesJsonInput :: InputSerialization +linesJsonInput = InputSerialization Nothing + (InputFormatJSON $ JSONInputProp JSONTypeLines) + +-- | documentJsonInput returns InputSerialization with JSON document +-- based format with no compression setting. +documentJsonInput :: InputSerialization +documentJsonInput = InputSerialization Nothing + (InputFormatJSON $ JSONInputProp JSONTypeDocument) + +-- | defaultParquetInput returns InputSerialization with Parquet +-- format, and no compression setting. +defaultParquetInput :: InputSerialization +defaultParquetInput = InputSerialization Nothing InputFormatParquet + +-- | setInputCompressionType sets the compression type for the input +-- of the SelectRequest +setInputCompressionType :: CompressionType -> SelectRequest + -> SelectRequest +setInputCompressionType c i = + let is = srInputSerialization i + is' = is { isCompressionType = Just c } + in i { srInputSerialization = is' } + +-- | defaultCsvOutput returns OutputSerialization with default CSV +-- format. +defaultCsvOutput :: OutputSerialization +defaultCsvOutput = OutputSerializationCSV defaultCSVProp + +-- | defaultJsonOutput returns OutputSerialization with default JSON +-- format. +defaultJsonOutput :: OutputSerialization +defaultJsonOutput = OutputSerializationJSON (JSONOutputProp Nothing) + +-- | selectRequest is used to build a `SelectRequest` +-- value. @selectRequest query inputSer outputSer@ represents a +-- SelectRequest with the SQL query text given by @query@, the input +-- serialization settings (compression format and format information) +-- @inputSer@ and the output serialization settings @outputSer@. 
+selectRequest :: Text -> InputSerialization -> OutputSerialization + -> SelectRequest +selectRequest sqlQuery inputSer outputSer = + SelectRequest { srExpression = sqlQuery + , srExpressionType = SQL + , srInputSerialization = inputSer + , srOutputSerialization = outputSer + , srRequestProgressEnabled = Nothing + } + +-- | setRequestProgressEnabled sets the flag for turning on progress +-- messages when the Select response is being streamed back to the +-- client. +setRequestProgressEnabled :: Bool -> SelectRequest -> SelectRequest +setRequestProgressEnabled enabled sr = + sr { srRequestProgressEnabled = Just enabled } + +type CSVInputProp = CSVProp + +-- | CSVProp represents CSV format properties. It is built up using +-- the Monoid instance. +data CSVProp = CSVProp (H.HashMap Text Text) + deriving (Eq, Show) + +#if (__GLASGOW_HASKELL__ >= 804) +instance Semigroup CSVProp where + (CSVProp a) <> (CSVProp b) = CSVProp (b <> a) +#endif + +instance Monoid CSVProp where + mempty = CSVProp mempty +#if (__GLASGOW_HASKELL__ < 804) + mappend (CSVProp a) (CSVProp b) = CSVProp (b <> a) +#endif + +defaultCSVProp :: CSVProp +defaultCSVProp = mempty + +recordDelimiter :: Text -> CSVProp +recordDelimiter = CSVProp . H.singleton "RecordDelimiter" + +fieldDelimiter :: Text -> CSVProp +fieldDelimiter = CSVProp . H.singleton "FieldDelimiter" + +quoteCharacter :: Text -> CSVProp +quoteCharacter = CSVProp . H.singleton "QuoteCharacter" + +quoteEscapeCharacter :: Text -> CSVProp +quoteEscapeCharacter = CSVProp . H.singleton "QuoteEscapeCharacter" + +-- | FileHeaderInfo specifies information about column headers for CSV +-- format. +data FileHeaderInfo + = FileHeaderNone -- ^ No column headers are present + | FileHeaderUse -- ^ Headers are present and they should be used + | FileHeaderIgnore -- ^ Header are present, but should be ignored + deriving (Eq, Show) + +fileHeaderInfo :: FileHeaderInfo -> CSVProp +fileHeaderInfo = CSVProp . H.singleton "FileHeaderInfo" . 
toString + where + toString FileHeaderNone = "NONE" + toString FileHeaderUse = "USE" + toString FileHeaderIgnore = "IGNORE" + +commentCharacter :: Text -> CSVProp +commentCharacter = CSVProp . H.singleton "Comments" + +allowQuotedRecordDelimiter :: CSVProp +allowQuotedRecordDelimiter = CSVProp $ H.singleton "AllowQuotedRecordDelimiter" "TRUE" + +-- | Set the CSV format properties in the InputSerialization. +setInputCSVProps :: CSVProp -> InputSerialization -> InputSerialization +setInputCSVProps p is = is { isFormatInfo = InputFormatCSV p } + +-- | Set the CSV format properties in the OutputSerialization. +outputCSVFromProps :: CSVProp -> OutputSerialization +outputCSVFromProps p = OutputSerializationCSV p + +data JSONInputProp = JSONInputProp { jsonipType :: JSONType } + deriving (Eq, Show) + +data JSONType = JSONTypeDocument | JSONTypeLines + deriving (Eq, Show) + + +-- | OutputSerialization represents output serialization settings for +-- the SelectRequest. Use `defaultCsvOutput` or `defaultJsonOutput` as +-- a starting point. +data OutputSerialization = OutputSerializationJSON JSONOutputProp + | OutputSerializationCSV CSVOutputProp + deriving (Eq, Show) + +type CSVOutputProp = CSVProp + +-- | quoteFields is an output serialization parameter +quoteFields :: QuoteFields -> CSVProp +quoteFields q = CSVProp $ H.singleton "QuoteFields" $ + case q of + QuoteFieldsAsNeeded -> "ASNEEDED" + QuoteFieldsAlways -> "ALWAYS" + +data QuoteFields = QuoteFieldsAsNeeded | QuoteFieldsAlways + deriving (Eq, Show) + +data JSONOutputProp = JSONOutputProp { jsonopRecordDelimiter :: Maybe Text } + deriving (Eq, Show) + +-- | Set the output record delimiter for JSON format +outputJSONFromRecordDelimiter :: Text -> OutputSerialization +outputJSONFromRecordDelimiter t = + OutputSerializationJSON (JSONOutputProp $ Just t) + +-- Response related types + +-- | An EventMessage represents each kind of message received from the server. 
+data EventMessage = ProgressEventMessage { emProgress :: Progress } + | StatsEventMessage { emStats :: Stats } + | RequestLevelErrorMessage { emErrorCode :: Text + , emErrorMessage :: Text + } + | RecordPayloadEventMessage { emPayloadBytes :: ByteString } + deriving (Eq, Show) + +data MsgHeaderName = MessageType + | EventType + | ContentType + | ErrorCode + | ErrorMessage + deriving (Eq, Show) + +msgHeaderValueType :: Word8 +msgHeaderValueType = 7 + +type MessageHeader = (MsgHeaderName, Text) + +data Progress = Progress { pBytesScanned :: Int64 + , pBytesProcessed :: Int64 + , pBytesReturned :: Int64 + } + deriving (Eq, Show) + +type Stats = Progress + +-------------------------------------------------------------------------- +-- Select API Related Types End +-------------------------------------------------------------------------- + -- | Represents different kinds of payload that are used with S3 API -- requests. data Payload = PayloadBS ByteString @@ -530,8 +763,8 @@ data AdminReqInfo = AdminReqInfo { , ariQueryParams :: Query } -data S3ReqInfo = S3ReqInfo { - riMethod :: Method +data S3ReqInfo = S3ReqInfo + { riMethod :: Method , riBucket :: Maybe Bucket , riObject :: Maybe Object , riQueryParams :: Query diff --git a/src/Network/Minio/Errors.hs b/src/Network/Minio/Errors.hs index 079a851..30d9719 100644 --- a/src/Network/Minio/Errors.hs +++ b/src/Network/Minio/Errors.hs @@ -51,6 +51,7 @@ data ServiceErr = BucketAlreadyExists | NoSuchBucket | InvalidBucketName | NoSuchKey + | SelectErr Text Text | ServiceErr Text Text deriving (Show, Eq) diff --git a/src/Network/Minio/SelectAPI.hs b/src/Network/Minio/SelectAPI.hs new file mode 100644 index 0000000..56ab27a --- /dev/null +++ b/src/Network/Minio/SelectAPI.hs @@ -0,0 +1,302 @@ +-- +-- Minio Haskell SDK, (C) 2017-2019 Minio, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. 
+-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +module Network.Minio.SelectAPI + ( + + -- | The `selectObjectContent` allows querying CSV, JSON or Parquet + -- format objects in AWS S3 and in Minio using SQL Select + -- statements. This allows significant reduction of data transfer + -- from object storage for computation-intensive tasks, as relevant + -- data is filtered close to the storage. + + selectObjectContent + + , SelectRequest + , selectRequest + + -- *** Input Serialization + ------------------------- + + , InputSerialization + , defaultCsvInput + , linesJsonInput + , documentJsonInput + , defaultParquetInput + , setInputCSVProps + + , CompressionType(..) + , setInputCompressionType + + -- *** CSV Format details + ------------------------ + -- | CSV format options such as delimiters and quote characters are + -- specified using the functions below. Options are combined + -- monoidally. + + , CSVProp + , recordDelimiter + , fieldDelimiter + , quoteCharacter + , quoteEscapeCharacter + , commentCharacter + , allowQuotedRecordDelimiter + , FileHeaderInfo(..) + , fileHeaderInfo + , QuoteFields(..) + , quoteFields + + -- *** Output Serialization + ------------------------- + + , OutputSerialization + , defaultCsvOutput + , defaultJsonOutput + , outputCSVFromProps + , outputJSONFromRecordDelimiter + + -- *** Progress messages + ------------------------ + + , setRequestProgressEnabled + + -- *** Interpreting Select output + -------------------------------------------- + -- | The conduit returned by `selectObjectContent` returns values of + -- the `EventMessage` data type. 
This returns the query output + -- messages formatted according to the chosen output serialization, + -- interleaved with progress messages (if enabled by + -- `setRequestProgressEnabled`), and at the end a statistics + -- message. + -- + -- If the application is interested in only the payload, then + -- `getPayloadBytes` can be used. For example to simply print the + -- payload to stdout: + -- + -- > resultConduit <- selectObjectContent bucket object mySelectRequest + -- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC + -- + -- Note that runConduit, the connect operator (.|) and stdoutC are + -- all from the "conduit" package. + + , getPayloadBytes + , EventMessage(..) + , Progress(..) + , Stats + ) where + +import Conduit ((.|)) +import qualified Conduit as C +import qualified Data.Binary as Bin +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as LB +import Data.Digest.CRC32 (crc32, crc32Update) +import qualified Network.HTTP.Conduit as NC +import qualified Network.HTTP.Types as HT +import UnliftIO (MonadUnliftIO) + +import Lib.Prelude + +import Network.Minio.API +import Network.Minio.Data +import Network.Minio.Errors +import Network.Minio.Utils +import Network.Minio.XmlGenerator +import Network.Minio.XmlParser + +data EventStreamException = ESEPreludeCRCFailed + | ESEMessageCRCFailed + | ESEUnexpectedEndOfStream + | ESEDecodeFail [Char] + | ESEInvalidHeaderType + | ESEInvalidHeaderValueType + | ESEInvalidMessageType + deriving (Eq, Show) + +instance Exception EventStreamException + +-- chunkSize in bytes is 32KiB +chunkSize :: Int +chunkSize = 32 * 1024 + +parseBinary :: Bin.Binary a => ByteString -> IO a +parseBinary b = do + case Bin.decodeOrFail $ LB.fromStrict b of + Left (_, _, msg) -> throwIO $ ESEDecodeFail msg + Right (_, _, r) -> return r + +bytesToHeaderName :: Text -> IO MsgHeaderName +bytesToHeaderName t = case t of + ":message-type" -> return MessageType + ":event-type" -> return EventType + ":content-type" -> 
return ContentType + ":error-code" -> return ErrorCode + ":error-message" -> return ErrorMessage + _ -> throwIO ESEInvalidHeaderType + +parseHeaders :: MonadUnliftIO m + => Word32 -> C.ConduitM ByteString a m [MessageHeader] +parseHeaders 0 = return [] +parseHeaders hdrLen = do + bs1 <- readNBytes 1 + n :: Word8 <- liftIO $ parseBinary bs1 + + headerKeyBytes <- readNBytes $ fromIntegral n + let headerKey = decodeUtf8Lenient headerKeyBytes + headerName <- liftIO $ bytesToHeaderName headerKey + + bs2 <- readNBytes 1 + headerValueType :: Word8 <- liftIO $ parseBinary bs2 + when (headerValueType /= 7) $ throwIO ESEInvalidHeaderValueType + + bs3 <- readNBytes 2 + vLen :: Word16 <- liftIO $ parseBinary bs3 + headerValueBytes <- readNBytes $ fromIntegral vLen + let headerValue = decodeUtf8Lenient headerValueBytes + m = (headerName, headerValue) + k = 1 + fromIntegral n + 1 + 2 + fromIntegral vLen + + ms <- parseHeaders (hdrLen - k) + return (m:ms) + +-- readNBytes returns N bytes read from the stream and throws an +-- exception if N bytes are not present on the stream. +readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString +readNBytes n = do + b <- LB.toStrict <$> (C.takeCE n .| C.sinkLazy) + if B.length b /= n + then throwIO ESEUnexpectedEndOfStream + else return b + +crcCheck :: MonadUnliftIO m + => C.ConduitM ByteString ByteString m () +crcCheck = do + b <- readNBytes 12 + n :: Word32 <- liftIO $ parseBinary $ B.take 4 b + preludeCRC :: Word32 <- liftIO $ parseBinary $ B.drop 8 b + when (crc32 (B.take 8 b) /= preludeCRC) $ + throwIO ESEPreludeCRCFailed + + -- we do not yield the checksum + C.yield $ B.take 8 b + + -- 12 bytes have been read off the current message. Now read the + -- next (n-12)-4 bytes and accumulate the checksum, and yield it. 
+ let startCrc = crc32 b + finalCrc <- accumulateYield (fromIntegral n-16) startCrc + + bs <- readNBytes 4 + expectedCrc :: Word32 <- liftIO $ parseBinary bs + + when (finalCrc /= expectedCrc) $ + throwIO ESEMessageCRCFailed + + -- we unconditionally recurse - downstream figures out when to + -- quit reading the stream + crcCheck + where + accumulateYield n checkSum = do + let toRead = min n chunkSize + b <- readNBytes toRead + let c' = crc32Update checkSum b + n' = n - B.length b + C.yield b + if n' > 0 + then accumulateYield n' c' + else return c' + +handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m () +handleMessage = do + b1 <- readNBytes 4 + msgLen :: Word32 <- liftIO $ parseBinary b1 + + b2 <- readNBytes 4 + hdrLen :: Word32 <- liftIO $ parseBinary b2 + + hs <- parseHeaders hdrLen + + let payloadLen = msgLen - hdrLen - 16 + getHdrVal h = fmap snd . headMay . filter ((h ==) . fst) + eventHdrValue = getHdrVal EventType hs + msgHdrValue = getHdrVal MessageType hs + errCode = getHdrVal ErrorCode hs + errMsg = getHdrVal ErrorMessage hs + + case msgHdrValue of + Just "event" -> do + case eventHdrValue of + Just "Records" -> passThrough $ fromIntegral payloadLen + Just "Cont" -> return () + Just "Progress" -> do + bs <- readNBytes $ fromIntegral payloadLen + progress <- parseSelectProgress bs + C.yield $ ProgressEventMessage progress + Just "Stats" -> do + bs <- readNBytes $ fromIntegral payloadLen + stats <- parseSelectProgress bs + C.yield $ StatsEventMessage stats + Just "End" -> return () + _ -> throwIO ESEInvalidMessageType + when (eventHdrValue /= Just "End") handleMessage + + Just "error" -> do + let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg + maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay + + _ -> throwIO ESEInvalidMessageType + + where + passThrough 0 = return () + passThrough n = do + let c = min n chunkSize + b <- readNBytes c + C.yield $ RecordPayloadEventMessage b + passThrough $ n - B.length b + + 
+selectProtoConduit :: MonadUnliftIO m + => C.ConduitT ByteString EventMessage m () +selectProtoConduit = crcCheck .| handleMessage + +-- | selectObjectContent calls the SelectRequest on the given +-- object. It returns a Conduit of event messages that can be consumed +-- by the client. +selectObjectContent :: Bucket -> Object -> SelectRequest + -> Minio (C.ConduitT () EventMessage Minio ()) +selectObjectContent b o r = do + let reqInfo = defaultS3ReqInfo { riMethod = HT.methodPost + , riBucket = Just b + , riObject = Just o + , riPayload = PayloadBS $ mkSelectRequest r + , riNeedsLocation = False + , riQueryParams = [("select", Nothing), ("select-type", Just "2")] + } + --print $ mkSelectRequest r + resp <- mkStreamRequest reqInfo + return $ NC.responseBody resp .| selectProtoConduit + +-- | A helper conduit that returns only the record payload bytes. +getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m () +getPayloadBytes = do + evM <- C.await + case evM of + Just v -> do + case v of + RecordPayloadEventMessage b -> C.yield b + RequestLevelErrorMessage c m -> liftIO $ throwIO $ SelectErr c m + _ -> return () + getPayloadBytes + Nothing -> return () diff --git a/src/Network/Minio/XmlGenerator.hs b/src/Network/Minio/XmlGenerator.hs index 602fb5e..890ef82 100644 --- a/src/Network/Minio/XmlGenerator.hs +++ b/src/Network/Minio/XmlGenerator.hs @@ -18,11 +18,12 @@ module Network.Minio.XmlGenerator ( mkCreateBucketConfig , mkCompleteMultipartUploadRequest , mkPutNotificationRequest + , mkSelectRequest ) where import qualified Data.ByteString.Lazy as LBS -import qualified Data.Map as M +import qualified Data.HashMap.Strict as H import qualified Data.Text as T import Text.XML @@ -35,7 +36,7 @@ import Network.Minio.Data mkCreateBucketConfig :: Text -> Region -> ByteString mkCreateBucketConfig ns location = LBS.toStrict $ renderLBS def bucketConfig where - s3Element n = Element (s3Name ns n) M.empty + s3Element n = Element (s3Name ns n) mempty root = 
s3Element "CreateBucketConfiguration" [ NodeElement $ s3Element "LocationConstraint" [ NodeContent location] @@ -47,12 +48,12 @@ mkCompleteMultipartUploadRequest :: [PartTuple] -> ByteString mkCompleteMultipartUploadRequest partInfo = LBS.toStrict $ renderLBS def cmur where - root = Element "CompleteMultipartUpload" M.empty $ + root = Element "CompleteMultipartUpload" mempty $ map (NodeElement . mkPart) partInfo - mkPart (n, etag) = Element "Part" M.empty - [ NodeElement $ Element "PartNumber" M.empty + mkPart (n, etag) = Element "Part" mempty + [ NodeElement $ Element "PartNumber" mempty [NodeContent $ T.pack $ show n] - , NodeElement $ Element "ETag" M.empty + , NodeElement $ Element "ETag" mempty [NodeContent etag] ] cmur = Document (Prologue [] Nothing []) root [] @@ -67,9 +68,9 @@ toXML ns node = LBS.toStrict $ renderLBS def $ Document (Prologue [] Nothing []) (xmlNode node) [] where xmlNode :: XNode -> Element - xmlNode (XNode name nodes) = Element (s3Name ns name) M.empty $ + xmlNode (XNode name nodes) = Element (s3Name ns name) mempty $ map (NodeElement . xmlNode) nodes - xmlNode (XLeaf name content) = Element (s3Name ns name) M.empty + xmlNode (XLeaf name content) = Element (s3Name ns name) mempty [NodeContent content] class ToXNode a where @@ -100,3 +101,65 @@ getFRXNode (FilterRule n v) = XNode "FilterRule" [ XLeaf "Name" n mkPutNotificationRequest :: Text -> Notification -> ByteString mkPutNotificationRequest ns = toXML ns . 
toXNode + +mkSelectRequest :: SelectRequest -> ByteString +mkSelectRequest r = LBS.toStrict $ renderLBS def sr + where + sr = Document (Prologue [] Nothing []) root [] + root = Element "SelectRequest" mempty $ + [ NodeElement (Element "Expression" mempty + [NodeContent $ srExpression r]) + , NodeElement (Element "ExpressionType" mempty + [NodeContent $ show $ srExpressionType r]) + , NodeElement (Element "InputSerialization" mempty $ + inputSerializationNodes $ srInputSerialization r) + , NodeElement (Element "OutputSerialization" mempty $ + outputSerializationNodes $ srOutputSerialization r) + ] ++ maybe [] reqProgElem (srRequestProgressEnabled r) + reqProgElem enabled = [NodeElement + (Element "RequestProgress" mempty + [NodeElement + (Element "Enabled" mempty + [NodeContent + (if enabled then "TRUE" else "FALSE")] + ) + ] + ) + ] + inputSerializationNodes is = comprTypeNode (isCompressionType is) ++ + [NodeElement $ formatNode (isFormatInfo is)] + comprTypeNode (Just c) = [NodeElement $ Element "CompressionType" mempty + [NodeContent $ + if | c == CompressionTypeNone -> "NONE" + | c == CompressionTypeGzip -> "GZIP" + | c == CompressionTypeBzip2 -> "BZIP2" + ] + ] + comprTypeNode Nothing = [] + + kvElement (k, v) = Element (Name k Nothing Nothing) mempty [NodeContent v] + formatNode (InputFormatCSV (CSVProp h)) = + Element "CSV" mempty + (map NodeElement $ map kvElement $ H.toList h) + formatNode (InputFormatJSON p) = + Element "JSON" mempty + [NodeElement + (Element "Type" mempty + [NodeContent $ + if | jsonipType p == JSONTypeDocument -> "DOCUMENT" + | jsonipType p == JSONTypeLines -> "LINES" + ] + ) + ] + formatNode InputFormatParquet = Element "Parquet" mempty [] + + outputSerializationNodes (OutputSerializationJSON j) = + [NodeElement (Element "JSON" mempty $ + rdElem $ jsonopRecordDelimiter j)] + outputSerializationNodes (OutputSerializationCSV (CSVProp h)) = + [NodeElement $ Element "CSV" mempty + (map NodeElement $ map kvElement $ H.toList h)] + + rdElem 
Nothing = [] + rdElem (Just t) = [NodeElement $ Element "RecordDelimiter" mempty + [NodeContent t]] diff --git a/src/Network/Minio/XmlParser.hs b/src/Network/Minio/XmlParser.hs index bf1c036..f6409ba 100644 --- a/src/Network/Minio/XmlParser.hs +++ b/src/Network/Minio/XmlParser.hs @@ -26,8 +26,10 @@ module Network.Minio.XmlParser , parseListPartsResponse , parseErrResponse , parseNotification + , parseSelectProgress ) where +import qualified Data.ByteString.Lazy as LB import Data.List (zip3, zip4, zip5) import qualified Data.Map as Map import qualified Data.Text as T @@ -261,3 +263,13 @@ parseNotification xmldata = do s3Elem ns "FilterRule" &| getFilterRule ns return $ NotificationConfig id arn events (Filter $ FilterKey $ FilterRules rules) + +parseSelectProgress :: MonadIO m => ByteString -> m Progress +parseSelectProgress xmldata = do + r <- parseRoot $ LB.fromStrict xmldata + let bScanned = T.concat $ r $/ element "BytesScanned" &/ content + bProcessed = T.concat $ r $/element "BytesProcessed" &/ content + bReturned = T.concat $ r $/element "BytesReturned" &/ content + Progress <$> parseDecimal bScanned + <*> parseDecimal bProcessed + <*> parseDecimal bReturned diff --git a/stack.yaml b/stack.yaml index 7a77534..c0b754a 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: lts-11.1 +resolver: lts-13.1 # User packages to be built. # Various formats can be used as shown in the example below. diff --git a/test/Network/Minio/XmlGenerator/Test.hs b/test/Network/Minio/XmlGenerator/Test.hs index 24b482c..0c818c5 100644 --- a/test/Network/Minio/XmlGenerator/Test.hs +++ b/test/Network/Minio/XmlGenerator/Test.hs @@ -14,12 +14,14 @@ -- limitations under the License. 
-- +{-# LANGUAGE QuasiQuotes #-} module Network.Minio.XmlGenerator.Test ( xmlGeneratorTests ) where import Test.Tasty import Test.Tasty.HUnit +import Text.RawString.QQ (r) import Lib.Prelude @@ -33,6 +35,7 @@ xmlGeneratorTests = testGroup "XML Generator Tests" [ testCase "Test mkCreateBucketConfig" testMkCreateBucketConfig , testCase "Test mkCompleteMultipartUploadRequest" testMkCompleteMultipartUploadRequest , testCase "Test mkPutNotificationRequest" testMkPutNotificationRequest + , testCase "Test mkSelectRequest" testMkSelectRequest ] testMkCreateBucketConfig :: Assertion @@ -95,3 +98,46 @@ testMkPutNotificationRequest = [ObjectCreated] defaultFilter ] ] + +testMkSelectRequest :: Assertion -- checks mkSelectRequest against every (request, expected XML) pair in cases +testMkSelectRequest = mapM_ assertFn cases + where + assertFn (a, b) = assertEqual "selectRequest XML should match: " b $ mkSelectRequest a -- a: request to render; b: expected rendering + cases = [ ( SelectRequest "Select * from S3Object" SQL + (InputSerialization (Just CompressionTypeGzip) + (InputFormatCSV $ fileHeaderInfo FileHeaderIgnore + <> recordDelimiter "\n" + <> fieldDelimiter "," + <> quoteCharacter "\"" + <> quoteEscapeCharacter "\"" + )) + (OutputSerializationCSV $ quoteFields QuoteFieldsAsNeeded + <> recordDelimiter "\n" + <> fieldDelimiter "," + <> quoteCharacter "\"" + <> quoteEscapeCharacter "\"" + ) + (Just False) + , [r|Select * from S3ObjectSQLGZIP" +IGNORE","ASNEEDED +",FALSE|] + ) + , ( setRequestProgressEnabled False $ + setInputCompressionType CompressionTypeGzip $ + selectRequest "Select * from S3Object" documentJsonInput + (outputJSONFromRecordDelimiter "\n") + , [r|Select * from S3ObjectSQLGZIPDOCUMENT +FALSE|] + ) + , ( setRequestProgressEnabled False $ + setInputCompressionType CompressionTypeNone $ + selectRequest "Select * from S3Object" defaultParquetInput + (outputCSVFromProps $ quoteFields QuoteFieldsAsNeeded + <> recordDelimiter "\n" + <> fieldDelimiter "," + <> quoteCharacter "\"" + <> quoteEscapeCharacter "\"") + , [r|Select * from S3ObjectSQLNONE"ASNEEDED +",FALSE|] + ) + ] diff --git
a/test/Network/Minio/XmlParser/Test.hs b/test/Network/Minio/XmlParser/Test.hs index 0f7496e..7bf5da0 100644 --- a/test/Network/Minio/XmlParser/Test.hs +++ b/test/Network/Minio/XmlParser/Test.hs @@ -14,15 +14,16 @@ -- limitations under the License. -- +{-# LANGUAGE QuasiQuotes #-} module Network.Minio.XmlParser.Test - ( - xmlParserTests + ( xmlParserTests ) where import qualified Data.Map as Map import Data.Time (fromGregorian) import Test.Tasty import Test.Tasty.HUnit +import Text.RawString.QQ (r) import UnliftIO (MonadUnliftIO) import Lib.Prelude @@ -43,6 +44,7 @@ xmlParserTests = testGroup "XML Parser Tests" , testCase "Test parseListPartsResponse" testParseListPartsResponse , testCase "Test parseCopyObjectResponse" testParseCopyObjectResponse , testCase "Test parseNotification" testParseNotification + , testCase "Test parseSelectProgress" testParseSelectProgress ] tryValidationErr :: (MonadUnliftIO m) => m a -> m (Either MErrV a) @@ -356,3 +358,24 @@ testParseNotification = do forM_ cases $ \(xmldata, val) -> do result <- runExceptT $ runTestNS $ parseNotification xmldata eitherValidationErr result (@?= val) + +-- | Runs 'parseSelectProgress' on each XML fixture (both progress and stats payloads) and checks the decoded 'Progress' value +testParseSelectProgress :: Assertion +testParseSelectProgress = do + let cases = [ ([r| + + 512 + 1024 + 1024 +|] , Progress 512 1024 1024) + , ([r| + + 512 + 1024 + 1024 +|], Progress 512 1024 1024) + ] + + forM_ cases $ \(xmldata, progress) -> do + result <- runExceptT $ parseSelectProgress xmldata + eitherValidationErr result (@?= progress)