From c430e3d7472fde1cf4378125a7933ccc609b4b5e Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Tue, 7 Feb 2017 14:56:28 +0530 Subject: [PATCH] Makes sequential uploads resumable. --- minio-hs.cabal | 6 ++- src/Lib/Prelude.hs | 2 +- src/Network/Minio.hs | 47 +------------------ src/Network/Minio/Data.hs | 8 ++-- src/Network/Minio/Data/Crypto.hs | 8 +++- src/Network/Minio/ListOps.hs | 56 ++++++++++++++++++++++ src/Network/Minio/PutObject.hs | 79 +++++++++++++++++++------------- 7 files changed, 120 insertions(+), 86 deletions(-) create mode 100644 src/Network/Minio/ListOps.hs diff --git a/minio-hs.cabal b/minio-hs.cabal index e0f997a..7f7b417 100644 --- a/minio-hs.cabal +++ b/minio-hs.cabal @@ -24,6 +24,7 @@ library , Network.Minio.Data.ByteString , Network.Minio.Data.Crypto , Network.Minio.Data.Time + , Network.Minio.ListOps , Network.Minio.PutObject , Network.Minio.Sign.V4 , Network.Minio.Utils @@ -59,10 +60,10 @@ library default-language: Haskell2010 default-extensions: FlexibleContexts , FlexibleInstances - , OverloadedStrings - , NoImplicitPrelude , MultiParamTypeClasses , MultiWayIf + , NoImplicitPrelude + , OverloadedStrings , RankNTypes , ScopedTypeVariables , TypeFamilies @@ -124,6 +125,7 @@ test-suite minio-hs-test , Network.Minio.Data.ByteString , Network.Minio.Data.Crypto , Network.Minio.Data.Time + , Network.Minio.ListOps , Network.Minio.PutObject , Network.Minio.S3API , Network.Minio.Sign.V4 diff --git a/src/Lib/Prelude.hs b/src/Lib/Prelude.hs index f761999..8034753 100644 --- a/src/Lib/Prelude.hs +++ b/src/Lib/Prelude.hs @@ -11,6 +11,6 @@ module Lib.Prelude import Protolude as Exports import Data.Time as Exports (UTCTime) -import Data.Maybe as Exports (catMaybes, listToMaybe) +import Control.Monad.Trans.Maybe as Exports (runMaybeT, MaybeT(..)) import Control.Monad.Catch as Exports (throwM, MonadThrow, MonadCatch) diff --git a/src/Network/Minio.hs b/src/Network/Minio.hs index cd3d3bf..1a6cdcd 100644 --- a/src/Network/Minio.hs +++ b/src/Network/Minio.hs @@ -40,17 +40,15 @@ module Network.Minio This module exports the high-level Minio API for object storage. -} --- import qualified Control.Monad.Trans.Resource as R import qualified Data.Conduit as C import qualified Data.Conduit.Binary as CB -import qualified Data.Conduit.List as CL import Lib.Prelude import Network.Minio.Data +import Network.Minio.ListOps import Network.Minio.PutObject import Network.Minio.S3API --- import Network.Minio.Utils -- | Fetch the object and write it to the given file safely. The -- object is first written to a temporary file in the same directory @@ -64,46 +62,3 @@ fGetObject bucket object fp = do fPutObject :: Bucket -> Object -> FilePath -> Minio () fPutObject bucket object f = void $ putObject bucket object $ ODFile f Nothing - --- | List objects in a bucket matching the given prefix. If recurse is --- set to True objects matching prefix are recursively listed. -listObjects :: Bucket -> Maybe Text -> Bool -> C.Producer Minio ObjectInfo -listObjects bucket prefix recurse = loop Nothing - where - loop :: Maybe Text -> C.Producer Minio ObjectInfo - loop nextToken = do - let - delimiter = bool (Just "/") Nothing recurse - - res <- lift $ listObjects' bucket prefix nextToken delimiter - CL.sourceList $ lorObjects res - when (lorHasMore res) $ - loop (lorNextToken res) - --- | List incomplete uploads in a bucket matching the given prefix. If --- recurse is set to True incomplete uploads for the given prefix are --- recursively listed. -listIncompleteUploads :: Bucket -> Maybe Text -> Bool -> C.Producer Minio UploadInfo -listIncompleteUploads bucket prefix recurse = loop Nothing Nothing - where - loop :: Maybe Text -> Maybe Text -> C.Producer Minio UploadInfo - loop nextKeyMarker nextUploadIdMarker = do - let - delimiter = bool (Just "/") Nothing recurse - - res <- lift $ listIncompleteUploads' bucket prefix delimiter nextKeyMarker nextUploadIdMarker - CL.sourceList $ lurUploads res - when (lurHasMore res) $ - loop nextKeyMarker nextUploadIdMarker - --- | List object parts of an ongoing multipart upload for given --- bucket, object and uploadId. -listIncompleteParts :: Bucket -> Object -> UploadId -> C.Producer Minio ListPartInfo -listIncompleteParts bucket object uploadId = loop Nothing - where - loop :: Maybe Text -> C.Producer Minio ListPartInfo - loop nextPartMarker = do - res <- lift $ listIncompleteParts' bucket object uploadId Nothing nextPartMarker - CL.sourceList $ lprParts res - when (lprHasMore res) $ - loop (show <$> lprNextPart res) diff --git a/src/Network/Minio/Data.hs b/src/Network/Minio/Data.hs index cb284f5..d4f0544 100644 --- a/src/Network/Minio/Data.hs +++ b/src/Network/Minio/Data.hs @@ -78,7 +78,7 @@ data ListPartsResult = ListPartsResult { -- | Represents information about an object part in an ongoing -- multipart upload. data ListPartInfo = ListPartInfo { - piNumber :: Int + piNumber :: PartNumber , piETag :: ETag , piSize :: Int64 , piModTime :: UTCTime @@ -188,10 +188,8 @@ runMinio :: ConnectInfo -> Minio a -> ResourceT IO (Either MinioErr a) runMinio ci m = do conn <- liftIO $ connect ci flip runReaderT conn . unMinio $ - (m >>= (return . Right)) - `MC.catch` handlerME - `MC.catch` handlerHE - `MC.catch` handlerFE + (m >>= (return . Right)) `MC.catches` + [MC.Handler handlerME, MC.Handler handlerHE, MC.Handler handlerFE] where handlerME = return . Left . ME handlerHE = return . Left . MEHttp diff --git a/src/Network/Minio/Data/Crypto.hs b/src/Network/Minio/Data/Crypto.hs index efe10eb..800f4c3 100644 --- a/src/Network/Minio/Data/Crypto.hs +++ b/src/Network/Minio/Data/Crypto.hs @@ -2,13 +2,16 @@ module Network.Minio.Data.Crypto ( hashSHA256 , hashSHA256FromSource + + , hashMD5 + , hmacSHA256 , hmacSHA256RawBS , digestToBS , digestToBase16 ) where -import Crypto.Hash (SHA256(..), hashWith, Digest) +import Crypto.Hash (SHA256(..), MD5(..), hashWith, Digest) import Crypto.Hash.Conduit (sinkHash) import Crypto.MAC.HMAC (hmac, HMAC) import Data.ByteArray (ByteArrayAccess, convert) @@ -40,3 +43,6 @@ digestToBS = convert digestToBase16 :: ByteArrayAccess a => a -> ByteString digestToBase16 = convertToBase Base16 + +hashMD5 :: ByteString -> ByteString +hashMD5 = digestToBase16 . hashWith MD5 diff --git a/src/Network/Minio/ListOps.hs b/src/Network/Minio/ListOps.hs new file mode 100644 index 0000000..b9ea16e --- /dev/null +++ b/src/Network/Minio/ListOps.hs @@ -0,0 +1,56 @@ +module Network.Minio.ListOps where + +import qualified Data.Conduit as C +import qualified Data.Conduit.List as CL + +import Lib.Prelude + +import Network.Minio.Data +import Network.Minio.S3API + +-- | List objects in a bucket matching the given prefix. If recurse is +-- set to True objects matching prefix are recursively listed. +listObjects :: Bucket -> Maybe Text -> Bool -> C.Producer Minio ObjectInfo +listObjects bucket prefix recurse = loop Nothing + where + loop :: Maybe Text -> C.Producer Minio ObjectInfo + loop nextToken = do + let + delimiter = bool (Just "/") Nothing recurse + + res <- lift $ listObjects' bucket prefix nextToken delimiter + CL.sourceList $ lorObjects res + when (lorHasMore res) $ + loop (lorNextToken res) + +-- | List incomplete uploads in a bucket matching the given prefix. If +-- recurse is set to True incomplete uploads for the given prefix are +-- recursively listed. +listIncompleteUploads :: Bucket -> Maybe Text -> Bool + -> C.Producer Minio UploadInfo +listIncompleteUploads bucket prefix recurse = loop Nothing Nothing + where + loop :: Maybe Text -> Maybe Text -> C.Producer Minio UploadInfo + loop nextKeyMarker nextUploadIdMarker = do + let + delimiter = bool (Just "/") Nothing recurse + + res <- lift $ listIncompleteUploads' bucket prefix delimiter + nextKeyMarker nextUploadIdMarker + CL.sourceList $ lurUploads res + when (lurHasMore res) $ + loop nextKeyMarker nextUploadIdMarker + +-- | List object parts of an ongoing multipart upload for given +-- bucket, object and uploadId. +listIncompleteParts :: Bucket -> Object -> UploadId + -> C.Producer Minio ListPartInfo +listIncompleteParts bucket object uploadId = loop Nothing + where + loop :: Maybe Text -> C.Producer Minio ListPartInfo + loop nextPartMarker = do + res <- lift $ listIncompleteParts' bucket object uploadId Nothing + nextPartMarker + CL.sourceList $ lprParts res + when (lprHasMore res) $ + loop (show <$> lprNextPart res) diff --git a/src/Network/Minio/PutObject.hs b/src/Network/Minio/PutObject.hs index f7e633e..b777141 100644 --- a/src/Network/Minio/PutObject.hs +++ b/src/Network/Minio/PutObject.hs @@ -6,19 +6,24 @@ module Network.Minio.PutObject import qualified Data.Conduit as C +import qualified Data.Conduit.Combinators as CC import qualified Data.Conduit.Binary as CB import qualified Data.List as List import qualified Data.ByteString.Lazy as LB +import qualified Data.Map.Strict as Map import Lib.Prelude import Network.Minio.Data +import Network.Minio.Data.Crypto +import Network.Minio.ListOps import Network.Minio.S3API import Network.Minio.Utils +-- | max obj size is 5TiB maxObjectSize :: Int64 -maxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 +maxObjectSize = 5 * 1024 * 1024 * oneMiB oneMiB :: Int64 oneMiB = 1024 * 1024 @@ -43,10 +48,8 @@ data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and option -- objects of all sizes, and even if the object size is unknown. putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag putObject b o (ODFile fp sizeMay) = do - hResE <- withNewHandle fp $ \h -> do - isSeekable <- isHandleSeekable h - handleSizeMay <- getFileSize h - return (isSeekable, handleSizeMay) + hResE <- withNewHandle fp $ \h -> + liftM2 (,) (isHandleSeekable h) (getFileSize h) (isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return hResE @@ -106,11 +109,13 @@ parallelMultipartUpload b o filePath size = do sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64 -> C.Producer Minio ByteString -> Minio ETag sequentialMultipartUpload b o sizeMay src = do - -- get new upload id. - uploadId <- newMultipartUpload b o [] + (uidMay, pinfos) <- getExistingUpload b o + + -- get a new upload id if needed. + uploadId <- maybe (newMultipartUpload b o []) return uidMay -- upload parts in loop - uploadedParts <- loop uploadId rSrc partSizeInfo [] + uploadedParts <- loop pinfos uploadId rSrc partSizeInfo [] -- complete multipart upload completeMultipartUpload b o uploadId uploadedParts @@ -118,40 +123,52 @@ sequentialMultipartUpload b o sizeMay src = do rSrc = C.newResumableSource src partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay + -- returns partinfo if part is already uploaded. + checkUploadNeeded :: LByteString -> PartNumber + -> Map.Map PartNumber ListPartInfo + -> Maybe PartInfo + checkUploadNeeded lbs n pmap = do + pinfo@(ListPartInfo _ etag size _) <- Map.lookup n pmap + bool Nothing (return (PartInfo n etag)) $ + LB.length lbs == size && + hashMD5 (LB.toStrict lbs) == encodeUtf8 etag + -- make a sink that consumes only `s` bytes limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs -- FIXME: test, confirm and remove traceShowM statements - loop _ _ [] uploadedParts = return $ reverse uploadedParts - loop uid rSource ((partNum, _, size):ps) u = do + loop _ _ _ [] uparts = return $ reverse uparts + loop pinfos uid rSource ((partNum, _, size):ps) u = do + -- load data from resume-able source into bytestring. (newSource, buf) <- rSource C.$$++ (limitedSink size) traceShowM "psize: " traceShowM (LB.length buf) - -- check if we got size bytes. - if LB.length buf == size - -- upload the full size part. - then do pInfo <- putObjectPart b o uid partNum [] $ - PayloadBS $ LB.toStrict buf - loop uid newSource ps (pInfo:u) - -- got a smaller part, so its the last one. - else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.") - finalData <- newSource C.$$+- (limitedSink size) - traceShowM "finalData size:" - traceShowM (LB.length finalData) - pInfo <- putObjectPart b o uid partNum [] $ - PayloadBS $ LB.toStrict buf - return $ reverse (pInfo:u) + case checkUploadNeeded buf partNum pinfos of + Just pinfo -> loop pinfos uid newSource ps (pinfo:u) + Nothing -> do + pInfo <- putObjectPart b o uid partNum [] $ + PayloadBS $ LB.toStrict buf + + if LB.length buf == size + -- upload the full size part. + then loop pinfos uid newSource ps (pInfo:u) + + -- got a smaller part, so its the last one. + else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.") + finalData <- newSource C.$$+- (limitedSink size) + traceShowM "finalData size:" + traceShowM (LB.length finalData) + return $ reverse (pInfo:u) -- | Looks for incomplete uploads for an object. Returns the first one -- if there are many. getExistingUpload :: Bucket -> Object - -> Minio (Maybe (UploadId, [ListPartInfo])) + -> Minio (Maybe UploadId, Map.Map PartNumber ListPartInfo) getExistingUpload b o = do - uploadsRes <- listIncompleteUploads' b (Just o) Nothing Nothing Nothing - case uiUploadId <$> listToMaybe (lurUploads uploadsRes) of - Nothing -> return Nothing - Just uid -> do - lpr <- listIncompleteParts' b o uid Nothing Nothing - return $ Just (uid, lprParts lpr) + uidMay <- (fmap . fmap) uiUploadId $ + listIncompleteUploads b (Just o) False C.$$ CC.head + parts <- maybe (return []) + (\uid -> listIncompleteParts b o uid C.$$ CC.sinkList) uidMay + return (uidMay, Map.fromList $ map (\p -> (piNumber p, p)) parts)