Make parallel upload also resume an existing upload

Aditya Manthramurthy 2017-02-08 14:48:58 +05:30
parent d17d6f216d
commit 688f326b6e
3 changed files with 74 additions and 43 deletions
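The change applies one resume strategy to both upload paths: first look for an incomplete multipart upload of the object, and only request a fresh upload id when none exists. A minimal sketch of that pattern in isolation (the names below are illustrative stand-ins, not the library's API):

-- `findExisting` stands in for getExistingUpload and `startNew` for
-- newMultipartUpload in the diff below; both are placeholders here.
resumeOrStart :: Monad m
              => m (Maybe uid, pmap)  -- ^ look up an incomplete upload
              -> m uid                -- ^ start a fresh upload
              -> m (uid, pmap)
resumeOrStart findExisting startNew = do
  (uidMay, partMap) <- findExisting
  uploadId <- maybe startNew return uidMay
  return (uploadId, partMap)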

minio-hs.cabal

@@ -43,6 +43,7 @@ library
     , cryptonite-conduit
     , data-default
     , exceptions
+    , extra
     , filepath
     , http-client
     , http-conduit
@@ -87,6 +88,7 @@ test-suite minio-hs-test
     , cryptonite-conduit
     , data-default
     , exceptions
+    , extra
     , filepath
     , http-client
     , http-conduit
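The new `extra` dependency (added to both the library and the test-suite) provides Control.Monad.Extra.loopM, which drives the rewritten sequential upload loop in Network.Minio.PutObject below. A small self-contained sketch of loopM's contract, with made-up values:

import Control.Monad.Extra (loopM)

-- loopM repeats a monadic step: returning Left feeds the next
-- iteration, returning Right ends the loop with a final value.
sumUpTo :: Int -> IO Int
sumUpTo n = loopM step (1, 0)
  where
    step (i, acc)
      | i > n     = return $ Right acc              -- done
      | otherwise = return $ Left (i + 1, acc + i)  -- continue

-- sumUpTo 10 evaluates to 55.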

Network/Minio/Data/Crypto.hs

@@ -4,6 +4,7 @@ module Network.Minio.Data.Crypto
   , hashSHA256FromSource
   , hashMD5
+  , hashMD5FromSource
   , hmacSHA256
   , hmacSHA256RawBS
@@ -32,6 +33,18 @@ hashSHA256FromSource src = do
     sinkSHA256Hash :: Monad m => C.Consumer ByteString m (Digest SHA256)
     sinkSHA256Hash = sinkHash
 
+hashMD5 :: ByteString -> ByteString
+hashMD5 = digestToBase16 . hashWith MD5
+
+hashMD5FromSource :: Monad m => C.Producer m ByteString -> m ByteString
+hashMD5FromSource src = do
+  digest <- src C.$$ sinkMD5Hash
+  return $ digestToBase16 digest
+  where
+    -- To help with type inference
+    sinkMD5Hash :: Monad m => C.Consumer ByteString m (Digest MD5)
+    sinkMD5Hash = sinkHash
+
 hmacSHA256 :: ByteString -> ByteString -> HMAC SHA256
 hmacSHA256 message key = hmac key message
@@ -43,6 +56,3 @@ digestToBS = convert
 
 digestToBase16 :: ByteArrayAccess a => a -> ByteString
 digestToBase16 = convertToBase Base16
-
-hashMD5 :: ByteString -> ByteString
-hashMD5 = digestToBase16 . hashWith MD5
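The new hashMD5FromSource hashes a conduit source incrementally, so a part already on the server can be compared against a local file range without reading it all into memory. A possible usage sketch (the file name is a placeholder, and the pre-1.3 conduit API used by the code above is assumed):

import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8 as B8
import qualified Data.Conduit.Binary as CB
import Network.Minio.Data.Crypto (hashMD5FromSource)

main :: IO ()
main = do
  -- stream the file through the MD5 sink in constant memory
  md5hex <- runResourceT $ hashMD5FromSource (CB.sourceFile "part.bin")
  -- base16-encoded digest, directly comparable to an S3 part ETag
  B8.putStrLn md5hex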

Network/Minio/PutObject.hs

@@ -5,11 +5,14 @@ module Network.Minio.PutObject
   ) where
 
-import qualified Data.Conduit as C
-import qualified Data.Conduit.Combinators as CC
-import qualified Data.Conduit.Binary as CB
-import qualified Data.List as List
+import Control.Monad.Extra (loopM)
+import qualified Data.ByteString as B
 import qualified Data.ByteString.Lazy as LB
+import qualified Data.Conduit as C
+import Data.Conduit.Binary (sourceHandleRange)
+import qualified Data.Conduit.Binary as CB
+import qualified Data.Conduit.Combinators as CC
+import qualified Data.List as List
 import qualified Data.Map.Strict as Map
 
 import Lib.Prelude
@@ -47,6 +50,7 @@ data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and option
 -- | Put an object from ObjectData. This high-level API handles
 -- objects of all sizes, and even if the object size is unknown.
 putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag
+putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src
 putObject b o (ODFile fp sizeMay) = do
   hResE <- withNewHandle fp $ \h ->
            liftM2 (,) (isHandleSeekable h) (getFileSize h)
@@ -72,7 +76,6 @@ putObject b o (ODFile fp sizeMay) = do
       | isSeekable -> parallelMultipartUpload b o fp size
       | otherwise -> sequentialMultipartUpload b o (Just size) $
                      CB.sourceFile fp
-putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src
 
 -- | Select part sizes - the logic is that the minimum part-size will
 -- be 64MiB. TODO: write quickcheck tests.
@@ -85,16 +88,35 @@ selectPartSizes size = List.zip3 [1..] partOffsets partSizes
     partSizes = replicate (fromIntegral numParts) partSize ++ lastPart
     partOffsets = List.scanl' (+) 0 partSizes
 
+-- returns partinfo if part is already uploaded.
+checkUploadNeeded :: Payload -> PartNumber
+                  -> Map.Map PartNumber ListPartInfo
+                  -> Minio (Maybe PartInfo)
+checkUploadNeeded payload n pmap = do
+  (md5hash, pSize) <- case payload of
+    PayloadBS bs -> return (hashMD5 bs, fromIntegral $ B.length bs)
+    PayloadH h off size -> liftM (, size) $
+      hashMD5FromSource $ sourceHandleRange h (Just $ fromIntegral off)
+      (Just $ fromIntegral size)
+  case Map.lookup n pmap of
+    Nothing -> return Nothing
+    Just (ListPartInfo _ etag size _) -> return $
+      bool Nothing (Just (PartInfo n etag)) $
+      md5hash == encodeUtf8 etag && size == pSize
+
 parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
                         -> Minio ETag
 parallelMultipartUpload b o filePath size = do
+  (uidMay, pmap) <- getExistingUpload b o
+  -- get a new upload id if needed.
+  uploadId <- maybe (newMultipartUpload b o []) return uidMay
   let partSizeInfo = selectPartSizes size
-  -- get new upload id.
-  uploadId <- newMultipartUpload b o []
   -- perform upload with 10 threads
-  uploadedPartsE <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo
+  uploadedPartsE <- limitedMapConcurrently 10
+                    (uploadPart pmap uploadId) partSizeInfo
   -- if there were any errors, rethrow exception.
   mapM_ throwM $ lefts uploadedPartsE
@@ -102,20 +124,29 @@ parallelMultipartUpload b o filePath size = do
   -- if we get here, all parts were successfully uploaded.
   completeMultipartUpload b o uploadId $ rights uploadedPartsE
   where
-    uploadPart uploadId (partNum, offset, sz) = withNewHandle filePath $
-      \h -> putObjectPart b o uploadId partNum [] $ PayloadH h offset sz
+    uploadPart pmap uploadId (partNum, offset, sz) =
+      withNewHandle filePath $ \h -> do
+        let payload = PayloadH h offset sz
+        pInfoMay <- checkUploadNeeded payload partNum pmap
+        maybe
+          (putObjectPart b o uploadId partNum [] payload)
+          return pInfoMay
 
 -- | Upload multipart object from conduit source sequentially
 sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
                           -> C.Producer Minio ByteString -> Minio ETag
 sequentialMultipartUpload b o sizeMay src = do
-  (uidMay, pinfos) <- getExistingUpload b o
+  (uidMay, pmap) <- getExistingUpload b o
   -- get a new upload id if needed.
   uploadId <- maybe (newMultipartUpload b o []) return uidMay
   -- upload parts in loop
-  uploadedParts <- loop pinfos uploadId rSrc partSizeInfo []
+  let
+    rSrc = C.newResumableSource src
+    partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay
+  uploadedParts <- loopM (loopFunc pmap uploadId rSrc) (partSizeInfo, [])
   -- complete multipart upload
   completeMultipartUpload b o uploadId uploadedParts
@@ -123,44 +154,32 @@ sequentialMultipartUpload b o sizeMay src = do
-    rSrc = C.newResumableSource src
-    partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay
-
-    -- returns partinfo if part is already uploaded.
-    checkUploadNeeded :: LByteString -> PartNumber
-                      -> Map.Map PartNumber ListPartInfo
-                      -> Maybe PartInfo
-    checkUploadNeeded lbs n pmap = do
-      pinfo@(ListPartInfo _ etag size _) <- Map.lookup n pmap
-      bool Nothing (return (PartInfo n etag)) $
-        LB.length lbs == size &&
-        hashMD5 (LB.toStrict lbs) == encodeUtf8 etag
-
     -- make a sink that consumes only `s` bytes
     limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs
 
     -- FIXME: test, confirm and remove traceShowM statements
-    loop _ _ _ [] uparts = return $ reverse uparts
-    loop pinfos uid rSource ((partNum, _, size):ps) u = do
-      -- load data from resume-able source into bytestring.
+    loopFunc pmap uid rSource ([], uparts) = return $ Right $ reverse uparts
+    loopFunc pmap uid rSource (((partNum, _, size):ps), uparts) = do
      (newSource, buf) <- rSource C.$$++ (limitedSink size)
      traceShowM "psize: "
      traceShowM (LB.length buf)
-      case checkUploadNeeded buf partNum pinfos of
-        Just pinfo -> loop pinfos uid newSource ps (pinfo:u)
+      let payload = PayloadBS $ LB.toStrict buf
+      partMay <- checkUploadNeeded payload partNum pmap
+      case partMay of
+        Just pinfo -> return $ Left (ps, pinfo:uparts)
         Nothing -> do
-          pInfo <- putObjectPart b o uid partNum [] $
-                   PayloadBS $ LB.toStrict buf
+          -- upload the part
+          pInfo <- putObjectPart b o uid partNum [] payload
           if LB.length buf == size
             -- upload the full size part.
-            then loop pinfos uid newSource ps (pInfo:u)
-            -- got a smaller part, so its the last one.
-            else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
-                    finalData <- newSource C.$$+- (limitedSink size)
-                    traceShowM "finalData size:"
-                    traceShowM (LB.length finalData)
-                    return $ reverse (pInfo:u)
+            then return $ Left (ps, pInfo:uparts)
+            -- got a smaller part, so its the last one.
+            else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
+                    finalData <- newSource C.$$+- (limitedSink size)
+                    traceShowM "finalData size:"
+                    traceShowM (LB.length finalData)
+                    return $ Right $ reverse (pInfo:uparts)
 
 -- | Looks for incomplete uploads for an object. Returns the first one
 -- if there are many.
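With both paths consulting getExistingUpload first, the caller-visible effect is that retrying the same call resumes the earlier attempt: parts whose size and MD5 already match a stored ETag are returned from the part map instead of being re-uploaded. A hypothetical retry, with placeholder bucket, object, and path, assuming putObject and ObjectData(..) plus the library's Minio and ETag types are in scope:

{-# LANGUAGE OverloadedStrings #-}

-- Re-running this after an interrupted attempt now reuses the
-- incomplete upload's id and skips already-matching parts.
uploadBackup :: Minio ETag
uploadBackup = putObject "backups" "db-dump.bin" (ODFile "/tmp/db-dump.bin" Nothing)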