diff --git a/minio-hs.cabal b/minio-hs.cabal index c19f411..e1307cc 100644 --- a/minio-hs.cabal +++ b/minio-hs.cabal @@ -19,17 +19,19 @@ library exposed-modules: Network.Minio , Network.Minio.S3API other-modules: Lib.Prelude + , Network.Minio.API , Network.Minio.Data , Network.Minio.Data.ByteString , Network.Minio.Data.Crypto , Network.Minio.Data.Time + , Network.Minio.PutObject , Network.Minio.Sign.V4 - , Network.Minio.API , Network.Minio.Utils - , Network.Minio.XmlParser , Network.Minio.XmlGenerator + , Network.Minio.XmlParser build-depends: base >= 4.7 && < 5 , protolude >= 0.1.6 && < 0.2 + , async , bytestring , case-insensitive , conduit @@ -43,6 +45,7 @@ library , http-client , http-conduit , http-types + , lifted-async , lifted-base , memory , monad-control @@ -60,6 +63,7 @@ library , MultiParamTypeClasses , MultiWayIf , RankNTypes + , TypeFamilies , TupleSections test-suite minio-hs-test @@ -69,6 +73,7 @@ test-suite minio-hs-test build-depends: base , minio-hs , protolude >= 0.1.6 && < 0.2 + , async , bytestring , case-insensitive , conduit @@ -82,6 +87,7 @@ test-suite minio-hs-test , http-client , http-conduit , http-types + , lifted-async , lifted-base , memory , monad-control @@ -106,6 +112,7 @@ test-suite minio-hs-test , MultiWayIf , RankNTypes , TupleSections + , TypeFamilies other-modules: Lib.Prelude , Network.Minio , Network.Minio.API @@ -113,12 +120,13 @@ test-suite minio-hs-test , Network.Minio.Data.ByteString , Network.Minio.Data.Crypto , Network.Minio.Data.Time + , Network.Minio.PutObject , Network.Minio.S3API , Network.Minio.Sign.V4 - , Network.Minio.XmlGenerator - , Network.Minio.XmlParser , Network.Minio.Utils + , Network.Minio.XmlGenerator , Network.Minio.XmlGenerator.Test + , Network.Minio.XmlParser , Network.Minio.XmlParser.Test diff --git a/src/Network/Minio.hs b/src/Network/Minio.hs index 8422fe9..2431182 100644 --- a/src/Network/Minio.hs +++ b/src/Network/Minio.hs @@ -29,22 +29,24 @@ module Network.Minio , fGetObject , fPutObject + , 
ObjectData(..) + , putObject ) where {- This module exports the high-level Minio API for object storage. -} -import qualified Control.Monad.Trans.Resource as R +-- import qualified Control.Monad.Trans.Resource as R import qualified Data.Conduit as C import qualified Data.Conduit.Binary as CB -import qualified System.IO as IO import Lib.Prelude import Network.Minio.Data +import Network.Minio.PutObject import Network.Minio.S3API -import Network.Minio.Utils +-- import Network.Minio.Utils -- | Fetch the object and write it to the given file safely. The -- object is first written to a temporary file in the same directory @@ -56,11 +58,5 @@ fGetObject bucket object fp = do -- | Upload the given file to the given object. fPutObject :: Bucket -> Object -> FilePath -> Minio () -fPutObject bucket object fp = do - (releaseKey, h) <- allocateReadFile fp - - size <- liftIO $ IO.hFileSize h - putObject bucket object [] h 0 (fromIntegral size) - - -- release file handle - R.release releaseKey +fPutObject bucket object f = void $ putObject bucket object $ + ODFile f Nothing diff --git a/src/Network/Minio/Data.hs b/src/Network/Minio/Data.hs index 6da906b..a622766 100644 --- a/src/Network/Minio/Data.hs +++ b/src/Network/Minio/Data.hs @@ -133,6 +133,7 @@ getRegionFromRI ri = maybe "us-east-1" identity (riRegion ri) -- | Various validation errors data MErrV = MErrVSinglePUTSizeExceeded Int64 + | MErrVPutSizeExceeded Int64 | MErrVETagHeaderNotFound deriving (Show) diff --git a/src/Network/Minio/PutObject.hs b/src/Network/Minio/PutObject.hs new file mode 100644 index 0000000..3b27864 --- /dev/null +++ b/src/Network/Minio/PutObject.hs @@ -0,0 +1,147 @@ +module Network.Minio.PutObject + ( + putObject + , ObjectData(..) 
+ ) where + + +import qualified Control.Monad.Trans.Resource as R +import qualified Data.Conduit as C +import qualified Data.Conduit.Binary as CB +import qualified Data.List as List +import qualified Data.ByteString.Lazy as LB + +import Lib.Prelude + +import Network.Minio.Data +import Network.Minio.S3API +import Network.Minio.Utils + + +maxObjectSize :: Int64 +maxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + +oneMiB :: Int64 +oneMiB = 1024 * 1024 + +maxMultipartParts :: Int64 +maxMultipartParts = 10000 + +-- | A data-type to represent the source data for an object. A +-- file-path or a producer-conduit may be provided. +-- +-- For files, a size may be provided - this is useful in cases when +-- the file size cannot be automatically determined or if only some +-- prefix of the file is desired. +-- +-- For streams also, a size may be provided. This is useful to limit +-- the input - if it is not provided, upload will continue until the +-- stream ends or the object reaches `maxObjectSize` size. +data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and optional size. + | ODStream (C.Producer m ByteString) (Maybe Int64) -- ^ Pass size in bytes as maybe if known. + +-- | Put an object from ObjectData. This high-level API handles +-- objects of all sizes, and even if the object size is unknown. +putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag +putObject b o (ODFile fp sizeMay) = do + isSeekable <- isFileSeekable fp + + -- FIXME: allocateReadFile may return exceptions and shortcircuit + finalSizeMay <- maybe (do (rKey, h) <- allocateReadFile fp + sizeE <- getFileSize h + R.release rKey + return $ hush $ sizeE + ) + (return . 
Just) sizeMay + + case finalSizeMay of + -- unable to get size, so assume non-seekable file and max-object size + Nothing -> sequentialMultipartUpload b o (Just maxObjectSize) $ + CB.sourceFile fp + + -- got file size, so check for single/multipart upload + Just size -> + if | size <= 64 * oneMiB -> do + (rKey, h) <- allocateReadFile fp + etag <- putObjectSingle b o [] h 0 size + R.release rKey + return etag + | size > maxObjectSize -> throwError $ MErrValidation $ + MErrVPutSizeExceeded size + | isSeekable -> parallelMultipartUpload b o fp size + | otherwise -> sequentialMultipartUpload b o (Just size) $ + CB.sourceFile fp +putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src + +-- | Select part sizes - the logic is that the minimum part-size will +-- be 64MiB. TODO: write quickcheck tests. +selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)] +selectPartSizes size = List.zip3 [1..] partOffsets partSizes + where + partSize = max (64 * oneMiB) (size `div` maxMultipartParts) + (numParts, lastPartSize) = size `divMod` partSize + lastPart = filter (> 0) [lastPartSize] + partSizes = replicate (fromIntegral numParts) partSize ++ lastPart + partOffsets = List.scanl' (+) 0 partSizes + +parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64 + -> Minio ETag +parallelMultipartUpload b o filePath size = do + let partSizeInfo = selectPartSizes size + + -- get new upload id. + uploadId <- newMultipartUpload b o [] + + -- perform upload with 10 threads + uploadedParts <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo + + completeMultipartUpload b o uploadId uploadedParts + where + uploadPart uploadId (partNum, offset, sz) = do + (rKey, h) <- allocateReadFile filePath + pInfo <- putObjectPart b o uploadId partNum [] $ PayloadH h offset sz + R.release rKey + return pInfo + +-- | Upload multipart object from conduit source sequentially without +-- object size information. 
+sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64 + -> C.Producer Minio ByteString -> Minio ETag +sequentialMultipartUpload b o sizeMay src = do + -- get new upload id. + uploadId <- newMultipartUpload b o [] + + -- upload parts in loop + uploadedParts <- loop uploadId rSrc partSizeInfo [] + + -- complete multipart upload + completeMultipartUpload b o uploadId uploadedParts + where + rSrc = C.newResumableSource src + partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay + + -- make a sink that consumes only `s` bytes + limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs + + -- FIXME: test, confirm and remove traceShowM statements + loop _ _ [] uploadedParts = return $ reverse uploadedParts + loop uid rSource ((partNum, _, size):ps) u = do + -- load data from resume-able source into bytestring. + (newSource, buf) <- rSource C.$$++ (limitedSink size) + traceShowM "psize: " + traceShowM (LB.length buf) + -- check if we got size bytes. + if LB.length buf == size + -- upload the full size part. + then do pInfo <- putObjectPart b o uid partNum [] $ + PayloadBS $ LB.toStrict buf + loop uid newSource ps (pInfo:u) + + -- got a smaller part, so it's the last one. 
+ else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.") + finalData <- newSource C.$$+- (limitedSink size) + traceShowM "finalData size:" + traceShowM (LB.length finalData) + pInfo <- putObjectPart b o uid partNum [] $ + PayloadBS $ LB.toStrict buf + return $ reverse (pInfo:u) diff --git a/src/Network/Minio/S3API.hs b/src/Network/Minio/S3API.hs index 504a5b3..661d99a 100644 --- a/src/Network/Minio/S3API.hs +++ b/src/Network/Minio/S3API.hs @@ -17,12 +17,7 @@ module Network.Minio.S3API -- * Creating buckets and objects --------------------------------- , putBucket - , putObject - - -- * Deletion APIs - -------------------------- - , deleteBucket - , deleteObject + , putObjectSingle -- * Multipart Upload APIs -------------------------- @@ -31,6 +26,12 @@ module Network.Minio.S3API , completeMultipartUpload , abortMultipartUpload , listIncompleteUploads + + -- * Deletion APIs + -------------------------- + , deleteBucket + , deleteObject + ) where import qualified Data.Conduit as C @@ -89,21 +90,28 @@ maxSinglePutObjectSizeBytes = 5 * 1024 * 1024 * 1024 -- | PUT an object into the service. This function performs a single -- PUT object call, and so can only transfer objects upto 5GiB. -putObject :: Bucket -> Object -> [HT.Header] -> Handle -> Int64 - -> Int64 -> Minio () -putObject bucket object headers h offset size = do +putObjectSingle :: Bucket -> Object -> [HT.Header] -> Handle -> Int64 + -> Int64 -> Minio ETag +putObjectSingle bucket object headers h offset size = do -- check length is within single PUT object size. when (size > maxSinglePutObjectSizeBytes) $ throwError $ MErrValidation $ MErrVSinglePUTSizeExceeded size -- content-length header is automatically set by library. 
- void $ executeRequest $ - def { riMethod = HT.methodPut - , riBucket = Just bucket - , riObject = Just object - , riHeaders = headers - , riPayload = PayloadH h offset size - } + resp <- executeRequest $ + def { riMethod = HT.methodPut + , riBucket = Just bucket + , riObject = Just object + , riHeaders = headers + , riPayload = PayloadH h offset size + } + + let rheaders = NC.responseHeaders resp + etag = getETagHeader rheaders + maybe + (throwError $ MErrValidation MErrVETagHeaderNotFound) + return etag + -- | List objects in a bucket matching prefix up to delimiter, @@ -156,8 +164,8 @@ newMultipartUpload bucket object headers = do -- | PUT a part of an object as part of a multipart upload. putObjectPart :: Bucket -> Object -> UploadId -> PartNumber -> [HT.Header] - -> Handle -> Int64 -> Int64 -> Minio PartInfo -putObjectPart bucket object uploadId partNumber headers h offset size = do + -> Payload -> Minio PartInfo +putObjectPart bucket object uploadId partNumber headers payload = do resp <- executeRequest $ def { riMethod = HT.methodPut , riBucket = Just bucket @@ -166,7 +174,7 @@ putObjectPart bucket object uploadId partNumber headers h offset size = do show partNumber), ("uploadId", Just $ encodeUtf8 uploadId)] , riHeaders = headers - , riPayload = PayloadH h offset size + , riPayload = payload } let rheaders = NC.responseHeaders resp etag = getETagHeader rheaders diff --git a/src/Network/Minio/Utils.hs b/src/Network/Minio/Utils.hs index 5f95410..448d241 100644 --- a/src/Network/Minio/Utils.hs +++ b/src/Network/Minio/Utils.hs @@ -1,6 +1,9 @@ module Network.Minio.Utils where +import qualified Control.Concurrent.Async.Lifted as A +import qualified Control.Concurrent.QSem as Q import qualified Control.Exception.Lifted as ExL +import Control.Monad.Trans.Control (liftBaseOp_, StM) import qualified Control.Monad.Trans.Resource as R import qualified Data.ByteString.Lazy as LBS import qualified Data.Conduit as C @@ -21,9 +24,21 @@ allocateReadFile fp = do (rk, hdlE) 
<- R.allocate (openReadFile fp) cleanup either (throwError . MErrIO) (return . (rk,)) hdlE where - openReadFile f = runExceptT $ tryIO $ IO.openBinaryFile f IO.ReadMode + openReadFile f = ExL.try $ IO.openBinaryFile f IO.ReadMode cleanup = either (const $ return ()) IO.hClose +getFileSize :: (R.MonadResourceBase m, R.MonadResource m, MonadError MinioErr m) + => Handle -> m (Either IOException Int64) +getFileSize h = ExL.try $ liftIO $ fromIntegral <$> IO.hFileSize h + +isFileSeekable :: (R.MonadResource m, MonadError MinioErr m) + => FilePath -> m Bool +isFileSeekable fp = do + (rKey, h) <- allocateReadFile fp + isSeekable <- liftIO $ IO.hIsSeekable h + R.release rKey + return isSeekable + lookupHeader :: HT.HeaderName -> [HT.Header] -> Maybe ByteString lookupHeader hdr = headMay . map snd . filter (\(h, _) -> h == hdr) @@ -61,3 +76,22 @@ http req mgr = do lbsResp <- NC.lbsResponse resp throwError $ MErrService $ LBS.toStrict $ NC.responseBody lbsResp return resp + +-- like mapConcurrently but with a limited number of concurrent +-- threads. +limitedMapConcurrently :: forall t a (m :: * -> *) b. 
+ (MonadIO m, R.MonadBaseControl IO m, + StM m a ~ StM m b) + => Int -> (t -> m a) -> [t] -> m [b] +limitedMapConcurrently count act args = do + qSem <- liftIO $ Q.newQSem count + threads <- workOn qSem args + mapM A.wait threads + where + workOn _ [] = return [] + workOn qs (a:as) = liftBaseOp_ + (bracket_ (Q.waitQSem qs) (Q.signalQSem qs)) $ + do + thread <- A.async $ act a + others <- workOn qs as + return (thread : others) diff --git a/test/Spec.hs b/test/Spec.hs index e077462..efcab1b 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -99,7 +99,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server" h <- liftIO $ SIO.openBinaryFile "/tmp/inputfile" SIO.ReadMode let mb15 = 15 * 1024 * 1024 partInfo <- forM [1..10] $ \pnum -> - putObjectPart bucket object uid pnum [] h 0 mb15 + putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15 step "complete multipart" etag <- completeMultipartUpload bucket object uid partInfo @@ -141,6 +141,17 @@ liveServerUnitTests = testGroup "Unit tests against a live server" step "list incomplete multipart uploads" incompleteUploads <- listIncompleteUploads bucket Nothing Nothing Nothing Nothing liftIO $ (length $ lurUploads incompleteUploads) @?= 10 + + , funTestWithBucket "multipart" "testbucket5" $ \step bucket -> do + + step "upload large object" + -- fPutObject bucket "big" "/tmp/large" + -- putObject bucket "big" ("/dev/zero") + etag <- putObject bucket "big" (ODFile "/dev/zero" $ Just $ 1024*1024*100) + traceShowM etag + + step "cleanup" + deleteObject bucket "big" ] unitTests :: TestTree