Add copyObject API: (#5)
* Performs server side copy of an object by creating a new object that is a copy of an existing object or part of an existing object. * The user may specify an object on server of any size, and optionally specify a byte range to only copy a part of the object. * Copy conditions are also supported.
This commit is contained in:
parent
d7b46aa115
commit
e4e2576c74
@ -26,6 +26,7 @@ library
|
||||
, Network.Minio.Data.ByteString
|
||||
, Network.Minio.Data.Crypto
|
||||
, Network.Minio.Data.Time
|
||||
, Network.Minio.Errors
|
||||
, Network.Minio.ListOps
|
||||
, Network.Minio.PutObject
|
||||
, Network.Minio.Sign.V4
|
||||
@ -55,6 +56,7 @@ library
|
||||
, monad-control
|
||||
, resourcet
|
||||
, text
|
||||
, text-format
|
||||
, time
|
||||
, transformers
|
||||
, transformers-base
|
||||
@ -98,6 +100,7 @@ test-suite minio-hs-live-server-test
|
||||
, Network.Minio.Data.ByteString
|
||||
, Network.Minio.Data.Crypto
|
||||
, Network.Minio.Data.Time
|
||||
, Network.Minio.Errors
|
||||
, Network.Minio.ListOps
|
||||
, Network.Minio.PutObject
|
||||
, Network.Minio.S3API
|
||||
@ -138,6 +141,7 @@ test-suite minio-hs-live-server-test
|
||||
, tasty-smallcheck
|
||||
, temporary
|
||||
, text
|
||||
, text-format
|
||||
, time
|
||||
, transformers
|
||||
, transformers-base
|
||||
@ -180,6 +184,7 @@ test-suite minio-hs-test
|
||||
, tasty-smallcheck
|
||||
, temporary
|
||||
, text
|
||||
, text-format
|
||||
, time
|
||||
, transformers
|
||||
, transformers-base
|
||||
@ -204,6 +209,7 @@ test-suite minio-hs-test
|
||||
, Network.Minio.Data.ByteString
|
||||
, Network.Minio.Data.Crypto
|
||||
, Network.Minio.Data.Time
|
||||
, Network.Minio.Errors
|
||||
, Network.Minio.ListOps
|
||||
, Network.Minio.PutObject
|
||||
, Network.Minio.S3API
|
||||
|
||||
@ -6,11 +6,28 @@ https://github.com/sdiehl/protolude/blob/master/Symbols.md
|
||||
-}
|
||||
module Lib.Prelude
|
||||
( module Exports
|
||||
, both
|
||||
|
||||
, format
|
||||
) where
|
||||
|
||||
import Protolude as Exports
|
||||
import Protolude as Exports
|
||||
|
||||
import Data.Time as Exports (UTCTime)
|
||||
import Control.Monad.Trans.Maybe as Exports (runMaybeT, MaybeT(..))
|
||||
import Data.Time as Exports (UTCTime(..), diffUTCTime)
|
||||
import Control.Monad.Trans.Maybe as Exports (runMaybeT, MaybeT(..))
|
||||
|
||||
import Control.Monad.Catch as Exports (throwM, MonadThrow, MonadCatch)
|
||||
import Control.Monad.Catch as Exports (throwM, MonadThrow, MonadCatch)
|
||||
|
||||
import Data.Text.Format as Exports (Shown(..))
|
||||
import qualified Data.Text.Format as TF
|
||||
import Data.Text.Format.Params (Params)
|
||||
import qualified Data.Text.Lazy as LT
|
||||
|
||||
format :: Params ps => TF.Format -> ps -> Text
|
||||
format f args = LT.toStrict $ TF.format f args
|
||||
|
||||
-- import Data.Tuple as Exports (uncurry)
|
||||
|
||||
-- | Apply a function on both elements of a pair
|
||||
both :: (a -> b) -> (a, a) -> (b, b)
|
||||
both f (a, b) = (f a, f b)
|
||||
|
||||
@ -44,6 +44,7 @@ module Network.Minio
|
||||
, fGetObject
|
||||
, fPutObject
|
||||
, putObject
|
||||
, copyObject
|
||||
|
||||
, getObject
|
||||
, statObject
|
||||
@ -61,6 +62,7 @@ import qualified Data.Conduit.Binary as CB
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.ListOps
|
||||
import Network.Minio.PutObject
|
||||
import Network.Minio.S3API
|
||||
@ -79,14 +81,20 @@ fPutObject bucket object f = void $ putObjectInternal bucket object $
|
||||
ODFile f Nothing
|
||||
|
||||
-- | Put an object from a conduit source. The size can be provided if
|
||||
-- known; this helps the library select optimal part sizes to
|
||||
-- performing a multipart upload. If not specified, it is assumed that
|
||||
-- the object can be potentially 5TiB and selects multipart sizes
|
||||
-- appropriately.
|
||||
-- known; this helps the library select optimal part sizes to perform
|
||||
-- a multipart upload. If not specified, it is assumed that the object
|
||||
-- can be potentially 5TiB and selects multipart sizes appropriately.
|
||||
putObject :: Bucket -> Object -> C.Producer Minio ByteString
|
||||
-> Maybe Int64 -> Minio ()
|
||||
putObject bucket object src sizeMay = void $ putObjectInternal bucket object $
|
||||
ODStream src sizeMay
|
||||
-> Maybe Int64 -> Minio ()
|
||||
putObject bucket object src sizeMay =
|
||||
void $ putObjectInternal bucket object $ ODStream src sizeMay
|
||||
|
||||
-- | Perform a server-side copy operation to create an object with the
|
||||
-- given bucket and object name from the source specification in
|
||||
-- CopyPartSource. This function performs a multipart copy operation
|
||||
-- if the new object is to be greater than 5GiB in size.
|
||||
copyObject :: Bucket -> Object -> CopyPartSource -> Minio ()
|
||||
copyObject bucket object cps = void $ copyObjectInternal bucket object cps
|
||||
|
||||
-- | Get an object from the object store as a resumable source (conduit).
|
||||
getObject :: Bucket -> Object -> Minio (C.ResumableSource Minio ByteString)
|
||||
|
||||
@ -7,7 +7,8 @@ import Control.Monad.Trans.Control
|
||||
import Control.Monad.Trans.Resource
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Default (Default(..))
|
||||
import Network.HTTP.Client (defaultManagerSettings, HttpException)
|
||||
import qualified Data.Text as T
|
||||
import Network.HTTP.Client (defaultManagerSettings)
|
||||
import qualified Network.HTTP.Conduit as NC
|
||||
import Network.HTTP.Types (Method, Header, Query)
|
||||
import qualified Network.HTTP.Types as HT
|
||||
@ -15,6 +16,9 @@ import Text.XML
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.Utils
|
||||
|
||||
-- | Connection Info data type. Use the Default instance to create
|
||||
-- connection info for your service.
|
||||
data ConnectInfo = ConnectInfo {
|
||||
@ -140,6 +144,47 @@ data ObjectInfo = ObjectInfo {
|
||||
, oiSize :: Int64
|
||||
} deriving (Show, Eq)
|
||||
|
||||
data CopyPartSource = CopyPartSource {
|
||||
cpSource :: Text -- | formatted like "/sourceBucket/sourceObject"
|
||||
, cpSourceRange :: Maybe (Int64, Int64) -- | (0, 9) means first ten
|
||||
-- bytes of the source
|
||||
-- object
|
||||
, cpSourceIfMatch :: Maybe Text
|
||||
, cpSourceIfNoneMatch :: Maybe Text
|
||||
, cpSourceIfUnmodifiedSince :: Maybe UTCTime
|
||||
, cpSourceIfModifiedSince :: Maybe UTCTime
|
||||
} deriving (Show, Eq)
|
||||
|
||||
instance Default CopyPartSource where
|
||||
def = CopyPartSource "" def def def def def
|
||||
|
||||
cpsToHeaders :: CopyPartSource -> [HT.Header]
|
||||
cpsToHeaders cps = ("x-amz-copy-source", encodeUtf8 $ cpSource cps) :
|
||||
(rangeHdr ++ (zip names values))
|
||||
where
|
||||
names = ["x-amz-copy-source-if-match", "x-amz-copy-source-if-none-match",
|
||||
"x-amz-copy-source-if-unmodified-since",
|
||||
"x-amz-copy-source-if-modified-since"]
|
||||
values = concatMap (maybeToList . fmap encodeUtf8 . (cps &))
|
||||
[cpSourceIfMatch, cpSourceIfNoneMatch,
|
||||
fmap formatRFC1123 . cpSourceIfUnmodifiedSince,
|
||||
fmap formatRFC1123 . cpSourceIfModifiedSince]
|
||||
rangeHdr = ("x-amz-copy-source-range",)
|
||||
. HT.renderByteRanges
|
||||
. (:[])
|
||||
. uncurry HT.ByteRangeFromTo
|
||||
<$> (map (both fromIntegral) $
|
||||
maybeToList $ cpSourceRange cps)
|
||||
|
||||
-- | Extract the source bucket and source object name. TODO: validate
|
||||
-- the bucket and object name extracted.
|
||||
cpsToObject :: CopyPartSource -> Maybe (Bucket, Object)
|
||||
cpsToObject cps = do
|
||||
[_, bucket, object] <- Just splits
|
||||
return (bucket, object)
|
||||
where
|
||||
splits = T.splitOn "/" $ cpSource cps
|
||||
|
||||
-- | Represents different kinds of payload that are used with S3 API
|
||||
-- requests.
|
||||
data Payload = PayloadBS ByteString
|
||||
@ -222,29 +267,3 @@ runMinio ci m = do
|
||||
|
||||
s3Name :: Text -> Name
|
||||
s3Name s = Name s (Just "http://s3.amazonaws.com/doc/2006-03-01/") Nothing
|
||||
|
||||
---------------------------------
|
||||
-- Errors
|
||||
---------------------------------
|
||||
-- | Various validation errors
|
||||
data MErrV = MErrVSinglePUTSizeExceeded Int64
|
||||
| MErrVPutSizeExceeded Int64
|
||||
| MErrVETagHeaderNotFound
|
||||
| MErrVInvalidObjectInfoResponse
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | Errors thrown by the library
|
||||
data MinioErr = ME MError
|
||||
| MEHttp HttpException
|
||||
| MEFile IOException
|
||||
deriving (Show)
|
||||
|
||||
instance Exception MinioErr
|
||||
|
||||
-- | Library internal errors
|
||||
data MError = XMLParseError Text
|
||||
| ResponseError (NC.Response LByteString)
|
||||
| ValidationError MErrV
|
||||
deriving (Show, Eq)
|
||||
|
||||
instance Exception MError
|
||||
|
||||
36
src/Network/Minio/Errors.hs
Normal file
36
src/Network/Minio/Errors.hs
Normal file
@ -0,0 +1,36 @@
|
||||
module Network.Minio.Errors where
|
||||
|
||||
import Control.Exception
|
||||
import qualified Network.HTTP.Conduit as NC
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
|
||||
---------------------------------
|
||||
-- Errors
|
||||
---------------------------------
|
||||
-- | Various validation errors
|
||||
data MErrV = MErrVSinglePUTSizeExceeded Int64
|
||||
| MErrVPutSizeExceeded Int64
|
||||
| MErrVETagHeaderNotFound
|
||||
| MErrVInvalidObjectInfoResponse
|
||||
| MErrVInvalidSrcObjSpec Text
|
||||
| MErrVInvalidSrcObjByteRange (Int64, Int64)
|
||||
| MErrVCopyObjSingleNoRangeAccepted
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | Errors thrown by the library
|
||||
data MinioErr = ME MError
|
||||
| MEHttp NC.HttpException
|
||||
| MEFile IOException
|
||||
deriving (Show)
|
||||
|
||||
instance Exception MinioErr
|
||||
|
||||
-- | Library internal errors
|
||||
data MError = XMLParseError Text
|
||||
| ResponseError (NC.Response LByteString)
|
||||
| ValidationError MErrV
|
||||
deriving (Show, Eq)
|
||||
|
||||
instance Exception MError
|
||||
@ -3,6 +3,9 @@ module Network.Minio.PutObject
|
||||
putObjectInternal
|
||||
, ObjectData(..)
|
||||
, selectPartSizes
|
||||
, copyObjectInternal
|
||||
, selectCopyRanges
|
||||
, minPartSize
|
||||
) where
|
||||
|
||||
|
||||
@ -19,6 +22,7 @@ import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Data.Crypto
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.ListOps
|
||||
import Network.Minio.S3API
|
||||
import Network.Minio.Utils
|
||||
@ -28,6 +32,14 @@ import Network.Minio.Utils
|
||||
maxObjectSize :: Int64
|
||||
maxObjectSize = 5 * 1024 * 1024 * oneMiB
|
||||
|
||||
-- | minimum size of parts used in multipart operations.
|
||||
minPartSize :: Int64
|
||||
minPartSize = 64 * oneMiB
|
||||
|
||||
-- | max part of an object size is 5GiB
|
||||
maxObjectPartSize :: Int64
|
||||
maxObjectPartSize = 5 * 1024 * oneMiB
|
||||
|
||||
oneMiB :: Int64
|
||||
oneMiB = 1024 * 1024
|
||||
|
||||
@ -44,8 +56,9 @@ maxMultipartParts = 10000
|
||||
-- For streams also, a size may be provided. This is useful to limit
|
||||
-- the input - if it is not provided, upload will continue until the
|
||||
-- stream ends or the object reaches `maxObjectsize` size.
|
||||
data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and optional size.
|
||||
| ODStream (C.Producer m ByteString) (Maybe Int64) -- ^ Pass size in bytes as maybe if known.
|
||||
data ObjectData m =
|
||||
ODFile FilePath (Maybe Int64) -- ^ Takes filepath and optional size.
|
||||
| ODStream (C.Producer m ByteString) (Maybe Int64) -- ^ Pass size in bytes as maybe if known.
|
||||
|
||||
-- | Put an object from ObjectData. This high-level API handles
|
||||
-- objects of all sizes, and even if the object size is unknown.
|
||||
@ -77,18 +90,21 @@ putObjectInternal b o (ODFile fp sizeMay) = do
|
||||
CB.sourceFile fp
|
||||
|
||||
-- | Select part sizes - the logic is that the minimum part-size will
|
||||
-- be 64MiB. TODO: write quickcheck tests.
|
||||
-- be 64MiB.
|
||||
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
|
||||
selectPartSizes size = List.zip3 [1..] partOffsets partSizes
|
||||
selectPartSizes size = uncurry (List.zip3 [1..]) $
|
||||
List.unzip $ loop 0 size
|
||||
where
|
||||
ceil :: Double -> Int64
|
||||
ceil = ceiling
|
||||
partSize = max (64 * oneMiB) (ceil $ fromIntegral size /
|
||||
fromIntegral maxMultipartParts)
|
||||
(numParts, lastPartSize) = size `divMod` partSize
|
||||
lastPart = filter (> 0) [lastPartSize]
|
||||
partSizes = replicate (fromIntegral numParts) partSize ++ lastPart
|
||||
partOffsets = List.scanl' (+) 0 partSizes
|
||||
partSize = max minPartSize (ceil $ fromIntegral size /
|
||||
fromIntegral maxMultipartParts)
|
||||
|
||||
m = fromIntegral partSize
|
||||
loop st sz
|
||||
| st > sz = []
|
||||
| st + m >= sz = [(st, sz - st)]
|
||||
| otherwise = (st, m) : loop (st + m) sz
|
||||
|
||||
-- returns partinfo if part is already uploaded.
|
||||
checkUploadNeeded :: Payload -> PartNumber
|
||||
@ -178,3 +194,68 @@ getExistingUpload b o = do
|
||||
parts <- maybe (return [])
|
||||
(\uid -> listIncompleteParts b o uid C.$$ CC.sinkList) uidMay
|
||||
return (uidMay, Map.fromList $ map (\p -> (piNumber p, p)) parts)
|
||||
|
||||
-- | Copy an object using single or multipart copy strategy.
|
||||
copyObjectInternal :: Bucket -> Object -> CopyPartSource
|
||||
-> Minio ETag
|
||||
copyObjectInternal b' o cps = do
|
||||
-- validate and extract the src bucket and object
|
||||
(srcBucket, srcObject) <- maybe
|
||||
(throwM $ ValidationError $ MErrVInvalidSrcObjSpec $ cpSource cps)
|
||||
return $ cpsToObject cps
|
||||
|
||||
-- get source object size with a head request
|
||||
(ObjectInfo _ _ _ srcSize) <- headObject srcBucket srcObject
|
||||
|
||||
-- check that byte offsets are valid if specified in cps
|
||||
when (isJust (cpSourceRange cps) &&
|
||||
or [fst range < 0, snd range < fst range,
|
||||
snd range >= fromIntegral srcSize]) $
|
||||
throwM $ ValidationError $ MErrVInvalidSrcObjByteRange range
|
||||
|
||||
-- 1. If sz > 5gb use multipart copy
|
||||
-- 2. If startOffset /= 0 use multipart copy
|
||||
let destSize = (\(a, b) -> b - a + 1 ) $
|
||||
maybe (0, srcSize - 1) identity $ cpSourceRange cps
|
||||
startOffset = maybe 0 fst $ cpSourceRange cps
|
||||
endOffset = maybe (srcSize - 1) snd $ cpSourceRange cps
|
||||
|
||||
if destSize > maxObjectPartSize || (endOffset - startOffset + 1 /= srcSize)
|
||||
then multiPartCopyObject b' o cps srcSize
|
||||
else fst <$> copyObjectSingle b' o cps{cpSourceRange = Nothing} []
|
||||
|
||||
where
|
||||
range = maybe (0, 0) identity $ cpSourceRange cps
|
||||
|
||||
-- | Given the input byte range of the source object, compute the
|
||||
-- splits for a multipart copy object procedure. Minimum part size
|
||||
-- used is minPartSize.
|
||||
selectCopyRanges :: (Int64, Int64) -> [(PartNumber, (Int64, Int64))]
|
||||
selectCopyRanges (st, end) = zip pns $
|
||||
map (\(x, y) -> (st + x, st + x + y - 1)) $ zip startOffsets partSizes
|
||||
where
|
||||
size = end - st + 1
|
||||
(pns, startOffsets, partSizes) = List.unzip3 $ selectPartSizes size
|
||||
|
||||
-- | Perform a multipart copy object action. Since we cannot verify
|
||||
-- existing parts based on the source object, there is no resuming
|
||||
-- copy action support.
|
||||
multiPartCopyObject :: Bucket -> Object -> CopyPartSource -> Int64
|
||||
-> Minio ETag
|
||||
multiPartCopyObject b o cps srcSize = do
|
||||
uid <- newMultipartUpload b o []
|
||||
|
||||
let byteRange = maybe (0, fromIntegral $ srcSize - 1) identity $
|
||||
cpSourceRange cps
|
||||
partRanges = selectCopyRanges byteRange
|
||||
partSources = map (\(x, y) -> (x, cps {cpSourceRange = Just y}))
|
||||
partRanges
|
||||
|
||||
copiedParts <- limitedMapConcurrently 10
|
||||
(\(pn, cps') -> do
|
||||
(etag, _) <- copyObjectPart b o cps' uid pn []
|
||||
return $ PartInfo pn etag
|
||||
)
|
||||
partSources
|
||||
|
||||
completeMultipartUpload b o uid copiedParts
|
||||
|
||||
@ -22,6 +22,7 @@ module Network.Minio.S3API
|
||||
, putBucket
|
||||
, ETag
|
||||
, putObjectSingle
|
||||
, copyObjectSingle
|
||||
|
||||
-- * Multipart Upload APIs
|
||||
--------------------------
|
||||
@ -29,8 +30,10 @@ module Network.Minio.S3API
|
||||
, PartInfo
|
||||
, Payload(..)
|
||||
, PartNumber
|
||||
, CopyPartSource(..)
|
||||
, newMultipartUpload
|
||||
, putObjectPart
|
||||
, copyObjectPart
|
||||
, completeMultipartUpload
|
||||
, abortMultipartUpload
|
||||
, ListUploadsResult
|
||||
@ -52,11 +55,12 @@ import qualified Network.HTTP.Types as HT
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.API
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.Utils
|
||||
import Network.Minio.XmlParser
|
||||
import Network.Minio.XmlGenerator
|
||||
import Network.Minio.XmlParser
|
||||
|
||||
|
||||
-- | Fetch all buckets from the service.
|
||||
@ -193,6 +197,43 @@ putObjectPart bucket object uploadId partNumber headers payload = do
|
||||
, ("partNumber", Just $ show partNumber)
|
||||
]
|
||||
|
||||
-- | Performs server-side copy of an object or part of an object as an
|
||||
-- upload part of an ongoing multi-part upload.
|
||||
copyObjectPart :: Bucket -> Object -> CopyPartSource -> UploadId
|
||||
-> PartNumber -> [HT.Header] -> Minio (ETag, UTCTime)
|
||||
copyObjectPart bucket object cps uploadId partNumber headers = do
|
||||
resp <- executeRequest $
|
||||
def { riMethod = HT.methodPut
|
||||
, riBucket = Just bucket
|
||||
, riObject = Just object
|
||||
, riQueryParams = mkOptionalParams params
|
||||
, riHeaders = headers ++ cpsToHeaders cps
|
||||
}
|
||||
|
||||
parseCopyObjectResponse $ NC.responseBody resp
|
||||
where
|
||||
params = [
|
||||
("uploadId", Just uploadId)
|
||||
, ("partNumber", Just $ show partNumber)
|
||||
]
|
||||
|
||||
-- | Performs server-side copy of an object that is upto 5GiB in
|
||||
-- size. If the object is greater than 5GiB, this function throws the
|
||||
-- error returned by the server.
|
||||
copyObjectSingle :: Bucket -> Object -> CopyPartSource -> [HT.Header]
|
||||
-> Minio (ETag, UTCTime)
|
||||
copyObjectSingle bucket object cps headers = do
|
||||
-- validate that cpSourceRange is Nothing for this API.
|
||||
when (isJust $ cpSourceRange cps) $
|
||||
throwM $ ValidationError $ MErrVCopyObjSingleNoRangeAccepted
|
||||
resp <- executeRequest $
|
||||
def { riMethod = HT.methodPut
|
||||
, riBucket = Just bucket
|
||||
, riObject = Just object
|
||||
, riHeaders = headers ++ cpsToHeaders cps
|
||||
}
|
||||
parseCopyObjectResponse $ NC.responseBody resp
|
||||
|
||||
-- | Complete a multipart upload.
|
||||
completeMultipartUpload :: Bucket -> Object -> UploadId -> [PartInfo]
|
||||
-> Minio ETag
|
||||
@ -226,22 +267,22 @@ listIncompleteUploads' :: Bucket -> Maybe Text -> Maybe Text -> Maybe Text
|
||||
listIncompleteUploads' bucket prefix delimiter keyMarker uploadIdMarker = do
|
||||
resp <- executeRequest $ def { riMethod = HT.methodGet
|
||||
, riBucket = Just bucket
|
||||
, riQueryParams = ("uploads", Nothing): mkOptionalParams params
|
||||
, riQueryParams = params
|
||||
}
|
||||
parseListUploadsResponse $ NC.responseBody resp
|
||||
where
|
||||
-- build optional query params
|
||||
params = [
|
||||
("prefix", prefix)
|
||||
, ("delimiter", delimiter)
|
||||
, ("key-marker", keyMarker)
|
||||
, ("upload-id-marker", uploadIdMarker)
|
||||
]
|
||||
-- build query params
|
||||
params = ("uploads", Nothing) : mkOptionalParams
|
||||
[ ("prefix", prefix)
|
||||
, ("delimiter", delimiter)
|
||||
, ("key-marker", keyMarker)
|
||||
, ("upload-id-marker", uploadIdMarker)
|
||||
]
|
||||
|
||||
|
||||
-- | List parts of an ongoing multipart upload.
|
||||
listIncompleteParts' :: Bucket -> Object -> UploadId -> Maybe Text
|
||||
-> Maybe Text -> Minio ListPartsResult
|
||||
-> Maybe Text -> Minio ListPartsResult
|
||||
listIncompleteParts' bucket object uploadId maxParts partNumMarker = do
|
||||
resp <- executeRequest $ def { riMethod = HT.methodGet
|
||||
, riBucket = Just bucket
|
||||
|
||||
@ -22,12 +22,16 @@ import qualified System.IO as IO
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
|
||||
-- | Represent the time format string returned by S3 API calls.
|
||||
s3TimeFormat :: [Char]
|
||||
s3TimeFormat = iso8601DateFormat $ Just "%T%QZ"
|
||||
|
||||
-- | Format as per RFC 1123.
|
||||
formatRFC1123 :: UTCTime -> T.Text
|
||||
formatRFC1123 = T.pack . formatTime defaultTimeLocale "%a, %d %b %Y %X %Z"
|
||||
|
||||
allocateReadFile :: (R.MonadResource m, R.MonadResourceBase m)
|
||||
=> FilePath -> m (R.ReleaseKey, Handle)
|
||||
allocateReadFile fp = do
|
||||
|
||||
@ -3,6 +3,7 @@ module Network.Minio.XmlParser
|
||||
, parseLocation
|
||||
, parseNewMultipartUpload
|
||||
, parseCompleteMultipartUploadResponse
|
||||
, parseCopyObjectResponse
|
||||
, parseListObjectsResponse
|
||||
, parseListUploadsResponse
|
||||
, parseListPartsResponse
|
||||
@ -19,6 +20,7 @@ import Text.XML.Cursor
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.Utils (s3TimeFormat)
|
||||
|
||||
|
||||
@ -79,6 +81,16 @@ parseCompleteMultipartUploadResponse xmldata = do
|
||||
r <- parseRoot xmldata
|
||||
return $ T.concat $ r $// s3Elem "ETag" &/ content
|
||||
|
||||
-- | Parse the response XML of copyObject and copyObjectPart
|
||||
parseCopyObjectResponse :: (MonadThrow m) => LByteString -> m (ETag, UTCTime)
|
||||
parseCopyObjectResponse xmldata = do
|
||||
r <- parseRoot xmldata
|
||||
let
|
||||
mtimeStr = T.concat $ r $// s3Elem "LastModified" &/ content
|
||||
|
||||
mtime <- parseS3XMLTime mtimeStr
|
||||
return $ (T.concat $ r $// s3Elem "ETag" &/ content, mtime)
|
||||
|
||||
-- | Parse the response XML of a list objects call.
|
||||
parseListObjectsResponse :: (MonadThrow m)
|
||||
=> LByteString -> m ListObjectsResult
|
||||
|
||||
@ -290,4 +290,104 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
|
||||
step "delete object"
|
||||
deleteObject bucket object
|
||||
|
||||
, funTestWithBucket "copyObjectSingle basic tests" $ \step bucket -> do
|
||||
let object = "xxx"
|
||||
objCopy = "xxxCopy"
|
||||
size1 = 100 :: Int64
|
||||
|
||||
step "create server object to copy"
|
||||
inputFile <- mkRandFile size1
|
||||
fPutObject bucket object inputFile
|
||||
|
||||
step "copy object"
|
||||
let cps = def { cpSource = format "/{}/{}" [bucket, object] }
|
||||
(etag, modTime) <- copyObjectSingle bucket objCopy cps []
|
||||
|
||||
-- retrieve obj info to check
|
||||
ObjectInfo _ t e s <- headObject bucket objCopy
|
||||
|
||||
let isMTimeDiffOk = abs (diffUTCTime modTime t) < 1.0
|
||||
|
||||
liftIO $ (s == size1 && e == etag && isMTimeDiffOk) @?
|
||||
"Copied object did not match expected."
|
||||
|
||||
step "cleanup actions"
|
||||
deleteObject bucket object
|
||||
deleteObject bucket objCopy
|
||||
|
||||
, funTestWithBucket "copyObjectPart basic tests" $ \step bucket -> do
|
||||
let srcObj = "XXX"
|
||||
copyObj = "XXXCopy"
|
||||
|
||||
step "Prepare"
|
||||
let mb15 = 15 * 1024 * 1024
|
||||
mb5 = 5 * 1024 * 1024
|
||||
randFile <- mkRandFile mb15
|
||||
fPutObject bucket srcObj randFile
|
||||
|
||||
step "create new multipart upload"
|
||||
uid <- newMultipartUpload bucket copyObj []
|
||||
liftIO $ (T.length uid > 0) @? ("Got an empty multipartUpload Id.")
|
||||
|
||||
step "put object parts 1-3"
|
||||
let cps = def {cpSource = format "/{}/{}" [bucket, srcObj]}
|
||||
parts <- forM [1..3] $ \p -> do
|
||||
(etag, _) <- copyObjectPart bucket copyObj cps{
|
||||
cpSourceRange = Just ((p-1)*mb5, (p-1)*mb5 + (mb5 - 1))
|
||||
} uid (fromIntegral p) []
|
||||
return $ PartInfo (fromIntegral p) etag
|
||||
|
||||
step "complete multipart"
|
||||
void $ completeMultipartUpload bucket copyObj uid parts
|
||||
|
||||
step "verify copied object size"
|
||||
(ObjectInfo _ _ _ s) <- headObject bucket copyObj
|
||||
|
||||
liftIO $ (s == mb15) @? "Size failed to match"
|
||||
|
||||
step $ "Cleanup actions"
|
||||
deleteObject bucket srcObj
|
||||
deleteObject bucket copyObj
|
||||
|
||||
, funTestWithBucket "copyObject basic tests" $ \step bucket -> do
|
||||
let srcs = ["XXX", "XXXL"]
|
||||
copyObjs = ["XXXCopy", "XXXLCopy"]
|
||||
sizes = map (* (1024 * 1024)) [15, 65]
|
||||
|
||||
step "Prepare"
|
||||
forM_ (zip srcs sizes) $ \(src, size) ->
|
||||
fPutObject bucket src =<< mkRandFile size
|
||||
|
||||
step "make small and large object copy"
|
||||
forM_ (zip copyObjs srcs) $ \(cp, src) ->
|
||||
copyObject bucket cp def{cpSource = format "/{}/{}" [bucket, src]}
|
||||
|
||||
step "verify uploaded objects"
|
||||
uploadedSizes <- fmap (fmap oiSize) $ forM copyObjs (headObject bucket)
|
||||
|
||||
liftIO $ (sizes == uploadedSizes) @? "Uploaded obj sizes failed to match"
|
||||
|
||||
forM_ (concat [srcs, copyObjs]) (deleteObject bucket)
|
||||
|
||||
, funTestWithBucket "copyObject with offset test " $ \step bucket -> do
|
||||
let src = "XXX"
|
||||
copyObj = "XXXCopy"
|
||||
size = 15 * 1024 * 1024
|
||||
|
||||
step "Prepare"
|
||||
fPutObject bucket src =<< mkRandFile size
|
||||
|
||||
step "copy last 10MiB of object"
|
||||
copyObject bucket copyObj def{
|
||||
cpSource = format "/{}/{}" [bucket, src]
|
||||
, cpSourceRange = Just (5 * 1024 * 1024, size - 1)
|
||||
}
|
||||
|
||||
step "verify uploaded object"
|
||||
cSize <- oiSize <$> headObject bucket copyObj
|
||||
|
||||
liftIO $ (cSize == 10 * 1024 * 1024) @? "Uploaded obj size mismatched!"
|
||||
|
||||
forM_ [src, copyObj] (deleteObject bucket)
|
||||
]
|
||||
|
||||
@ -4,13 +4,14 @@ module Network.Minio.XmlParser.Test
|
||||
) where
|
||||
|
||||
import qualified Control.Monad.Catch as MC
|
||||
import Data.Time (fromGregorian, UTCTime(..))
|
||||
import Data.Time (fromGregorian)
|
||||
import Test.Tasty
|
||||
import Test.Tasty.HUnit
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.XmlParser
|
||||
|
||||
xmlParserTests :: TestTree
|
||||
@ -21,6 +22,7 @@ xmlParserTests = testGroup "XML Parser Tests"
|
||||
, testCase "Test parseListUploadsresponse" testParseListIncompleteUploads
|
||||
, testCase "Test parseCompleteMultipartUploadResponse" testParseCompleteMultipartUploadResponse
|
||||
, testCase "Test parseListPartsResponse" testParseListPartsResponse
|
||||
, testCase "Test parseCopyObjectResponse" testParseCopyObjectResponse
|
||||
]
|
||||
|
||||
tryMError :: (MC.MonadCatch m) => m a -> m (Either MError a)
|
||||
@ -210,3 +212,25 @@ testParseListPartsResponse = do
|
||||
|
||||
parsedListPartsResult <- runExceptT $ parseListPartsResponse xmldata
|
||||
eitherMError parsedListPartsResult (@?= expectedListResult)
|
||||
|
||||
testParseCopyObjectResponse :: Assertion
|
||||
testParseCopyObjectResponse = do
|
||||
let
|
||||
cases = [ ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
\<CopyObjectResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
\<LastModified>2009-10-28T22:32:00.000Z</LastModified>\
|
||||
\<ETag>\"9b2cf535f27731c974343645a3985328\"</ETag>\
|
||||
\</CopyObjectResult>",
|
||||
("\"9b2cf535f27731c974343645a3985328\"",
|
||||
UTCTime (fromGregorian 2009 10 28) 81120))
|
||||
, ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
\<CopyPartResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
\<LastModified>2009-10-28T22:32:00.000Z</LastModified>\
|
||||
\<ETag>\"9b2cf535f27731c974343645a3985328\"</ETag>\
|
||||
\</CopyPartResult>",
|
||||
("\"9b2cf535f27731c974343645a3985328\"",
|
||||
UTCTime (fromGregorian 2009 10 28) 81120))]
|
||||
|
||||
forM_ cases $ \(xmldata, (etag, modTime)) -> do
|
||||
parseResult <- runExceptT $ parseCopyObjectResponse xmldata
|
||||
eitherMError parseResult (@?= (etag, modTime))
|
||||
|
||||
54
test/Spec.hs
54
test/Spec.hs
@ -31,7 +31,7 @@ properties = testGroup "Properties" [qcProps] -- [scProps]
|
||||
|
||||
qcProps :: TestTree
|
||||
qcProps = testGroup "(checked by QuickCheck)"
|
||||
[ QC.testProperty "selectPartSizes: simple properties" $
|
||||
[ QC.testProperty "selectPartSizes:" $
|
||||
\n -> let (pns, offs, sizes) = L.unzip3 (selectPartSizes n)
|
||||
|
||||
-- check that pns increments from 1.
|
||||
@ -45,22 +45,52 @@ qcProps = testGroup "(checked by QuickCheck)"
|
||||
isOffsetsAsc = all (\(a, b) -> a < b) $ consPairs offs
|
||||
|
||||
-- check sizes sums to n.
|
||||
isSumSizeOk = n < 0 || (sum sizes == n && all (> 0) sizes)
|
||||
isSumSizeOk = sum sizes == n
|
||||
|
||||
-- check sizes are constant except last
|
||||
isSizesConstantExceptLast =
|
||||
n <= 0 || all (\(a, b) -> a == b) (consPairs $ L.init sizes)
|
||||
all (\(a, b) -> a == b) (consPairs $ L.init sizes)
|
||||
|
||||
in isPNumsAscendingFrom1 && isOffsetsAsc && isSumSizeOk &&
|
||||
isSizesConstantExceptLast
|
||||
-- check each part except last is at least minPartSize;
|
||||
-- last part may be 0 only if it is the only part.
|
||||
nparts = length sizes
|
||||
isMinPartSizeOk =
|
||||
if | nparts > 1 -> -- last part can be smaller but > 0
|
||||
all (>= minPartSize) (take (nparts - 1) sizes) &&
|
||||
all (\s -> s > 0) (drop (nparts - 1) sizes)
|
||||
| nparts == 1 -> -- size may be 0 here.
|
||||
maybe True (\x -> x >= 0 && x <= minPartSize) $
|
||||
headMay sizes
|
||||
| otherwise -> False
|
||||
|
||||
, QC.testProperty "selectPartSizes: part-size is at least 64MiB" $
|
||||
\n -> let (_, _, sizes) = L.unzip3 (selectPartSizes n)
|
||||
mib64 = 64 * 1024 * 1024
|
||||
in if | length sizes > 1 -> -- last part can be smaller but > 0
|
||||
all (>= mib64) (L.init sizes) && L.last sizes > 0
|
||||
| length sizes == 1 -> maybe True (> 0) $ head sizes
|
||||
| otherwise -> True
|
||||
in n < 0 ||
|
||||
(isPNumsAscendingFrom1 && isOffsetsAsc && isSumSizeOk &&
|
||||
isSizesConstantExceptLast && isMinPartSizeOk)
|
||||
|
||||
, QC.testProperty "selectCopyRanges:" $
|
||||
\(start, end) ->
|
||||
let (_, pairs) = L.unzip (selectCopyRanges (start, end))
|
||||
|
||||
-- is last part's snd offset end?
|
||||
isLastPartOk = maybe False ((end ==) . snd) $ lastMay pairs
|
||||
-- is first part's fst offset start
|
||||
isFirstPartOk = maybe False ((start ==) . fst) $ headMay pairs
|
||||
|
||||
-- each pair is >=64MiB except last, and all those parts
|
||||
-- have same size.
|
||||
initSizes = maybe [] (map (\(a, b) -> b - a + 1)) $ initMay pairs
|
||||
isPartSizesOk = all (>= minPartSize) initSizes &&
|
||||
maybe True (\k -> all (== k) initSizes)
|
||||
(headMay initSizes)
|
||||
|
||||
-- returned offsets are contiguous.
|
||||
fsts = drop 1 $ map fst pairs
|
||||
snds = take (length pairs - 1) $ map snd pairs
|
||||
isContParts = length fsts == length snds &&
|
||||
and (map (\(a, b) -> a == b + 1) $ zip fsts snds)
|
||||
|
||||
in start < 0 || start > end ||
|
||||
(isLastPartOk && isFirstPartOk && isPartSizesOk && isContParts)
|
||||
]
|
||||
|
||||
unitTests :: TestTree
|
||||
|
||||
Loading…
Reference in New Issue
Block a user