fix: work around conduit-bug releasing fh to early
This commit is contained in:
parent
ca29a66330
commit
3ff2cf1fec
@ -4,7 +4,7 @@
|
|||||||
import ((nixpkgs {}).fetchFromGitHub {
|
import ((nixpkgs {}).fetchFromGitHub {
|
||||||
owner = "NixOS";
|
owner = "NixOS";
|
||||||
repo = "nixpkgs";
|
repo = "nixpkgs";
|
||||||
rev = "bc00ecedfa709f4fa91d445dd76ecd792cb2c728";
|
rev = "a7a1447e5d40a9ad90983d33e151f5474eddeed9";
|
||||||
sha256 = "0plhwb04srr4b0h7w8qlqi207a19szz2wqz6r4gmic856jlkchaa";
|
sha256 = "1zb8wgsq9grrsdcz81y08h45rj8i5r8ckjhg2cv1cqmam4dczcrf";
|
||||||
fetchSubmodules = true;
|
fetchSubmodules = true;
|
||||||
})
|
})
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
module Utils.Files
|
module Utils.Files
|
||||||
( sinkFile, sinkFiles
|
( sinkFile, sinkFiles
|
||||||
, sinkFile', sinkFiles'
|
, sinkFile', sinkFiles'
|
||||||
@ -35,6 +37,8 @@ import qualified Database.Esqueleto.Utils as E
|
|||||||
|
|
||||||
import Data.Conduit.Algorithms.FastCDC (fastCDC)
|
import Data.Conduit.Algorithms.FastCDC (fastCDC)
|
||||||
|
|
||||||
|
import Control.Monad.Trans.Cont
|
||||||
|
|
||||||
|
|
||||||
sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
|
sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
|
||||||
=> Bool -- ^ Replace? Use only in serializable transaction
|
=> Bool -- ^ Replace? Use only in serializable transaction
|
||||||
@ -43,14 +47,16 @@ sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnlif
|
|||||||
sinkFileDB doReplace fileContentContent = do
|
sinkFileDB doReplace fileContentContent = do
|
||||||
chunkingParams <- getsYesod $ view _appFileChunkingParams
|
chunkingParams <- getsYesod $ view _appFileChunkingParams
|
||||||
|
|
||||||
let sinkChunk fileContentChunkContent = do
|
let sinkChunk !fileContentChunkContent = do
|
||||||
fileChunkLockTime <- liftIO getCurrentTime
|
fileChunkLockTime <- liftIO getCurrentTime
|
||||||
fileChunkLockInstance <- getsYesod appInstanceID
|
fileChunkLockInstance <- getsYesod appInstanceID
|
||||||
|
|
||||||
observeSunkChunk StorageDB $ olength fileContentChunkContent
|
observeSunkChunk StorageDB $ olength fileContentChunkContent
|
||||||
|
|
||||||
tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
|
tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
|
||||||
|
|
||||||
existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
|
existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
|
||||||
|
|
||||||
let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
|
let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
|
||||||
if | existsChunk -> lift setContentBased
|
if | existsChunk -> lift setContentBased
|
||||||
| otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
|
| otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
|
||||||
@ -144,7 +150,22 @@ sinkFile File{ fileContent = Nothing, .. } = return FileReference
|
|||||||
, fileReferenceModified = fileModified
|
, fileReferenceModified = fileModified
|
||||||
}
|
}
|
||||||
sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
||||||
(unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE
|
chunk <- liftIO newEmptyTMVarIO
|
||||||
|
sourceAsync <- allocateLinkedAsync . runConduit $ fileContentContent .| C.mapM_ (atomically . putTMVar chunk)
|
||||||
|
|
||||||
|
isEmpty <- atomically $
|
||||||
|
False <$ readTMVar chunk
|
||||||
|
<|> True <$ waitSTM sourceAsync
|
||||||
|
|
||||||
|
let fileContentContent' = evalContT . callCC $ \finishConsume -> forever $ do
|
||||||
|
inpChunk <- atomically $
|
||||||
|
Right <$> takeTMVar chunk
|
||||||
|
<|> Left <$> waitCatchSTM sourceAsync
|
||||||
|
|
||||||
|
case inpChunk of
|
||||||
|
Right inpChunk' -> lift $ yield inpChunk'
|
||||||
|
Left (Left exc) -> throwM exc
|
||||||
|
Left (Right res) -> finishConsume res
|
||||||
|
|
||||||
fileContentHash <- if
|
fileContentHash <- if
|
||||||
| not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ sinkFileMinio fileContentContent'
|
| not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ sinkFileMinio fileContentContent'
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user