Makes sequential uploads resumable.

This commit is contained in:
Aditya Manthramurthy 2017-02-07 14:56:28 +05:30
parent 43bfabd186
commit c430e3d747
7 changed files with 120 additions and 86 deletions

View File

@ -24,6 +24,7 @@ library
, Network.Minio.Data.ByteString
, Network.Minio.Data.Crypto
, Network.Minio.Data.Time
, Network.Minio.ListOps
, Network.Minio.PutObject
, Network.Minio.Sign.V4
, Network.Minio.Utils
@ -59,10 +60,10 @@ library
default-language: Haskell2010
default-extensions: FlexibleContexts
, FlexibleInstances
, OverloadedStrings
, NoImplicitPrelude
, MultiParamTypeClasses
, MultiWayIf
, NoImplicitPrelude
, OverloadedStrings
, RankNTypes
, ScopedTypeVariables
, TypeFamilies
@ -124,6 +125,7 @@ test-suite minio-hs-test
, Network.Minio.Data.ByteString
, Network.Minio.Data.Crypto
, Network.Minio.Data.Time
, Network.Minio.ListOps
, Network.Minio.PutObject
, Network.Minio.S3API
, Network.Minio.Sign.V4

View File

@ -11,6 +11,6 @@ module Lib.Prelude
import Protolude as Exports
import Data.Time as Exports (UTCTime)
import Data.Maybe as Exports (catMaybes, listToMaybe)
import Control.Monad.Trans.Maybe as Exports (runMaybeT, MaybeT(..))
import Control.Monad.Catch as Exports (throwM, MonadThrow, MonadCatch)

View File

@ -40,17 +40,15 @@ module Network.Minio
This module exports the high-level Minio API for object storage.
-}
-- import qualified Control.Monad.Trans.Resource as R
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.ListOps
import Network.Minio.PutObject
import Network.Minio.S3API
-- import Network.Minio.Utils
-- | Fetch the object and write it to the given file safely. The
-- object is first written to a temporary file in the same directory
@ -64,46 +62,3 @@ fGetObject bucket object fp = do
fPutObject :: Bucket -> Object -> FilePath -> Minio ()
fPutObject bucket object f = void $ putObject bucket object $
ODFile f Nothing
-- | List objects in a bucket matching the given prefix. If recurse is
-- set to True objects matching prefix are recursively listed.
listObjects :: Bucket -> Maybe Text -> Bool -> C.Producer Minio ObjectInfo
listObjects bucket prefix recurse = loop Nothing
where
loop :: Maybe Text -> C.Producer Minio ObjectInfo
loop nextToken = do
let
delimiter = bool (Just "/") Nothing recurse
res <- lift $ listObjects' bucket prefix nextToken delimiter
CL.sourceList $ lorObjects res
when (lorHasMore res) $
loop (lorNextToken res)
-- | List incomplete uploads in a bucket matching the given prefix. If
-- recurse is set to True incomplete uploads for the given prefix are
-- recursively listed.
listIncompleteUploads :: Bucket -> Maybe Text -> Bool -> C.Producer Minio UploadInfo
listIncompleteUploads bucket prefix recurse = loop Nothing Nothing
where
loop :: Maybe Text -> Maybe Text -> C.Producer Minio UploadInfo
loop nextKeyMarker nextUploadIdMarker = do
let
delimiter = bool (Just "/") Nothing recurse
res <- lift $ listIncompleteUploads' bucket prefix delimiter nextKeyMarker nextUploadIdMarker
CL.sourceList $ lurUploads res
when (lurHasMore res) $
loop nextKeyMarker nextUploadIdMarker
-- | List object parts of an ongoing multipart upload for given
-- bucket, object and uploadId.
listIncompleteParts :: Bucket -> Object -> UploadId -> C.Producer Minio ListPartInfo
listIncompleteParts bucket object uploadId = loop Nothing
where
loop :: Maybe Text -> C.Producer Minio ListPartInfo
loop nextPartMarker = do
res <- lift $ listIncompleteParts' bucket object uploadId Nothing nextPartMarker
CL.sourceList $ lprParts res
when (lprHasMore res) $
loop (show <$> lprNextPart res)

View File

@ -78,7 +78,7 @@ data ListPartsResult = ListPartsResult {
-- | Represents information about an object part in an ongoing
-- multipart upload.
data ListPartInfo = ListPartInfo {
piNumber :: Int
piNumber :: PartNumber
, piETag :: ETag
, piSize :: Int64
, piModTime :: UTCTime
@ -188,10 +188,8 @@ runMinio :: ConnectInfo -> Minio a -> ResourceT IO (Either MinioErr a)
runMinio ci m = do
conn <- liftIO $ connect ci
flip runReaderT conn . unMinio $
(m >>= (return . Right))
`MC.catch` handlerME
`MC.catch` handlerHE
`MC.catch` handlerFE
(m >>= (return . Right)) `MC.catches`
[MC.Handler handlerME, MC.Handler handlerHE, MC.Handler handlerFE]
where
handlerME = return . Left . ME
handlerHE = return . Left . MEHttp

View File

@ -2,13 +2,16 @@ module Network.Minio.Data.Crypto
(
hashSHA256
, hashSHA256FromSource
, hashMD5
, hmacSHA256
, hmacSHA256RawBS
, digestToBS
, digestToBase16
) where
import Crypto.Hash (SHA256(..), hashWith, Digest)
import Crypto.Hash (SHA256(..), MD5(..), hashWith, Digest)
import Crypto.Hash.Conduit (sinkHash)
import Crypto.MAC.HMAC (hmac, HMAC)
import Data.ByteArray (ByteArrayAccess, convert)
@ -40,3 +43,6 @@ digestToBS = convert
digestToBase16 :: ByteArrayAccess a => a -> ByteString
digestToBase16 = convertToBase Base16
hashMD5 :: ByteString -> ByteString
hashMD5 = digestToBase16 . hashWith MD5

View File

@ -0,0 +1,56 @@
module Network.Minio.ListOps where
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.S3API
-- | List objects in a bucket matching the given prefix. If recurse is
-- set to True objects matching prefix are recursively listed.
listObjects :: Bucket -> Maybe Text -> Bool -> C.Producer Minio ObjectInfo
listObjects bucket prefix recurse = loop Nothing
where
loop :: Maybe Text -> C.Producer Minio ObjectInfo
loop nextToken = do
let
delimiter = bool (Just "/") Nothing recurse
res <- lift $ listObjects' bucket prefix nextToken delimiter
CL.sourceList $ lorObjects res
when (lorHasMore res) $
loop (lorNextToken res)
-- | List incomplete uploads in a bucket matching the given prefix. If
-- recurse is set to True incomplete uploads for the given prefix are
-- recursively listed.
listIncompleteUploads :: Bucket -> Maybe Text -> Bool
-> C.Producer Minio UploadInfo
listIncompleteUploads bucket prefix recurse = loop Nothing Nothing
where
loop :: Maybe Text -> Maybe Text -> C.Producer Minio UploadInfo
loop nextKeyMarker nextUploadIdMarker = do
let
delimiter = bool (Just "/") Nothing recurse
res <- lift $ listIncompleteUploads' bucket prefix delimiter
nextKeyMarker nextUploadIdMarker
CL.sourceList $ lurUploads res
when (lurHasMore res) $
loop nextKeyMarker nextUploadIdMarker
-- | List object parts of an ongoing multipart upload for given
-- bucket, object and uploadId.
listIncompleteParts :: Bucket -> Object -> UploadId
-> C.Producer Minio ListPartInfo
listIncompleteParts bucket object uploadId = loop Nothing
where
loop :: Maybe Text -> C.Producer Minio ListPartInfo
loop nextPartMarker = do
res <- lift $ listIncompleteParts' bucket object uploadId Nothing
nextPartMarker
CL.sourceList $ lprParts res
when (lprHasMore res) $
loop (show <$> lprNextPart res)

View File

@ -6,19 +6,24 @@ module Network.Minio.PutObject
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.Binary as CB
import qualified Data.List as List
import qualified Data.ByteString.Lazy as LB
import qualified Data.Map.Strict as Map
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Data.Crypto
import Network.Minio.ListOps
import Network.Minio.S3API
import Network.Minio.Utils
-- | max obj size is 5TiB
maxObjectSize :: Int64
maxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
maxObjectSize = 5 * 1024 * 1024 * oneMiB
oneMiB :: Int64
oneMiB = 1024 * 1024
@ -43,10 +48,8 @@ data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and option
-- objects of all sizes, and even if the object size is unknown.
putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag
putObject b o (ODFile fp sizeMay) = do
hResE <- withNewHandle fp $ \h -> do
isSeekable <- isHandleSeekable h
handleSizeMay <- getFileSize h
return (isSeekable, handleSizeMay)
hResE <- withNewHandle fp $ \h ->
liftM2 (,) (isHandleSeekable h) (getFileSize h)
(isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return
hResE
@ -106,11 +109,13 @@ parallelMultipartUpload b o filePath size = do
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
-> C.Producer Minio ByteString -> Minio ETag
sequentialMultipartUpload b o sizeMay src = do
-- get new upload id.
uploadId <- newMultipartUpload b o []
(uidMay, pinfos) <- getExistingUpload b o
-- get a new upload id if needed.
uploadId <- maybe (newMultipartUpload b o []) return uidMay
-- upload parts in loop
uploadedParts <- loop uploadId rSrc partSizeInfo []
uploadedParts <- loop pinfos uploadId rSrc partSizeInfo []
-- complete multipart upload
completeMultipartUpload b o uploadId uploadedParts
@ -118,40 +123,52 @@ sequentialMultipartUpload b o sizeMay src = do
rSrc = C.newResumableSource src
partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay
-- returns partinfo if part is already uploaded.
checkUploadNeeded :: LByteString -> PartNumber
-> Map.Map PartNumber ListPartInfo
-> Maybe PartInfo
checkUploadNeeded lbs n pmap = do
pinfo@(ListPartInfo _ etag size _) <- Map.lookup n pmap
bool Nothing (return (PartInfo n etag)) $
LB.length lbs == size &&
hashMD5 (LB.toStrict lbs) == encodeUtf8 etag
-- make a sink that consumes only `s` bytes
limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs
-- FIXME: test, confirm and remove traceShowM statements
loop _ _ [] uploadedParts = return $ reverse uploadedParts
loop uid rSource ((partNum, _, size):ps) u = do
loop _ _ _ [] uparts = return $ reverse uparts
loop pinfos uid rSource ((partNum, _, size):ps) u = do
-- load data from resume-able source into bytestring.
(newSource, buf) <- rSource C.$$++ (limitedSink size)
traceShowM "psize: "
traceShowM (LB.length buf)
-- check if we got size bytes.
if LB.length buf == size
-- upload the full size part.
then do pInfo <- putObjectPart b o uid partNum [] $
PayloadBS $ LB.toStrict buf
loop uid newSource ps (pInfo:u)
-- got a smaller part, so its the last one.
else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
finalData <- newSource C.$$+- (limitedSink size)
traceShowM "finalData size:"
traceShowM (LB.length finalData)
pInfo <- putObjectPart b o uid partNum [] $
PayloadBS $ LB.toStrict buf
return $ reverse (pInfo:u)
case checkUploadNeeded buf partNum pinfos of
Just pinfo -> loop pinfos uid newSource ps (pinfo:u)
Nothing -> do
pInfo <- putObjectPart b o uid partNum [] $
PayloadBS $ LB.toStrict buf
if LB.length buf == size
-- upload the full size part.
then loop pinfos uid newSource ps (pInfo:u)
-- got a smaller part, so its the last one.
else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
finalData <- newSource C.$$+- (limitedSink size)
traceShowM "finalData size:"
traceShowM (LB.length finalData)
return $ reverse (pInfo:u)
-- | Looks for incomplete uploads for an object. Returns the first one
-- if there are many.
getExistingUpload :: Bucket -> Object
-> Minio (Maybe (UploadId, [ListPartInfo]))
-> Minio (Maybe UploadId, Map.Map PartNumber ListPartInfo)
getExistingUpload b o = do
uploadsRes <- listIncompleteUploads' b (Just o) Nothing Nothing Nothing
case uiUploadId <$> listToMaybe (lurUploads uploadsRes) of
Nothing -> return Nothing
Just uid -> do
lpr <- listIncompleteParts' b o uid Nothing Nothing
return $ Just (uid, lprParts lpr)
uidMay <- (fmap . fmap) uiUploadId $
listIncompleteUploads b (Just o) False C.$$ CC.head
parts <- maybe (return [])
(\uid -> listIncompleteParts b o uid C.$$ CC.sinkList) uidMay
return (uidMay, Map.fromList $ map (\p -> (piNumber p, p)) parts)