diff --git a/docs/API.md b/docs/API.md
index ce9affe..dd3b8cd 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -28,7 +28,7 @@ awsCI { connectAccesskey = "your-access-key"
|[`listObjects`](#listObjects)|[`fPutObject`](#fPutObject)||
|[`listObjectsV1`](#listObjectsV1)|[`copyObject`](#copyObject)||
|[`listIncompleteUploads`](#listIncompleteUploads)|[`removeObject`](#removeObject)||
-|[`bucketExists`](#bucketExists)|||
+|[`bucketExists`](#bucketExists)|[`selectObjectContent`](#selectObjectContent)||
## 1. Connecting and running operations on the storage service
@@ -743,6 +743,59 @@ main = do
Right _ -> putStrLn "Removed incomplete upload successfully"
```
+
+### selectObjectContent :: Bucket -> Object -> SelectRequest -> Minio (ConduitT () EventMessage Minio ())
+Runs a SQL `SELECT` query against the contents of an object (CSV, JSON or Parquet format) and streams the query results back from the service
+
+__Parameters__
+
+In the expression `selectObjectContent bucketName objectName selReq`
+the parameters are:
+
+|Param |Type |Description |
+|:---|:---| :---|
+| `bucketName` | _Bucket_ (alias for `Text`) | Name of the bucket |
+| `objectName` | _Object_ (alias for `Text`) | Name of the object |
+| `selReq` | _SelectRequest_ | Select request parameters |
+
+__SelectRequest record__
+
+This record is created using `selectRequest`. Please refer to the Haddocks for further information.
+
+__Return Value__
+
+The return value can be used to read individual `EventMessage`s in the response. Please refer to the Haddocks for further information.
+
+|Return type | Description |
+|:---|:---|
+| _Minio (C.ConduitT () EventMessage Minio ())_ | A Conduit source of `EventMessage` values. |
+
+__Example__
+
+```haskell
+{-# Language OverloadedStrings #-}
+import Network.Minio
+
+import qualified Conduit as C
+
+main :: IO ()
+main = do
+ let
+ bucket = "mybucket"
+ object = "myobject"
+
+ res <- runMinio minioPlayCI $ do
+ let sr = selectRequest "Select * from s3object"
+ defaultCsvInput defaultCsvOutput
+ res <- selectObjectContent bucket object sr
+ C.runConduit $ res C..| getPayloadBytes C..| C.stdoutC
+
+ case res of
+ Left _ -> putStrLn "Failed!"
+ Right _ -> putStrLn "Success!"
+```
+
+
### bucketExists :: Bucket -> Minio Bool
Checks if a bucket exists.
diff --git a/examples/SelectObject.hs b/examples/SelectObject.hs
new file mode 100755
index 0000000..6243cd1
--- /dev/null
+++ b/examples/SelectObject.hs
@@ -0,0 +1,50 @@
+#!/usr/bin/env stack
+-- stack --resolver lts-13.1 runghc --package minio-hs
+
+--
+-- Minio Haskell SDK, (C) 2019 Minio, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+{-# LANGUAGE OverloadedStrings #-}
+import Network.Minio
+
+import qualified Conduit as C
+import Control.Monad (when)
+import qualified Data.ByteString.Lazy as LB
+
+import Prelude
+
+-- | Uploads a small CSV object to the play server, runs a
+-- @Select *@ query against it and streams the matching records to
+-- stdout. The overall result of the Minio action is printed last.
+main :: IO ()
+main = do
+  let bucket = "selectbucket"
+      object = "1.csv"
+      content = mconcat
+        [ "Name,Place,Temperature\n"
+        , "James,San Jose,76\n"
+        , "Alicia,San Leandro,88\n"
+        , "Mark,San Carlos,90\n"
+        ]
+
+  res <- runMinio minioPlayCI $ do
+
+    -- create the bucket only when it is missing
+    bucketPresent <- bucketExists bucket
+    when (not bucketPresent) $
+      makeBucket bucket Nothing
+
+    C.liftIO $ putStrLn "Uploading csv object"
+    putObject bucket object (C.sourceLazy content) Nothing defaultPutObjectOptions
+
+    let selReq = selectRequest "Select * from s3object" defaultCsvInput defaultCsvOutput
+    events <- selectObjectContent bucket object selReq
+    -- keep only the record payload bytes and write them to stdout
+    C.runConduit $ events C..| getPayloadBytes C..| C.stdoutC
+  print res
diff --git a/minio-hs.cabal b/minio-hs.cabal
index e65ce14..5eb1018 100644
--- a/minio-hs.cabal
+++ b/minio-hs.cabal
@@ -41,6 +41,7 @@ library
, Network.Minio.ListOps
, Network.Minio.PresignedOperations
, Network.Minio.PutObject
+ , Network.Minio.SelectAPI
, Network.Minio.Sign.V4
, Network.Minio.Utils
, Network.Minio.XmlGenerator
@@ -50,6 +51,7 @@ library
, protolude >= 0.2 && < 0.3
, aeson >= 1.2
, base64-bytestring >= 1.0
+ , binary >= 0.8.5.0
, bytestring >= 0.10
, case-insensitive >= 1.2
, conduit >= 1.3
@@ -57,6 +59,7 @@ library
, containers >= 0.5
, cryptonite >= 0.25
, cryptonite-conduit >= 0.2
+ , digest >= 0.0.1
, directory
, filepath >= 1.4
, http-client >= 0.5
@@ -64,12 +67,14 @@ library
, http-types >= 0.12
, ini
, memory >= 0.14
+ , raw-strings-qq >= 1
, resourcet >= 1.2
, text >= 1.2
, time >= 1.8
, transformers >= 0.5
, unliftio >= 0.2
, unliftio-core >= 0.1
+ , unordered-containers >= 0.2
, xml-conduit >= 1.8
default-language: Haskell2010
default-extensions: BangPatterns
@@ -120,6 +125,7 @@ test-suite minio-hs-live-server-test
, Network.Minio.PresignedOperations
, Network.Minio.PutObject
, Network.Minio.S3API
+ , Network.Minio.SelectAPI
, Network.Minio.Sign.V4
, Network.Minio.TestHelpers
, Network.Minio.Utils
@@ -131,11 +137,12 @@ test-suite minio-hs-live-server-test
, Network.Minio.XmlParser.Test
, Network.Minio.JsonParser
, Network.Minio.JsonParser.Test
- build-depends: base
+ build-depends: base >= 4.7 && < 5
, minio-hs
, protolude >= 0.1.6
, aeson
, base64-bytestring
+ , binary
, bytestring
, case-insensitive
, conduit
@@ -143,6 +150,7 @@ test-suite minio-hs-live-server-test
, containers
, cryptonite
, cryptonite-conduit
+ , digest
, directory
, filepath
, http-client
@@ -151,6 +159,7 @@ test-suite minio-hs-live-server-test
, ini
, memory
, QuickCheck
+ , raw-strings-qq >= 1
, resourcet
, tasty
, tasty-hunit
@@ -162,6 +171,7 @@ test-suite minio-hs-live-server-test
, transformers
, unliftio
, unliftio-core
+ , unordered-containers
, xml-conduit
if !flag(live-test)
buildable: False
@@ -170,11 +180,12 @@ test-suite minio-hs-test
type: exitcode-stdio-1.0
hs-source-dirs: test, src
main-is: Spec.hs
- build-depends: base
+ build-depends: base >= 4.7 && < 5
, minio-hs
, protolude >= 0.1.6
, aeson
, base64-bytestring
+ , binary
, bytestring
, case-insensitive
, conduit
@@ -183,6 +194,7 @@ test-suite minio-hs-test
, cryptonite
, cryptonite-conduit
, filepath
+ , digest
, directory
, http-client
, http-conduit
@@ -190,6 +202,7 @@ test-suite minio-hs-test
, ini
, memory
, QuickCheck
+ , raw-strings-qq >= 1
, resourcet
, tasty
, tasty-hunit
@@ -201,6 +214,7 @@ test-suite minio-hs-test
, transformers
, unliftio
, unliftio-core
+ , unordered-containers
, xml-conduit
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010
@@ -230,6 +244,7 @@ test-suite minio-hs-test
, Network.Minio.PresignedOperations
, Network.Minio.PutObject
, Network.Minio.S3API
+ , Network.Minio.SelectAPI
, Network.Minio.Sign.V4
, Network.Minio.TestHelpers
, Network.Minio.Utils
diff --git a/src/Network/Minio.hs b/src/Network/Minio.hs
index 0cc6632..209a7db 100644
--- a/src/Network/Minio.hs
+++ b/src/Network/Minio.hs
@@ -158,6 +158,9 @@ module Network.Minio
, removeObject
, removeIncompleteUpload
+ -- ** Select Object Content with SQL
+ , module Network.Minio.SelectAPI
+
-- * Presigned Operations
-------------------------
, UrlExpiry
@@ -207,6 +210,7 @@ import Network.Minio.Errors
import Network.Minio.ListOps
import Network.Minio.PutObject
import Network.Minio.S3API
+import Network.Minio.SelectAPI
import Network.Minio.Utils
-- | Lists buckets.
diff --git a/src/Network/Minio/Data.hs b/src/Network/Minio/Data.hs
index 349adbf..8e1e825 100644
--- a/src/Network/Minio/Data.hs
+++ b/src/Network/Minio/Data.hs
@@ -14,6 +14,7 @@
-- limitations under the License.
--
+{-# LANGUAGE CPP #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Minio.Data where
@@ -25,6 +26,7 @@ import Control.Monad.IO.Unlift (MonadUnliftIO, UnliftIO (..),
import Control.Monad.Trans.Resource
import qualified Data.ByteString as B
import Data.CaseInsensitive (mk)
+import qualified Data.HashMap.Strict as H
import qualified Data.Ini as Ini
import qualified Data.Map as Map
import Data.String (IsString (..))
@@ -511,6 +513,237 @@ data Notification = Notification
defaultNotification :: Notification
defaultNotification = Notification [] [] []
+
+--------------------------------------------------------------------------
+-- Select API Related Types
+--------------------------------------------------------------------------
+
+-- | SelectRequest represents the Select API call. Use the
+-- `selectRequest` function to create a value of this type.
+data SelectRequest = SelectRequest
+ { srExpression :: Text
+ , srExpressionType :: ExpressionType
+ , srInputSerialization :: InputSerialization
+ , srOutputSerialization :: OutputSerialization
+ , srRequestProgressEnabled :: Maybe Bool
+ } deriving (Eq, Show)
+
+data ExpressionType = SQL
+ deriving (Eq, Show)
+
+-- | InputSerialization represents format information of the input
+-- object being queried. Use one of the smart constructors such as
+-- `defaultCsvInput` as a starting value, and add compression info
+-- using `setInputCompressionType`
+data InputSerialization = InputSerialization
+ { isCompressionType :: Maybe CompressionType
+ , isFormatInfo :: InputFormatInfo
+ } deriving (Eq, Show)
+
+data CompressionType = CompressionTypeNone
+ | CompressionTypeGzip
+ | CompressionTypeBzip2
+ deriving (Eq, Show)
+
+data InputFormatInfo = InputFormatCSV CSVInputProp
+ | InputFormatJSON JSONInputProp
+ | InputFormatParquet
+ deriving (Eq, Show)
+
+-- | defaultCsvInput returns InputSerialization with default CSV
+-- format, and without any compression setting.
+defaultCsvInput :: InputSerialization
+defaultCsvInput = InputSerialization Nothing (InputFormatCSV defaultCSVProp)
+
+-- | linesJsonInput returns InputSerialization with JSON line based
+-- format with no compression setting.
+linesJsonInput :: InputSerialization
+linesJsonInput = InputSerialization Nothing
+ (InputFormatJSON $ JSONInputProp JSONTypeLines)
+
+-- | documentJsonInput returns InputSerialization with JSON document
+-- based format with no compression setting.
+documentJsonInput :: InputSerialization
+documentJsonInput = InputSerialization Nothing
+ (InputFormatJSON $ JSONInputProp JSONTypeDocument)
+
+-- | defaultParquetInput returns InputSerialization with Parquet
+-- format, and no compression setting.
+defaultParquetInput :: InputSerialization
+defaultParquetInput = InputSerialization Nothing InputFormatParquet
+
+-- | setInputCompressionType records the given compression type in
+-- the input serialization settings of a SelectRequest, leaving the
+-- input format information untouched.
+setInputCompressionType :: CompressionType -> SelectRequest
+                        -> SelectRequest
+setInputCompressionType compression req =
+  req { srInputSerialization = compressedInput }
+  where
+    compressedInput =
+      (srInputSerialization req) { isCompressionType = Just compression }
+
+-- | defaultCsvOutput returns OutputSerialization with default CSV
+-- format.
+defaultCsvOutput :: OutputSerialization
+defaultCsvOutput = OutputSerializationCSV defaultCSVProp
+
+-- | defaultJsonOutput returns OutputSerialization with default JSON
+-- format (no record delimiter is set).
+defaultJsonOutput :: OutputSerialization
+defaultJsonOutput = OutputSerializationJSON (JSONOutputProp Nothing)
+
+-- | selectRequest is used to build a `SelectRequest`
+-- value. @selectRequest query inputSer outputSer@ represents a
+-- SelectRequest with the SQL query text given by @query@, the input
+-- serialization settings (compression format and format information)
+-- @inputSer@ and the output serialization settings @outputSer@.
+selectRequest :: Text -> InputSerialization -> OutputSerialization
+ -> SelectRequest
+selectRequest sqlQuery inputSer outputSer =
+ SelectRequest { srExpression = sqlQuery
+ , srExpressionType = SQL
+ , srInputSerialization = inputSer
+ , srOutputSerialization = outputSer
+ , srRequestProgressEnabled = Nothing
+ }
+
+-- | setRequestProgressEnabled sets the flag for turning on progress
+-- messages when the Select response is being streamed back to the
+-- client.
+setRequestProgressEnabled :: Bool -> SelectRequest -> SelectRequest
+setRequestProgressEnabled enabled sr =
+ sr { srRequestProgressEnabled = Just enabled }
+
+type CSVInputProp = CSVProp
+
+-- | CSVProp represents CSV format properties as a map from property
+-- name to property value. It is built up using the Monoid instance,
+-- e.g. @fieldDelimiter "," <> recordDelimiter "\n"@.
+--
+-- The operands are flipped before the underlying maps are combined;
+-- as HashMap's @<>@ is a left-biased union, a property set on both
+-- sides takes its value from the right-hand operand.
+data CSVProp = CSVProp (H.HashMap Text Text)
+  deriving (Eq, Show)
+
+-- Semigroup is only given a separate instance on GHC >= 8.4; on
+-- older compilers the same combination is defined via mappend below.
+#if (__GLASGOW_HASKELL__ >= 804)
+instance Semigroup CSVProp where
+  (CSVProp a) <> (CSVProp b) = CSVProp (b <> a)
+#endif
+
+instance Monoid CSVProp where
+  mempty = CSVProp mempty
+#if (__GLASGOW_HASKELL__ < 804)
+  mappend (CSVProp a) (CSVProp b) = CSVProp (b <> a)
+#endif
+
+defaultCSVProp :: CSVProp
+defaultCSVProp = mempty
+
+recordDelimiter :: Text -> CSVProp
+recordDelimiter = CSVProp . H.singleton "RecordDelimiter"
+
+fieldDelimiter :: Text -> CSVProp
+fieldDelimiter = CSVProp . H.singleton "FieldDelimiter"
+
+quoteCharacter :: Text -> CSVProp
+quoteCharacter = CSVProp . H.singleton "QuoteCharacter"
+
+quoteEscapeCharacter :: Text -> CSVProp
+quoteEscapeCharacter = CSVProp . H.singleton "QuoteEscapeCharacter"
+
+-- | FileHeaderInfo specifies information about column headers for CSV
+-- format.
+data FileHeaderInfo
+  = FileHeaderNone -- ^ No column headers are present
+  | FileHeaderUse -- ^ Headers are present and they should be used
+  | FileHeaderIgnore -- ^ Headers are present, but should be ignored
+  deriving (Eq, Show)
+
+fileHeaderInfo :: FileHeaderInfo -> CSVProp
+fileHeaderInfo = CSVProp . H.singleton "FileHeaderInfo" . toString
+ where
+ toString FileHeaderNone = "NONE"
+ toString FileHeaderUse = "USE"
+ toString FileHeaderIgnore = "IGNORE"
+
+commentCharacter :: Text -> CSVProp
+commentCharacter = CSVProp . H.singleton "Comments"
+
+allowQuotedRecordDelimiter :: CSVProp
+allowQuotedRecordDelimiter = CSVProp $ H.singleton "AllowQuotedRecordDelimiter" "TRUE"
+
+-- | Set the CSV format properties in the InputSerialization.
+setInputCSVProps :: CSVProp -> InputSerialization -> InputSerialization
+setInputCSVProps p is = is { isFormatInfo = InputFormatCSV p }
+
+-- | Build a CSV OutputSerialization from the given CSV format
+-- properties.
+outputCSVFromProps :: CSVProp -> OutputSerialization
+outputCSVFromProps = OutputSerializationCSV
+
+data JSONInputProp = JSONInputProp { jsonipType :: JSONType }
+ deriving (Eq, Show)
+
+data JSONType = JSONTypeDocument | JSONTypeLines
+ deriving (Eq, Show)
+
+
+-- | OutputSerialization represents output serialization settings for
+-- the SelectRequest. Use `defaultCsvOutput` or `defaultJsonOutput` as
+-- a starting point.
+data OutputSerialization = OutputSerializationJSON JSONOutputProp
+ | OutputSerializationCSV CSVOutputProp
+ deriving (Eq, Show)
+
+type CSVOutputProp = CSVProp
+
+-- | quoteFields is an output serialization parameter
+quoteFields :: QuoteFields -> CSVProp
+quoteFields q = CSVProp $ H.singleton "QuoteFields" $
+ case q of
+ QuoteFieldsAsNeeded -> "ASNEEDED"
+ QuoteFieldsAlways -> "ALWAYS"
+
+data QuoteFields = QuoteFieldsAsNeeded | QuoteFieldsAlways
+ deriving (Eq, Show)
+
+data JSONOutputProp = JSONOutputProp { jsonopRecordDelimiter :: Maybe Text }
+ deriving (Eq, Show)
+
+-- | Set the output record delimiter for JSON format
+outputJSONFromRecordDelimiter :: Text -> OutputSerialization
+outputJSONFromRecordDelimiter t =
+ OutputSerializationJSON (JSONOutputProp $ Just t)
+
+-- Response related types
+
+-- | An EventMessage represents each kind of message received from the server.
+data EventMessage = ProgressEventMessage { emProgress :: Progress }
+ | StatsEventMessage { emStats :: Stats }
+ | RequestLevelErrorMessage { emErrorCode :: Text
+ , emErrorMessage :: Text
+ }
+ | RecordPayloadEventMessage { emPayloadBytes :: ByteString }
+ deriving (Eq, Show)
+
+data MsgHeaderName = MessageType
+ | EventType
+ | ContentType
+ | ErrorCode
+ | ErrorMessage
+ deriving (Eq, Show)
+
+msgHeaderValueType :: Word8
+msgHeaderValueType = 7
+
+type MessageHeader = (MsgHeaderName, Text)
+
+data Progress = Progress { pBytesScanned :: Int64
+ , pBytesProcessed :: Int64
+ , pBytesReturned :: Int64
+ }
+ deriving (Eq, Show)
+
+type Stats = Progress
+
+--------------------------------------------------------------------------
+-- Select API Related Types End
+--------------------------------------------------------------------------
+
-- | Represents different kinds of payload that are used with S3 API
-- requests.
data Payload = PayloadBS ByteString
@@ -530,8 +763,8 @@ data AdminReqInfo = AdminReqInfo {
, ariQueryParams :: Query
}
-data S3ReqInfo = S3ReqInfo {
- riMethod :: Method
+data S3ReqInfo = S3ReqInfo
+ { riMethod :: Method
, riBucket :: Maybe Bucket
, riObject :: Maybe Object
, riQueryParams :: Query
diff --git a/src/Network/Minio/Errors.hs b/src/Network/Minio/Errors.hs
index 079a851..30d9719 100644
--- a/src/Network/Minio/Errors.hs
+++ b/src/Network/Minio/Errors.hs
@@ -51,6 +51,7 @@ data ServiceErr = BucketAlreadyExists
| NoSuchBucket
| InvalidBucketName
| NoSuchKey
+ | SelectErr Text Text
| ServiceErr Text Text
deriving (Show, Eq)
diff --git a/src/Network/Minio/SelectAPI.hs b/src/Network/Minio/SelectAPI.hs
new file mode 100644
index 0000000..56ab27a
--- /dev/null
+++ b/src/Network/Minio/SelectAPI.hs
@@ -0,0 +1,302 @@
+--
+-- Minio Haskell SDK, (C) 2017-2019 Minio, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+module Network.Minio.SelectAPI
+ (
+
+ -- | The `selectObjectContent` allows querying CSV, JSON or Parquet
+ -- format objects in AWS S3 and in Minio using SQL Select
+ -- statements. This allows significant reduction of data transfer
+ -- from object storage for computation-intensive tasks, as relevant
+ -- data is filtered close to the storage.
+
+ selectObjectContent
+
+ , SelectRequest
+ , selectRequest
+
+ -- *** Input Serialization
+ -------------------------
+
+ , InputSerialization
+ , defaultCsvInput
+ , linesJsonInput
+ , documentJsonInput
+ , defaultParquetInput
+ , setInputCSVProps
+
+ , CompressionType(..)
+ , setInputCompressionType
+
+ -- *** CSV Format details
+ ------------------------
+ -- | CSV format options such as delimiters and quote characters are
+ -- specified using the functions below. Options are combined
+ -- monoidally.
+
+ , CSVProp
+ , recordDelimiter
+ , fieldDelimiter
+ , quoteCharacter
+ , quoteEscapeCharacter
+ , commentCharacter
+ , allowQuotedRecordDelimiter
+ , FileHeaderInfo(..)
+ , fileHeaderInfo
+ , QuoteFields(..)
+ , quoteFields
+
+ -- *** Output Serialization
+ -------------------------
+
+ , OutputSerialization
+ , defaultCsvOutput
+ , defaultJsonOutput
+ , outputCSVFromProps
+ , outputJSONFromRecordDelimiter
+
+ -- *** Progress messages
+ ------------------------
+
+ , setRequestProgressEnabled
+
+ -- *** Interpreting Select output
+ --------------------------------------------
+ -- | The conduit returned by `selectObjectContent` returns values of
+ -- the `EventMessage` data type. This returns the query output
+ -- messages formatted according to the chosen output serialization,
+ -- interleaved with progress messages (if enabled by
+ -- `setRequestProgressEnabled`), and at the end a statistics
+ -- message.
+ --
+ -- If the application is interested in only the payload, then
+ -- `getPayloadBytes` can be used. For example to simply print the
+ -- payload to stdout:
+ --
+ -- > resultConduit <- selectObjectContent bucket object mySelectRequest
+ -- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC
+ --
+ -- Note that runConduit, the connect operator (.|) and stdoutC are
+ -- all from the "conduit" package.
+
+ , getPayloadBytes
+ , EventMessage(..)
+ , Progress(..)
+ , Stats
+ ) where
+
+import Conduit ((.|))
+import qualified Conduit as C
+import qualified Data.Binary as Bin
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Lazy as LB
+import Data.Digest.CRC32 (crc32, crc32Update)
+import qualified Network.HTTP.Conduit as NC
+import qualified Network.HTTP.Types as HT
+import UnliftIO (MonadUnliftIO)
+
+import Lib.Prelude
+
+import Network.Minio.API
+import Network.Minio.Data
+import Network.Minio.Errors
+import Network.Minio.Utils
+import Network.Minio.XmlGenerator
+import Network.Minio.XmlParser
+
+-- | Exceptions thrown while decoding the event stream returned by a
+-- Select request.
+data EventStreamException = ESEPreludeCRCFailed
+                          -- ^ checksum over the first 8 bytes of a
+                          -- message did not match the prelude CRC
+                          | ESEMessageCRCFailed
+                          -- ^ checksum over the whole message did not
+                          -- match its trailing CRC
+                          | ESEUnexpectedEndOfStream
+                          -- ^ the stream ended before the requested
+                          -- number of bytes could be read
+                          | ESEDecodeFail [Char]
+                          -- ^ a binary value could not be decoded; the
+                          -- decoder's message is attached
+                          | ESEInvalidHeaderType
+                          -- ^ a header with an unrecognised name was
+                          -- received
+                          | ESEInvalidHeaderValueType
+                          -- ^ a header value had an unsupported value
+                          -- type
+                          | ESEInvalidMessageType
+                          -- ^ the message or event type was missing,
+                          -- unknown or incomplete
+  deriving (Eq, Show)
+
+instance Exception EventStreamException
+
+-- chunkSize in bytes is 32KiB. It bounds how many bytes are read and
+-- yielded at a time when message bodies and record payloads are
+-- forwarded downstream.
+chunkSize :: Int
+chunkSize = 32 * 1024
+
+-- | Decode a value from the front of the given bytes, throwing
+-- `ESEDecodeFail` with the decoder's message when decoding fails.
+-- Any bytes left over after the value are ignored.
+parseBinary :: Bin.Binary a => ByteString -> IO a
+parseBinary bytes =
+  case Bin.decodeOrFail (LB.fromStrict bytes) of
+    Right (_, _, value) -> return value
+    Left (_, _, reason) -> throwIO (ESEDecodeFail reason)
+
+-- | Map the textual name of a message header to its `MsgHeaderName`.
+-- Any name outside the known set results in `ESEInvalidHeaderType`.
+bytesToHeaderName :: Text -> IO MsgHeaderName
+bytesToHeaderName ":message-type"  = return MessageType
+bytesToHeaderName ":event-type"    = return EventType
+bytesToHeaderName ":content-type"  = return ContentType
+bytesToHeaderName ":error-code"    = return ErrorCode
+bytesToHeaderName ":error-message" = return ErrorMessage
+bytesToHeaderName _                = throwIO ESEInvalidHeaderType
+
+-- | Parse the header section of a single message.
+--
+-- @parseHeaders hdrLen@ reads headers from the stream until @hdrLen@
+-- bytes have been consumed. Each header is read as: a 1-byte name
+-- length, the name, a 1-byte value type, a 2-byte value length and
+-- the value. Names and values are decoded leniently as UTF-8.
+--
+-- Throws `ESEInvalidHeaderType` for an unknown header name,
+-- `ESEInvalidHeaderValueType` when the value type is not
+-- `msgHeaderValueType`, and `ESEDecodeFail` when a header extends
+-- past the declared header section.
+parseHeaders :: MonadUnliftIO m
+             => Word32 -> C.ConduitM ByteString a m [MessageHeader]
+parseHeaders 0 = return []
+parseHeaders hdrLen = do
+  bs1 <- readNBytes 1
+  n :: Word8 <- liftIO $ parseBinary bs1
+
+  headerKeyBytes <- readNBytes $ fromIntegral n
+  let headerKey = decodeUtf8Lenient headerKeyBytes
+  headerName <- liftIO $ bytesToHeaderName headerKey
+
+  bs2 <- readNBytes 1
+  headerValueType :: Word8 <- liftIO $ parseBinary bs2
+  -- compare against the shared constant rather than a bare literal
+  when (headerValueType /= msgHeaderValueType) $
+    throwIO ESEInvalidHeaderValueType
+
+  bs3 <- readNBytes 2
+  vLen :: Word16 <- liftIO $ parseBinary bs3
+  headerValueBytes <- readNBytes $ fromIntegral vLen
+  let headerValue = decodeUtf8Lenient headerValueBytes
+      m = (headerName, headerValue)
+      -- total number of bytes this header occupied on the wire
+      k = 1 + fromIntegral n + 1 + 2 + fromIntegral vLen
+
+  -- hdrLen and k are unsigned: without this check a header that
+  -- overruns the declared section would make (hdrLen - k) wrap around
+  -- and the payload would be parsed as further headers.
+  when (k > hdrLen) $
+    throwIO $ ESEDecodeFail "header extends past the declared header length"
+
+  ms <- parseHeaders (hdrLen - k)
+  return (m:ms)
+
+-- readNBytes consumes exactly N bytes from the stream and returns
+-- them as a single strict ByteString. If fewer than N bytes are
+-- available, ESEUnexpectedEndOfStream is thrown.
+readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
+readNBytes n = do
+  collected <- LB.toStrict <$> (C.takeCE n .| C.sinkLazy)
+  when (B.length collected /= n) $
+    throwIO ESEUnexpectedEndOfStream
+  return collected
+
+-- | crcCheck validates the checksums of every message on the stream
+-- and forwards the message bytes downstream with the checksums
+-- removed.
+--
+-- For each message it reads a 12-byte prelude: the first 4 bytes are
+-- decoded as the total message length, and the last 4 bytes as the
+-- CRC of the first 8 bytes. The first 8 bytes are yielded, followed
+-- by the next (length - 16) bytes of the message, and finally the
+-- trailing 4-byte message CRC is read and compared.
+--
+-- Throws `ESEPreludeCRCFailed` or `ESEMessageCRCFailed` on a checksum
+-- mismatch.
+crcCheck :: MonadUnliftIO m
+         => C.ConduitM ByteString ByteString m ()
+crcCheck = do
+  b <- readNBytes 12
+  n :: Word32 <- liftIO $ parseBinary $ B.take 4 b
+  preludeCRC :: Word32 <- liftIO $ parseBinary $ B.drop 8 b
+  when (crc32 (B.take 8 b) /= preludeCRC) $
+    throwIO ESEPreludeCRCFailed
+
+  -- we do not yield the checksum
+  C.yield $ B.take 8 b
+
+  -- 12 bytes have been read off the current message. Now read the
+  -- next (n-12)-4 bytes and accumulate the checksum, and yield it.
+  -- The running checksum starts from the CRC of the whole prelude.
+  let startCrc = crc32 b
+  finalCrc <- accumulateYield (fromIntegral n-16) startCrc
+
+  bs <- readNBytes 4
+  expectedCrc :: Word32 <- liftIO $ parseBinary bs
+
+  when (finalCrc /= expectedCrc) $
+    throwIO ESEMessageCRCFailed
+
+  -- we unconditionally recurse - downstream figures out when to
+  -- quit reading the stream
+  crcCheck
+  where
+    -- Reads n bytes in pieces of at most chunkSize, yielding each
+    -- piece and folding it into the running checksum; returns the
+    -- final checksum.
+    accumulateYield n checkSum = do
+      let toRead = min n chunkSize
+      b <- readNBytes toRead
+      let c' = crc32Update checkSum b
+          n' = n - B.length b
+      C.yield b
+      if n' > 0
+        then accumulateYield n' c'
+        else return c'
+
+-- | handleMessage decodes messages from the checksum-stripped byte
+-- stream produced by `crcCheck` and yields the corresponding
+-- `EventMessage` values.
+--
+-- For each message it reads the 4-byte message length, the 4-byte
+-- header length and the headers, then dispatches on the message type
+-- and event type headers:
+--
+-- * @Records@ events yield the payload as one or more
+--   `RecordPayloadEventMessage` values of at most `chunkSize` bytes;
+-- * @Progress@ and @Stats@ events yield the parsed payload;
+-- * @Cont@ events yield nothing;
+-- * an @End@ event stops processing;
+-- * an @error@ message yields a `RequestLevelErrorMessage` and stops
+--   processing.
+--
+-- Anything else results in `ESEInvalidMessageType`.
+handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
+handleMessage = do
+  b1 <- readNBytes 4
+  msgLen :: Word32 <- liftIO $ parseBinary b1
+
+  b2 <- readNBytes 4
+  hdrLen :: Word32 <- liftIO $ parseBinary b2
+
+  hs <- parseHeaders hdrLen
+
+  -- 16 = the 12-byte prelude plus the 4-byte trailing checksum that
+  -- crcCheck reads around every message
+  let payloadLen = msgLen - hdrLen - 16
+      -- value of the first header with the given name, if any
+      getHdrVal h = fmap snd . headMay . filter ((h ==) . fst)
+      eventHdrValue = getHdrVal EventType hs
+      msgHdrValue = getHdrVal MessageType hs
+      errCode = getHdrVal ErrorCode hs
+      errMsg = getHdrVal ErrorMessage hs
+
+  case msgHdrValue of
+    Just "event" -> do
+      case eventHdrValue of
+        Just "Records" -> passThrough $ fromIntegral payloadLen
+        Just "Cont" -> return ()
+        Just "Progress" -> do
+          bs <- readNBytes $ fromIntegral payloadLen
+          progress <- parseSelectProgress bs
+          C.yield $ ProgressEventMessage progress
+        Just "Stats" -> do
+          bs <- readNBytes $ fromIntegral payloadLen
+          stats <- parseSelectProgress bs
+          C.yield $ StatsEventMessage stats
+        Just "End" -> return ()
+        _ -> throwIO ESEInvalidMessageType
+      -- keep reading messages until the End event has been seen
+      when (eventHdrValue /= Just "End") handleMessage
+
+    Just "error" -> do
+      -- both the error code and the error message headers are required
+      let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
+      maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay
+
+    _ -> throwIO ESEInvalidMessageType
+
+  where
+    -- forwards n payload bytes downstream in pieces of at most
+    -- chunkSize bytes
+    passThrough 0 = return ()
+    passThrough n = do
+      let c = min n chunkSize
+      b <- readNBytes c
+      C.yield $ RecordPayloadEventMessage b
+      passThrough $ n - B.length b
+
+
+-- | selectProtoConduit turns the raw response bytes into
+-- `EventMessage` values: checksums are verified and stripped by
+-- `crcCheck`, and the remaining bytes are decoded by `handleMessage`.
+selectProtoConduit :: MonadUnliftIO m
+                   => C.ConduitT ByteString EventMessage m ()
+selectProtoConduit = crcCheck .| handleMessage
+
+-- | selectObjectContent calls the SelectRequest on the given
+-- object. It returns a Conduit of event messages that can be consumed
+-- by the client.
+--
+-- The request is sent as a POST on the object with the query
+-- parameters @select@ and @select-type=2@, carrying the XML rendering
+-- of the SelectRequest as its body.
+selectObjectContent :: Bucket -> Object -> SelectRequest
+                    -> Minio (C.ConduitT () EventMessage Minio ())
+selectObjectContent b o r = do
+  let reqInfo = defaultS3ReqInfo { riMethod = HT.methodPost
+                                 , riBucket = Just b
+                                 , riObject = Just o
+                                 , riPayload = PayloadBS $ mkSelectRequest r
+                                 , riNeedsLocation = False
+                                 , riQueryParams = [("select", Nothing), ("select-type", Just "2")]
+                                 }
+  resp <- mkStreamRequest reqInfo
+  -- decode the streamed response body into event messages
+  return $ NC.responseBody resp .| selectProtoConduit
+
+-- | A helper conduit that keeps only the bytes of record payload
+-- messages. A request level error message is rethrown as a
+-- `SelectErr`; every other kind of message is dropped.
+getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
+getPayloadBytes = C.awaitForever $ \event ->
+  case event of
+    RecordPayloadEventMessage payload -> C.yield payload
+    RequestLevelErrorMessage code msg -> liftIO $ throwIO $ SelectErr code msg
+    _                                 -> return ()
diff --git a/src/Network/Minio/XmlGenerator.hs b/src/Network/Minio/XmlGenerator.hs
index 602fb5e..890ef82 100644
--- a/src/Network/Minio/XmlGenerator.hs
+++ b/src/Network/Minio/XmlGenerator.hs
@@ -18,11 +18,12 @@ module Network.Minio.XmlGenerator
( mkCreateBucketConfig
, mkCompleteMultipartUploadRequest
, mkPutNotificationRequest
+ , mkSelectRequest
) where
import qualified Data.ByteString.Lazy as LBS
-import qualified Data.Map as M
+import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import Text.XML
@@ -35,7 +36,7 @@ import Network.Minio.Data
mkCreateBucketConfig :: Text -> Region -> ByteString
mkCreateBucketConfig ns location = LBS.toStrict $ renderLBS def bucketConfig
where
- s3Element n = Element (s3Name ns n) M.empty
+ s3Element n = Element (s3Name ns n) mempty
root = s3Element "CreateBucketConfiguration"
[ NodeElement $ s3Element "LocationConstraint"
[ NodeContent location]
@@ -47,12 +48,12 @@ mkCompleteMultipartUploadRequest :: [PartTuple] -> ByteString
mkCompleteMultipartUploadRequest partInfo =
LBS.toStrict $ renderLBS def cmur
where
- root = Element "CompleteMultipartUpload" M.empty $
+ root = Element "CompleteMultipartUpload" mempty $
map (NodeElement . mkPart) partInfo
- mkPart (n, etag) = Element "Part" M.empty
- [ NodeElement $ Element "PartNumber" M.empty
+ mkPart (n, etag) = Element "Part" mempty
+ [ NodeElement $ Element "PartNumber" mempty
[NodeContent $ T.pack $ show n]
- , NodeElement $ Element "ETag" M.empty
+ , NodeElement $ Element "ETag" mempty
[NodeContent etag]
]
cmur = Document (Prologue [] Nothing []) root []
@@ -67,9 +68,9 @@ toXML ns node = LBS.toStrict $ renderLBS def $
Document (Prologue [] Nothing []) (xmlNode node) []
where
xmlNode :: XNode -> Element
- xmlNode (XNode name nodes) = Element (s3Name ns name) M.empty $
+ xmlNode (XNode name nodes) = Element (s3Name ns name) mempty $
map (NodeElement . xmlNode) nodes
- xmlNode (XLeaf name content) = Element (s3Name ns name) M.empty
+ xmlNode (XLeaf name content) = Element (s3Name ns name) mempty
[NodeContent content]
class ToXNode a where
@@ -100,3 +101,65 @@ getFRXNode (FilterRule n v) = XNode "FilterRule" [ XLeaf "Name" n
mkPutNotificationRequest :: Text -> Notification -> ByteString
mkPutNotificationRequest ns = toXML ns . toXNode
+
+-- | Render a `SelectRequest` as the XML body of a Select API call.
+--
+-- The root @SelectRequest@ element contains @Expression@,
+-- @ExpressionType@, @InputSerialization@ and @OutputSerialization@
+-- children, followed by @RequestProgress@ only when
+-- `srRequestProgressEnabled` is set. CSV properties are emitted as
+-- one child element per key/value pair, in the order returned by the
+-- underlying hash map.
+mkSelectRequest :: SelectRequest -> ByteString
+mkSelectRequest r = LBS.toStrict $ renderLBS def sr
+  where
+    sr = Document (Prologue [] Nothing []) root []
+    root = Element "SelectRequest" mempty $
+      [ textElem "Expression" (srExpression r)
+      , textElem "ExpressionType" (exprTypeText $ srExpressionType r)
+      , NodeElement $ Element "InputSerialization" mempty $
+          inputSerializationNodes (srInputSerialization r)
+      , NodeElement $ Element "OutputSerialization" mempty $
+          outputSerializationNodes (srOutputSerialization r)
+      ] ++ maybe [] reqProgElem (srRequestProgressEnabled r)
+
+    -- an element that has a single text node as its only child
+    textElem name t = NodeElement $ Element name mempty [NodeContent t]
+
+    -- wire names are spelled out explicitly (instead of relying on
+    -- Show) and matched by constructor so that the compiler reports
+    -- any constructor that is left unhandled
+    exprTypeText SQL = "SQL"
+
+    comprText CompressionTypeNone  = "NONE"
+    comprText CompressionTypeGzip  = "GZIP"
+    comprText CompressionTypeBzip2 = "BZIP2"
+
+    jsonTypeText JSONTypeDocument = "DOCUMENT"
+    jsonTypeText JSONTypeLines    = "LINES"
+
+    reqProgElem enabled =
+      [ NodeElement $ Element "RequestProgress" mempty
+          [ textElem "Enabled" (if enabled then "TRUE" else "FALSE") ]
+      ]
+
+    inputSerializationNodes is =
+      comprTypeNode (isCompressionType is)
+      ++ [NodeElement $ formatNode (isFormatInfo is)]
+
+    comprTypeNode Nothing  = []
+    comprTypeNode (Just c) = [textElem "CompressionType" (comprText c)]
+
+    kvElement (k, v) = Element (Name k Nothing Nothing) mempty [NodeContent v]
+
+    -- shared by the input and the output serialization
+    csvElement (CSVProp h) =
+      Element "CSV" mempty $ map (NodeElement . kvElement) (H.toList h)
+
+    formatNode (InputFormatCSV p) = csvElement p
+    formatNode (InputFormatJSON p) =
+      Element "JSON" mempty [textElem "Type" (jsonTypeText $ jsonipType p)]
+    formatNode InputFormatParquet = Element "Parquet" mempty []
+
+    outputSerializationNodes (OutputSerializationJSON j) =
+      [NodeElement $ Element "JSON" mempty $
+        rdElem (jsonopRecordDelimiter j)]
+    outputSerializationNodes (OutputSerializationCSV p) =
+      [NodeElement $ csvElement p]
+
+    rdElem Nothing  = []
+    rdElem (Just t) = [textElem "RecordDelimiter" t]
diff --git a/src/Network/Minio/XmlParser.hs b/src/Network/Minio/XmlParser.hs
index bf1c036..f6409ba 100644
--- a/src/Network/Minio/XmlParser.hs
+++ b/src/Network/Minio/XmlParser.hs
@@ -26,8 +26,10 @@ module Network.Minio.XmlParser
, parseListPartsResponse
, parseErrResponse
, parseNotification
+ , parseSelectProgress
) where
+import qualified Data.ByteString.Lazy as LB
import Data.List (zip3, zip4, zip5)
import qualified Data.Map as Map
import qualified Data.Text as T
@@ -261,3 +263,13 @@ parseNotification xmldata = do
s3Elem ns "FilterRule" &| getFilterRule ns
return $ NotificationConfig id arn events
(Filter $ FilterKey $ FilterRules rules)
+
+-- | Parse the XML payload of a progress or stats message. The
+-- @BytesScanned@, @BytesProcessed@ and @BytesReturned@ children of
+-- the root element are read as decimal numbers.
+parseSelectProgress :: MonadIO m => ByteString -> m Progress
+parseSelectProgress xmldata = do
+  r <- parseRoot $ LB.fromStrict xmldata
+  -- concatenated text content of the named child of the root
+  let childText name = T.concat $ r $/ element name &/ content
+  Progress <$> parseDecimal (childText "BytesScanned")
+           <*> parseDecimal (childText "BytesProcessed")
+           <*> parseDecimal (childText "BytesReturned")
diff --git a/stack.yaml b/stack.yaml
index 7a77534..c0b754a 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -15,7 +15,7 @@
# resolver:
# name: custom-snapshot
# location: "./custom-snapshot.yaml"
-resolver: lts-11.1
+resolver: lts-13.1
# User packages to be built.
# Various formats can be used as shown in the example below.
diff --git a/test/Network/Minio/XmlGenerator/Test.hs b/test/Network/Minio/XmlGenerator/Test.hs
index 24b482c..0c818c5 100644
--- a/test/Network/Minio/XmlGenerator/Test.hs
+++ b/test/Network/Minio/XmlGenerator/Test.hs
@@ -14,12 +14,14 @@
-- limitations under the License.
--
+{-# LANGUAGE QuasiQuotes #-}
module Network.Minio.XmlGenerator.Test
( xmlGeneratorTests
) where
import Test.Tasty
import Test.Tasty.HUnit
+import Text.RawString.QQ (r)
import Lib.Prelude
@@ -33,6 +35,7 @@ xmlGeneratorTests = testGroup "XML Generator Tests"
[ testCase "Test mkCreateBucketConfig" testMkCreateBucketConfig
, testCase "Test mkCompleteMultipartUploadRequest" testMkCompleteMultipartUploadRequest
, testCase "Test mkPutNotificationRequest" testMkPutNotificationRequest
+ , testCase "Test mkSelectRequest" testMkSelectRequest
]
testMkCreateBucketConfig :: Assertion
@@ -95,3 +98,46 @@ testMkPutNotificationRequest =
[ObjectCreated] defaultFilter
]
]
+
+testMkSelectRequest :: Assertion
+testMkSelectRequest = mapM_ assertFn cases
+ where
+ assertFn (a, b) = assertEqual "selectRequest XML should match: " b $ mkSelectRequest a
+ cases = [ ( SelectRequest "Select * from S3Object" SQL
+ (InputSerialization (Just CompressionTypeGzip)
+ (InputFormatCSV $ fileHeaderInfo FileHeaderIgnore
+ <> recordDelimiter "\n"
+ <> fieldDelimiter ","
+ <> quoteCharacter "\""
+ <> quoteEscapeCharacter "\""
+ ))
+ (OutputSerializationCSV $ quoteFields QuoteFieldsAsNeeded
+ <> recordDelimiter "\n"
+ <> fieldDelimiter ","
+ <> quoteCharacter "\""
+ <> quoteEscapeCharacter "\""
+ )
+ (Just False)
+ , [r|Select * from S3ObjectSQLGZIP"
+IGNORE","ASNEEDED
+",FALSE|]
+ )
+ , ( setRequestProgressEnabled False $
+ setInputCompressionType CompressionTypeGzip $
+ selectRequest "Select * from S3Object" documentJsonInput
+ (outputJSONFromRecordDelimiter "\n")
+ , [r|Select * from S3ObjectSQLGZIPDOCUMENT
+FALSE|]
+ )
+ , ( setRequestProgressEnabled False $
+ setInputCompressionType CompressionTypeNone $
+ selectRequest "Select * from S3Object" defaultParquetInput
+ (outputCSVFromProps $ quoteFields QuoteFieldsAsNeeded
+ <> recordDelimiter "\n"
+ <> fieldDelimiter ","
+ <> quoteCharacter "\""
+ <> quoteEscapeCharacter "\"")
+ , [r|Select * from S3ObjectSQLNONE"ASNEEDED
+",FALSE|]
+ )
+ ]
diff --git a/test/Network/Minio/XmlParser/Test.hs b/test/Network/Minio/XmlParser/Test.hs
index 0f7496e..7bf5da0 100644
--- a/test/Network/Minio/XmlParser/Test.hs
+++ b/test/Network/Minio/XmlParser/Test.hs
@@ -14,15 +14,16 @@
-- limitations under the License.
--
+{-# LANGUAGE QuasiQuotes #-}
module Network.Minio.XmlParser.Test
- (
- xmlParserTests
+ ( xmlParserTests
) where
import qualified Data.Map as Map
import Data.Time (fromGregorian)
import Test.Tasty
import Test.Tasty.HUnit
+import Text.RawString.QQ (r)
import UnliftIO (MonadUnliftIO)
import Lib.Prelude
@@ -43,6 +44,7 @@ xmlParserTests = testGroup "XML Parser Tests"
, testCase "Test parseListPartsResponse" testParseListPartsResponse
, testCase "Test parseCopyObjectResponse" testParseCopyObjectResponse
, testCase "Test parseNotification" testParseNotification
+ , testCase "Test parseSelectProgress" testParseSelectProgress
]
tryValidationErr :: (MonadUnliftIO m) => m a -> m (Either MErrV a)
@@ -356,3 +358,24 @@ testParseNotification = do
forM_ cases $ \(xmldata, val) -> do
result <- runExceptT $ runTestNS $ parseNotification xmldata
eitherValidationErr result (@?= val)
+
+-- | Tests parsing of both progress and stats
+testParseSelectProgress :: Assertion
+testParseSelectProgress = do
+ let cases = [ ([r|
+|] , Progress 512 1024 1024)
+ , ([r|
+
+ 512
+ 1024
+ 1024
+|], Progress 512 1024 1024)
+ ]
+
+ forM_ cases $ \(xmldata, progress) -> do
+ result <- runExceptT $ parseSelectProgress xmldata
+ eitherValidationErr result (@?= progress)