Remove resuming capabilities of PutObject (#67)

This commit is contained in:
Harshavardhana 2017-10-15 14:05:37 -07:00 committed by Aditya Manthramurthy
parent e995f80052
commit 2b816b7092

View File

@ -25,21 +25,16 @@ module Network.Minio.PutObject
) where ) where
import qualified Data.ByteString as B
import qualified Data.Conduit as C import qualified Data.Conduit as C
import Data.Conduit.Binary (sourceHandleRange)
import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL import qualified Data.Conduit.List as CL
import qualified Data.List as List import qualified Data.List as List
import qualified Data.Map.Strict as Map
import Lib.Prelude import Lib.Prelude
import Network.Minio.Data import Network.Minio.Data
import Network.Minio.Data.Crypto
import Network.Minio.Errors import Network.Minio.Errors
import Network.Minio.ListOps
import Network.Minio.S3API import Network.Minio.S3API
import Network.Minio.Utils import Network.Minio.Utils
@ -117,35 +112,17 @@ selectPartSizes size = uncurry (List.zip3 [1..]) $
| st + m >= sz = [(st, sz - st)] | st + m >= sz = [(st, sz - st)]
| otherwise = (st, m) : loop (st + m) sz | otherwise = (st, m) : loop (st + m) sz
-- returns partinfo if part is already uploaded.
checkUploadNeeded :: Payload -> PartNumber
-> Map.Map PartNumber ObjectPartInfo
-> Minio (Maybe PartTuple)
checkUploadNeeded payload n pmap = do
(md5hash, pSize) <- case payload of
PayloadBS bs -> return (hashMD5 bs, fromIntegral $ B.length bs)
PayloadH h off size -> fmap (, size) $
hashMD5FromSource $ sourceHandleRange h (Just $ fromIntegral off)
(Just $ fromIntegral size)
case Map.lookup n pmap of
Nothing -> return Nothing
Just (ObjectPartInfo _ etag size _) -> return $
bool Nothing (Just (n, etag)) $
md5hash == encodeUtf8 etag && size == pSize
parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64 parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
-> Minio ETag -> Minio ETag
parallelMultipartUpload b o filePath size = do parallelMultipartUpload b o filePath size = do
(uidMay, pmap) <- getExistingUpload b o -- get a new upload id.
uploadId <- newMultipartUpload b o []
-- get a new upload id if needed.
uploadId <- maybe (newMultipartUpload b o []) return uidMay
let partSizeInfo = selectPartSizes size let partSizeInfo = selectPartSizes size
-- perform upload with 10 threads -- perform upload with 10 threads
uploadedPartsE <- limitedMapConcurrently 10 uploadedPartsE <- limitedMapConcurrently 10
(uploadPart pmap uploadId) partSizeInfo (uploadPart uploadId) partSizeInfo
-- if there were any errors, rethrow exception. -- if there were any errors, rethrow exception.
mapM_ throwM $ lefts uploadedPartsE mapM_ throwM $ lefts uploadedPartsE
@ -153,22 +130,17 @@ parallelMultipartUpload b o filePath size = do
-- if we get here, all parts were successfully uploaded. -- if we get here, all parts were successfully uploaded.
completeMultipartUpload b o uploadId $ rights uploadedPartsE completeMultipartUpload b o uploadId $ rights uploadedPartsE
where where
uploadPart pmap uploadId (partNum, offset, sz) = uploadPart uploadId (partNum, offset, sz) =
withNewHandle filePath $ \h -> do withNewHandle filePath $ \h -> do
let payload = PayloadH h offset sz let payload = PayloadH h offset sz
pInfoMay <- checkUploadNeeded payload partNum pmap putObjectPart b o uploadId partNum [] payload
maybe
(putObjectPart b o uploadId partNum [] payload)
return pInfoMay
-- | Upload multipart object from conduit source sequentially -- | Upload multipart object from conduit source sequentially
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64 sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
-> C.Producer Minio ByteString -> Minio ETag -> C.Producer Minio ByteString -> Minio ETag
sequentialMultipartUpload b o sizeMay src = do sequentialMultipartUpload b o sizeMay src = do
(uidMay, pmap) <- getExistingUpload b o -- get a new upload id.
uploadId <- newMultipartUpload b o []
-- get a new upload id if needed.
uploadId <- maybe (newMultipartUpload b o []) return uidMay
-- upload parts in loop -- upload parts in loop
let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
@ -176,35 +148,21 @@ sequentialMultipartUpload b o sizeMay src = do
uploadedParts <- src uploadedParts <- src
C..| chunkBSConduit sizes C..| chunkBSConduit sizes
C..| CL.map PayloadBS C..| CL.map PayloadBS
C..| checkAndUpload uploadId pmap pnums C..| uploadPart' uploadId pnums
C.$$ CC.sinkList C.$$ CC.sinkList
-- complete multipart upload -- complete multipart upload
completeMultipartUpload b o uploadId uploadedParts completeMultipartUpload b o uploadId uploadedParts
where where
checkAndUpload _ _ [] = return () uploadPart' _ [] = return ()
checkAndUpload uid pmap (pn:pns) = do uploadPart' uid (pn:pns) = do
payloadMay <- C.await payloadMay <- C.await
case payloadMay of case payloadMay of
Nothing -> return () Nothing -> return ()
Just payload -> do partMay <- lift $ checkUploadNeeded payload pn pmap Just payload -> do pinfo <- lift $ putObjectPart b o uid pn [] payload
pinfo <- maybe
(lift $ putObjectPart b o uid pn [] payload)
return partMay
C.yield pinfo C.yield pinfo
checkAndUpload uid pmap pns uploadPart' uid pns
-- | Looks for incomplete uploads for an object. Returns the first one
-- if there are many.
getExistingUpload :: Bucket -> Object
-> Minio (Maybe UploadId, Map.Map PartNumber ObjectPartInfo)
getExistingUpload b o = do
uidMay <- (fmap . fmap) uiUploadId $
listIncompleteUploads b (Just o) False C.$$ CC.head
parts <- maybe (return [])
(\uid -> listIncompleteParts b o uid C.$$ CC.sinkList) uidMay
return (uidMay, Map.fromList $ map (\p -> (opiNumber p, p)) parts)
-- | Copy an object using single or multipart copy strategy. -- | Copy an object using single or multipart copy strategy.
copyObjectInternal :: Bucket -> Object -> CopyPartSource copyObjectInternal :: Bucket -> Object -> CopyPartSource