PutObject fixes - does resource cleanup properly now.

This commit is contained in:
Aditya Manthramurthy 2017-02-10 15:41:43 +05:30
parent fd66d85167
commit 75743ab946
3 changed files with 44 additions and 38 deletions

View File

@ -7,11 +7,11 @@ module Network.Minio.PutObject
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import qualified Data.Conduit as C import qualified Data.Conduit as C
import Data.Conduit.Binary (sourceHandleRange) import Data.Conduit.Binary (sourceHandleRange)
import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List import qualified Data.List as List
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
@ -144,40 +144,29 @@ sequentialMultipartUpload b o sizeMay src = do
uploadId <- maybe (newMultipartUpload b o []) return uidMay uploadId <- maybe (newMultipartUpload b o []) return uidMay
-- upload parts in loop -- upload parts in loop
uploadedParts <- uploadPartsSequentially b o uploadId pmap sizeMay src let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
(pnums, _, sizes) = List.unzip3 partSizes
uploadedParts <- src
C..| chunkBSConduit sizes
C..| CL.map PayloadBS
C..| checkAndUpload uploadId pmap pnums
C.$$ CC.sinkList
-- complete multipart upload -- complete multipart upload
completeMultipartUpload b o uploadId uploadedParts completeMultipartUpload b o uploadId uploadedParts
uploadPartsSequentially :: Bucket -> Object -> UploadId
-> Map PartNumber ListPartInfo -> Maybe Int64
-> C.Source Minio ByteString -> Minio [PartInfo]
uploadPartsSequentially b o uid pmap sizeMay src' = do
let
rSrc = C.newResumableSource src'
partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
loopIt rSrc partSizes []
where where
-- make a sink that consumes only `s` bytes checkAndUpload _ _ [] = return ()
limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs checkAndUpload uid pmap (pn:pns) = do
payloadMay <- C.await
loopIt _ [] uparts = return $ reverse uparts case payloadMay of
loopIt src ((n, _, size):ps) uparts = do Nothing -> return ()
(newSrc, buf) <- src C.$$++ (limitedSink size) Just payload -> do partMay <- lift $ checkUploadNeeded payload pn pmap
pinfo <- maybe
let buflen = LB.length buf (lift $ putObjectPart b o uid pn [] payload)
payload = PayloadBS $ LB.toStrict buf return partMay
C.yield pinfo
partMay <- checkUploadNeeded payload n pmap checkAndUpload uid pmap pns
if buflen == 0
then return $ reverse uparts
else do pInfo <- maybe (putObjectPart b o uid n [] payload)
return partMay
loopIt newSrc ps (pInfo:uparts)
-- | Looks for incomplete uploads for an object. Returns the first one -- | Looks for incomplete uploads for an object. Returns the first one
-- if there are many. -- if there are many.

View File

@ -3,9 +3,11 @@ module Network.Minio.Utils where
import qualified Control.Concurrent.Async.Lifted as A import qualified Control.Concurrent.Async.Lifted as A
import qualified Control.Concurrent.QSem as Q import qualified Control.Concurrent.QSem as Q
import qualified Control.Exception.Lifted as ExL import qualified Control.Exception.Lifted as ExL
import qualified Control.Monad.Catch as MC
import Control.Monad.Trans.Control (liftBaseOp_, StM) import Control.Monad.Trans.Control (liftBaseOp_, StM)
import qualified Control.Monad.Trans.Resource as R import qualified Control.Monad.Trans.Resource as R
import qualified Control.Monad.Catch as MC
import qualified Data.ByteString as B
import qualified Data.Conduit as C import qualified Data.Conduit as C
import Data.Text.Encoding.Error (lenientDecode) import Data.Text.Encoding.Error (lenientDecode)
import qualified Network.HTTP.Client as NClient import qualified Network.HTTP.Client as NClient
@ -136,3 +138,22 @@ mkQuery k mv = (k,) <$> mv
-- don't use it with mandatory query params with empty value. -- don't use it with mandatory query params with empty value.
mkOptionalParams :: [(Text, Maybe Text)] -> HT.Query mkOptionalParams :: [(Text, Maybe Text)] -> HT.Query
mkOptionalParams params = HT.toQuery $ (uncurry mkQuery) <$> params mkOptionalParams params = HT.toQuery $ (uncurry mkQuery) <$> params
chunkBSConduit :: (Monad m, Integral a)
=> [a] -> C.Conduit ByteString m ByteString
chunkBSConduit s = loop 0 [] s
where
loop _ _ [] = return ()
loop n readChunks (size:sizes) = do
bsMay <- C.await
case bsMay of
Nothing -> if n > 0
then C.yield $ B.concat readChunks
else return ()
Just bs -> if n + fromIntegral (B.length bs) >= size
then do let (a, b) = B.splitAt (fromIntegral $ size - n) bs
chunkBS = B.concat $ readChunks ++ [a]
C.yield chunkBS
loop (fromIntegral $ B.length b) [b] sizes
else loop (n + fromIntegral (B.length bs))
(readChunks ++ [bs]) (size:sizes)

View File

@ -7,7 +7,6 @@ import Lib.Prelude
import System.Directory (getTemporaryDirectory) import System.Directory (getTemporaryDirectory)
import qualified System.IO as SIO import qualified System.IO as SIO
import System.IO.Temp (openBinaryTempFile, withSystemTempDirectory)
import qualified Control.Monad.Trans.Resource as R import qualified Control.Monad.Trans.Resource as R
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
@ -225,10 +224,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
, funTestWithBucket "multipart" $ \step bucket -> do , funTestWithBucket "multipart" $ \step bucket -> do
step "upload large object" step "upload large object"
-- fPutObject bucket "big" "/tmp/large" void $ putObject bucket "big" (ODFile "/dev/zero" $ Just $ 1024*1024*100)
-- putObject bucket "big" ("/dev/zero")
etag <- putObject bucket "big" (ODFile "/dev/zero" $ Just $ 1024*1024*100)
traceShowM etag
step "cleanup" step "cleanup"
deleteObject bucket "big" deleteObject bucket "big"
@ -245,7 +241,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
step "put object parts 1..10" step "put object parts 1..10"
inputFile <- mkRandFile mb15 inputFile <- mkRandFile mb15
h <- liftIO $ SIO.openBinaryFile inputFile SIO.ReadMode h <- liftIO $ SIO.openBinaryFile inputFile SIO.ReadMode
forM [1..10] $ \pnum -> forM_ [1..10] $ \pnum ->
putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15 putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15
step "fetch list parts" step "fetch list parts"
@ -296,7 +292,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
step "put object parts 1..10" step "put object parts 1..10"
inputFile <- mkRandFile mb15 inputFile <- mkRandFile mb15
h <- liftIO $ SIO.openBinaryFile inputFile SIO.ReadMode h <- liftIO $ SIO.openBinaryFile inputFile SIO.ReadMode
forM [1..10] $ \pnum -> forM_ [1..10] $ \pnum ->
putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15 putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15
step "fetch list parts" step "fetch list parts"