Initial high-level putobject

Aditya Manthramurthy 2017-01-27 17:52:10 +05:30
parent 512c455fde
commit 74748cfb16
7 changed files with 241 additions and 36 deletions

View File

@ -19,17 +19,19 @@ library
exposed-modules: Network.Minio
, Network.Minio.S3API
other-modules: Lib.Prelude
, Network.Minio.API
, Network.Minio.Data
, Network.Minio.Data.ByteString
, Network.Minio.Data.Crypto
, Network.Minio.Data.Time
, Network.Minio.PutObject
, Network.Minio.Sign.V4
, Network.Minio.API
, Network.Minio.Utils
, Network.Minio.XmlParser
, Network.Minio.XmlGenerator
, Network.Minio.XmlParser
build-depends: base >= 4.7 && < 5
, protolude >= 0.1.6 && < 0.2
, async
, bytestring
, case-insensitive
, conduit
@ -43,6 +45,7 @@ library
, http-client
, http-conduit
, http-types
, lifted-async
, lifted-base
, memory
, monad-control
@ -60,6 +63,7 @@ library
, MultiParamTypeClasses
, MultiWayIf
, RankNTypes
, TypeFamilies
, TupleSections
test-suite minio-hs-test
@ -69,6 +73,7 @@ test-suite minio-hs-test
build-depends: base
, minio-hs
, protolude >= 0.1.6 && < 0.2
, async
, bytestring
, case-insensitive
, conduit
@ -82,6 +87,7 @@ test-suite minio-hs-test
, http-client
, http-conduit
, http-types
, lifted-async
, lifted-base
, memory
, monad-control
@ -106,6 +112,7 @@ test-suite minio-hs-test
, MultiWayIf
, RankNTypes
, TupleSections
, TypeFamilies
other-modules: Lib.Prelude
, Network.Minio
, Network.Minio.API
@ -113,12 +120,13 @@ test-suite minio-hs-test
, Network.Minio.Data.ByteString
, Network.Minio.Data.Crypto
, Network.Minio.Data.Time
, Network.Minio.PutObject
, Network.Minio.S3API
, Network.Minio.Sign.V4
, Network.Minio.XmlGenerator
, Network.Minio.XmlParser
, Network.Minio.Utils
, Network.Minio.XmlGenerator
, Network.Minio.XmlGenerator.Test
, Network.Minio.XmlParser
, Network.Minio.XmlParser.Test

View File

@ -29,22 +29,24 @@ module Network.Minio
, fGetObject
, fPutObject
, ObjectData(..)
, putObject
) where
{-
This module exports the high-level Minio API for object storage.
-}
import qualified Control.Monad.Trans.Resource as R
-- import qualified Control.Monad.Trans.Resource as R
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified System.IO as IO
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.PutObject
import Network.Minio.S3API
import Network.Minio.Utils
-- import Network.Minio.Utils
-- | Fetch the object and write it to the given file safely. The
-- object is first written to a temporary file in the same directory
@ -56,11 +58,5 @@ fGetObject bucket object fp = do
-- | Upload the given file to the given object.
fPutObject :: Bucket -> Object -> FilePath -> Minio ()
fPutObject bucket object fp = do
(releaseKey, h) <- allocateReadFile fp
size <- liftIO $ IO.hFileSize h
putObject bucket object [] h 0 (fromIntegral size)
-- release file handle
R.release releaseKey
fPutObject bucket object f = void $ putObject bucket object $
ODFile f Nothing
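
For orientation, a minimal sketch of calling the file-based helpers from user code (hypothetical, not part of this commit; it assumes OverloadedStrings for the Text-based Bucket and Object values, as in the test-suite, and that fGetObject returns Minio ()):

-- Hypothetical user code running inside the Minio monad.
backupAndRestore :: Minio ()
backupAndRestore = do
  -- upload a local file; putObject picks single or multipart upload internally
  fPutObject "mybucket" "backup.tar" "/tmp/backup.tar"
  -- fetch it back into a (safely written) local file
  fGetObject "mybucket" "backup.tar" "/tmp/backup-restored.tar"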

View File

@ -133,6 +133,7 @@ getRegionFromRI ri = maybe "us-east-1" identity (riRegion ri)
-- | Various validation errors
data MErrV = MErrVSinglePUTSizeExceeded Int64
| MErrVPutSizeExceeded Int64
| MErrVETagHeaderNotFound
deriving (Show)

View File

@ -0,0 +1,147 @@
module Network.Minio.PutObject
(
putObject
, ObjectData(..)
) where
import qualified Control.Monad.Trans.Resource as R
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.List as List
import qualified Data.ByteString.Lazy as LB
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.S3API
import Network.Minio.Utils
maxObjectSize :: Int64
maxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
oneMiB :: Int64
oneMiB = 1024 * 1024
maxMultipartParts :: Int64
maxMultipartParts = 10000
-- | A data-type to represent the source data for an object. A
-- file-path or a producer-conduit may be provided.
--
-- For files, a size may be provided - this is useful in cases where
-- the file size cannot be automatically determined, or when only a
-- prefix of the file is desired.
--
-- For streams too, a size may be provided. This is useful to limit
-- the input - if it is not provided, the upload continues until the
-- stream ends or the object reaches `maxObjectSize` bytes.
data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and optional size.
| ODStream (C.Producer m ByteString) (Maybe Int64) -- ^ Pass size in bytes as maybe if known.
-- | Put an object from ObjectData. This high-level API handles
-- objects of all sizes, even when the object size is not known in advance.
putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag
putObject b o (ODFile fp sizeMay) = do
isSeekable <- isFileSeekable fp
-- FIXME: allocateReadFile may return exceptions and shortcircuit
finalSizeMay <- maybe (do (rKey, h) <- allocateReadFile fp
sizeE <- getFileSize h
R.release rKey
return $ hush $ sizeE
)
(return . Just) sizeMay
case finalSizeMay of
-- unable to get size, so assume non-seekable file and max-object size
Nothing -> sequentialMultipartUpload b o (Just maxObjectSize) $
CB.sourceFile fp
-- got file size, so check for single/multipart upload
Just size ->
if | size <= 64 * oneMiB -> do
(rKey, h) <- allocateReadFile fp
etag <- putObjectSingle b o [] h 0 size
R.release rKey
return etag
| size > maxObjectSize -> throwError $ MErrValidation $
MErrVPutSizeExceeded size
| isSeekable -> parallelMultipartUpload b o fp size
| otherwise -> sequentialMultipartUpload b o (Just size) $
CB.sourceFile fp
putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src
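To make the two ObjectData cases concrete, here is a rough usage sketch (hypothetical, not part of this commit; bucket, object and path names are invented, and OverloadedStrings is assumed for the Text-based Bucket and Object values):

-- Hypothetical examples exercising both ObjectData constructors.
uploadExamples :: Minio ()
uploadExamples = do
  -- file source; the size is left for putObject to discover, so it can
  -- choose between single-PUT and multipart upload
  void $ putObject "mybucket" "from-file" (ODFile "/tmp/data.bin" Nothing)
  -- conduit source of unknown length; parts are read and uploaded
  -- sequentially until the stream ends
  void $ putObject "mybucket" "from-stream"
    (ODStream (CB.sourceFile "/tmp/data.bin") Nothing)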
-- | Select part sizes - the logic is that the minimum part-size will
-- be 64MiB. TODO: write quickcheck tests.
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes size = List.zip3 [1..] partOffsets partSizes
where
partSize = max (64 * oneMiB) (size `div` maxMultipartParts)
(numParts, lastPartSize) = size `divMod` partSize
lastPart = filter (> 0) [lastPartSize]
partSizes = replicate (fromIntegral numParts) partSize ++ lastPart
partOffsets = List.scanl' (+) 0 partSizes
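Toward the quickcheck TODO above, one property that should hold by construction is that the selected parts tile the object exactly. A sketch (hypothetical, not part of this commit; it assumes selectPartSizes is reachable from the test-suite and that QuickCheck is added as a test dependency):

import Test.QuickCheck (Property, choose, forAll)

-- Hypothetical property: parts are non-empty, contiguous, and cover
-- the object exactly.
prop_selectPartSizes :: Property
prop_selectPartSizes =
  forAll (choose (1, 5 * 1024 * 1024 * 1024 * 1024)) $ \size ->
    let parts   = selectPartSizes size
        sizes   = [s | (_, _, s) <- parts]
        offsets = [o | (_, o, _) <- parts]
    in sum sizes == size
       && all (> 0) sizes
       && offsets == scanl (+) 0 (take (length sizes - 1) sizes)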
parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
-> Minio ETag
parallelMultipartUpload b o filePath size = do
let partSizeInfo = selectPartSizes size
-- get new upload id.
uploadId <- newMultipartUpload b o []
-- perform upload with 10 threads
uploadedParts <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo
completeMultipartUpload b o uploadId uploadedParts
where
uploadPart uploadId (partNum, offset, sz) = do
(rKey, h) <- allocateReadFile filePath
pInfo <- putObjectPart b o uploadId partNum [] $ PayloadH h offset sz
R.release rKey
return pInfo
-- | Upload a multipart object sequentially from a conduit source; this
-- works even when the object size is not known.
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
-> C.Producer Minio ByteString -> Minio ETag
sequentialMultipartUpload b o sizeMay src = do
-- get new upload id.
uploadId <- newMultipartUpload b o []
-- upload parts in loop
uploadedParts <- loop uploadId rSrc partSizeInfo []
-- complete multipart upload
completeMultipartUpload b o uploadId uploadedParts
where
rSrc = C.newResumableSource src
partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay
-- make a sink that consumes at most `s` bytes
limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs
-- FIXME: test, confirm and remove traceShowM statements
loop _ _ [] uploadedParts = return $ reverse uploadedParts
loop uid rSource ((partNum, _, size):ps) u = do
-- load data from resume-able source into bytestring.
(newSource, buf) <- rSource C.$$++ (limitedSink size)
traceShowM "psize: "
traceShowM (LB.length buf)
-- check if we got size bytes.
if LB.length buf == size
-- upload the full size part.
then do pInfo <- putObjectPart b o uid partNum [] $
PayloadBS $ LB.toStrict buf
loop uid newSource ps (pInfo:u)
-- got a smaller part, so it's the last one.
else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
finalData <- newSource C.$$+- (limitedSink size)
traceShowM "finalData size:"
traceShowM (LB.length finalData)
pInfo <- putObjectPart b o uid partNum [] $
PayloadBS $ LB.toStrict buf
return $ reverse (pInfo:u)

View File

@ -17,12 +17,7 @@ module Network.Minio.S3API
-- * Creating buckets and objects
---------------------------------
, putBucket
, putObject
-- * Deletion APIs
--------------------------
, deleteBucket
, deleteObject
, putObjectSingle
-- * Multipart Upload APIs
--------------------------
@ -31,6 +26,12 @@ module Network.Minio.S3API
, completeMultipartUpload
, abortMultipartUpload
, listIncompleteUploads
-- * Deletion APIs
--------------------------
, deleteBucket
, deleteObject
) where
import qualified Data.Conduit as C
@ -89,21 +90,28 @@ maxSinglePutObjectSizeBytes = 5 * 1024 * 1024 * 1024
-- | PUT an object into the service. This function performs a single
-- PUT object call, and so can only transfer objects up to 5GiB.
putObject :: Bucket -> Object -> [HT.Header] -> Handle -> Int64
-> Int64 -> Minio ()
putObject bucket object headers h offset size = do
putObjectSingle :: Bucket -> Object -> [HT.Header] -> Handle -> Int64
-> Int64 -> Minio ETag
putObjectSingle bucket object headers h offset size = do
-- check length is within single PUT object size.
when (size > maxSinglePutObjectSizeBytes) $
throwError $ MErrValidation $ MErrVSinglePUTSizeExceeded size
-- content-length header is automatically set by library.
void $ executeRequest $
def { riMethod = HT.methodPut
, riBucket = Just bucket
, riObject = Just object
, riHeaders = headers
, riPayload = PayloadH h offset size
}
resp <- executeRequest $
def { riMethod = HT.methodPut
, riBucket = Just bucket
, riObject = Just object
, riHeaders = headers
, riPayload = PayloadH h offset size
}
let rheaders = NC.responseHeaders resp
etag = getETagHeader rheaders
maybe
(throwError $ MErrValidation MErrVETagHeaderNotFound)
return etag
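A rough sketch of a caller, along the lines of the old fPutObject body (hypothetical, not part of this commit; it assumes the allocateReadFile helper from Network.Minio.Utils and the usual qualified imports, R for Control.Monad.Trans.Resource and IO for System.IO, are in scope):

-- Hypothetical caller: single-shot upload of a whole file (<= 5GiB).
putWholeFile :: Bucket -> Object -> FilePath -> Minio ETag
putWholeFile bucket object fp = do
  (releaseKey, h) <- allocateReadFile fp
  size <- liftIO $ IO.hFileSize h
  etag <- putObjectSingle bucket object [] h 0 (fromIntegral size)
  -- release the file handle once the request is done
  R.release releaseKey
  return etag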
-- | List objects in a bucket matching prefix up to delimiter,
@ -156,8 +164,8 @@ newMultipartUpload bucket object headers = do
-- | PUT a part of an object as part of a multipart upload.
putObjectPart :: Bucket -> Object -> UploadId -> PartNumber -> [HT.Header]
-> Handle -> Int64 -> Int64 -> Minio PartInfo
putObjectPart bucket object uploadId partNumber headers h offset size = do
-> Payload -> Minio PartInfo
putObjectPart bucket object uploadId partNumber headers payload = do
resp <- executeRequest $
def { riMethod = HT.methodPut
, riBucket = Just bucket
@ -166,7 +174,7 @@ putObjectPart bucket object uploadId partNumber headers h offset size = do
show partNumber),
("uploadId", Just $ encodeUtf8 uploadId)]
, riHeaders = headers
, riPayload = PayloadH h offset size
, riPayload = payload
}
let rheaders = NC.responseHeaders resp
etag = getETagHeader rheaders

View File

@ -1,6 +1,9 @@
module Network.Minio.Utils where
import qualified Control.Concurrent.Async.Lifted as A
import qualified Control.Concurrent.QSem as Q
import qualified Control.Exception.Lifted as ExL
import Control.Monad.Trans.Control (liftBaseOp_, StM)
import qualified Control.Monad.Trans.Resource as R
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Conduit as C
@ -21,9 +24,21 @@ allocateReadFile fp = do
(rk, hdlE) <- R.allocate (openReadFile fp) cleanup
either (throwError . MErrIO) (return . (rk,)) hdlE
where
openReadFile f = runExceptT $ tryIO $ IO.openBinaryFile f IO.ReadMode
openReadFile f = ExL.try $ IO.openBinaryFile f IO.ReadMode
cleanup = either (const $ return ()) IO.hClose
getFileSize :: (R.MonadResourceBase m, R.MonadResource m, MonadError MinioErr m)
=> Handle -> m (Either IOException Int64)
getFileSize h = ExL.try $ liftIO $ fromIntegral <$> IO.hFileSize h
isFileSeekable :: (R.MonadResource m, MonadError MinioErr m)
=> FilePath -> m Bool
isFileSeekable fp = do
(rKey, h) <- allocateReadFile fp
isSeekable <- liftIO $ IO.hIsSeekable h
R.release rKey
return isSeekable
lookupHeader :: HT.HeaderName -> [HT.Header] -> Maybe ByteString
lookupHeader hdr = headMay . map snd . filter (\(h, _) -> h == hdr)
@ -61,3 +76,22 @@ http req mgr = do
lbsResp <- NC.lbsResponse resp
throwError $ MErrService $ LBS.toStrict $ NC.responseBody lbsResp
return resp
-- | Like mapConcurrently, but run at most `count` actions
-- concurrently at any time.
limitedMapConcurrently :: (MonadIO m, R.MonadBaseControl IO m)
                       => Int -> (t -> m a) -> [t] -> m [a]
limitedMapConcurrently count act args = do
  qSem <- liftIO $ Q.newQSem count
  -- spawn one worker per item; each worker holds a semaphore unit
  -- only while its own action runs, so at most `count` actions
  -- execute concurrently.
  threads <- forM args $ \arg ->
    A.async $ ExL.bracket_ (liftIO $ Q.waitQSem qSem)
              (liftIO $ Q.signalQSem qSem)
              (act arg)
  mapM A.wait threads

View File

@ -99,7 +99,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
h <- liftIO $ SIO.openBinaryFile "/tmp/inputfile" SIO.ReadMode
let mb15 = 15 * 1024 * 1024
partInfo <- forM [1..10] $ \pnum ->
putObjectPart bucket object uid pnum [] h 0 mb15
putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15
step "complete multipart"
etag <- completeMultipartUpload bucket object uid partInfo
@ -141,6 +141,17 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
step "list incomplete multipart uploads"
incompleteUploads <- listIncompleteUploads bucket Nothing Nothing Nothing Nothing
liftIO $ (length $ lurUploads incompleteUploads) @?= 10
, funTestWithBucket "multipart" "testbucket5" $ \step bucket -> do
step "upload large object"
-- fPutObject bucket "big" "/tmp/large"
-- putObject bucket "big" ("/dev/zero")
etag <- putObject bucket "big" (ODFile "/dev/zero" $ Just $ 1024*1024*100)
traceShowM etag
step "cleanup"
deleteObject bucket "big"
]
unitTests :: TestTree