diff --git a/src/Network/Minio/PutObject.hs b/src/Network/Minio/PutObject.hs index 3cf9183..d5c00b5 100644 --- a/src/Network/Minio/PutObject.hs +++ b/src/Network/Minio/PutObject.hs @@ -25,21 +25,16 @@ module Network.Minio.PutObject ) where -import qualified Data.ByteString as B import qualified Data.Conduit as C -import Data.Conduit.Binary (sourceHandleRange) import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Combinators as CC import qualified Data.Conduit.List as CL import qualified Data.List as List -import qualified Data.Map.Strict as Map import Lib.Prelude import Network.Minio.Data -import Network.Minio.Data.Crypto import Network.Minio.Errors -import Network.Minio.ListOps import Network.Minio.S3API import Network.Minio.Utils @@ -117,35 +112,17 @@ selectPartSizes size = uncurry (List.zip3 [1..]) $ | st + m >= sz = [(st, sz - st)] | otherwise = (st, m) : loop (st + m) sz --- returns partinfo if part is already uploaded. -checkUploadNeeded :: Payload -> PartNumber - -> Map.Map PartNumber ObjectPartInfo - -> Minio (Maybe PartTuple) -checkUploadNeeded payload n pmap = do - (md5hash, pSize) <- case payload of - PayloadBS bs -> return (hashMD5 bs, fromIntegral $ B.length bs) - PayloadH h off size -> fmap (, size) $ - hashMD5FromSource $ sourceHandleRange h (Just $ fromIntegral off) - (Just $ fromIntegral size) - case Map.lookup n pmap of - Nothing -> return Nothing - Just (ObjectPartInfo _ etag size _) -> return $ - bool Nothing (Just (n, etag)) $ - md5hash == encodeUtf8 etag && size == pSize - parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64 -> Minio ETag parallelMultipartUpload b o filePath size = do - (uidMay, pmap) <- getExistingUpload b o - - -- get a new upload id if needed. - uploadId <- maybe (newMultipartUpload b o []) return uidMay + -- get a new upload id. + uploadId <- newMultipartUpload b o [] let partSizeInfo = selectPartSizes size -- perform upload with 10 threads uploadedPartsE <- limitedMapConcurrently 10 - (uploadPart pmap uploadId) partSizeInfo + (uploadPart uploadId) partSizeInfo -- if there were any errors, rethrow exception. mapM_ throwM $ lefts uploadedPartsE @@ -153,22 +130,17 @@ parallelMultipartUpload b o filePath size = do -- if we get here, all parts were successfully uploaded. completeMultipartUpload b o uploadId $ rights uploadedPartsE where - uploadPart pmap uploadId (partNum, offset, sz) = + uploadPart uploadId (partNum, offset, sz) = withNewHandle filePath $ \h -> do let payload = PayloadH h offset sz - pInfoMay <- checkUploadNeeded payload partNum pmap - maybe - (putObjectPart b o uploadId partNum [] payload) - return pInfoMay + putObjectPart b o uploadId partNum [] payload -- | Upload multipart object from conduit source sequentially sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64 -> C.Producer Minio ByteString -> Minio ETag sequentialMultipartUpload b o sizeMay src = do - (uidMay, pmap) <- getExistingUpload b o - - -- get a new upload id if needed. - uploadId <- maybe (newMultipartUpload b o []) return uidMay + -- get a new upload id. + uploadId <- newMultipartUpload b o [] -- upload parts in loop let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay @@ -176,35 +148,21 @@ sequentialMultipartUpload b o sizeMay src = do uploadedParts <- src C..| chunkBSConduit sizes C..| CL.map PayloadBS - C..| checkAndUpload uploadId pmap pnums + C..| uploadPart' uploadId pnums C.$$ CC.sinkList -- complete multipart upload completeMultipartUpload b o uploadId uploadedParts where - checkAndUpload _ _ [] = return () - checkAndUpload uid pmap (pn:pns) = do + uploadPart' _ [] = return () + uploadPart' uid (pn:pns) = do payloadMay <- C.await case payloadMay of Nothing -> return () - Just payload -> do partMay <- lift $ checkUploadNeeded payload pn pmap - pinfo <- maybe - (lift $ putObjectPart b o uid pn [] payload) - return partMay + Just payload -> do pinfo <- lift $ putObjectPart b o uid pn [] payload C.yield pinfo - checkAndUpload uid pmap pns - --- | Looks for incomplete uploads for an object. Returns the first one --- if there are many. -getExistingUpload :: Bucket -> Object - -> Minio (Maybe UploadId, Map.Map PartNumber ObjectPartInfo) -getExistingUpload b o = do - uidMay <- (fmap . fmap) uiUploadId $ - listIncompleteUploads b (Just o) False C.$$ CC.head - parts <- maybe (return []) - (\uid -> listIncompleteParts b o uid C.$$ CC.sinkList) uidMay - return (uidMay, Map.fromList $ map (\p -> (opiNumber p, p)) parts) + uploadPart' uid pns -- | Copy an object using single or multipart copy strategy. copyObjectInternal :: Bucket -> Object -> CopyPartSource