diff --git a/nixpkgs.nix b/nixpkgs.nix index 375c84162..6a21dfbda 100644 --- a/nixpkgs.nix +++ b/nixpkgs.nix @@ -4,7 +4,7 @@ import ((nixpkgs {}).fetchFromGitHub { owner = "NixOS"; repo = "nixpkgs"; - rev = "bc00ecedfa709f4fa91d445dd76ecd792cb2c728"; - sha256 = "0plhwb04srr4b0h7w8qlqi207a19szz2wqz6r4gmic856jlkchaa"; + rev = "a7a1447e5d40a9ad90983d33e151f5474eddeed9"; + sha256 = "1zb8wgsq9grrsdcz81y08h45rj8i5r8ckjhg2cv1cqmam4dczcrf"; fetchSubmodules = true; }) diff --git a/src/Utils/Files.hs b/src/Utils/Files.hs index 3b334391c..f5251825e 100644 --- a/src/Utils/Files.hs +++ b/src/Utils/Files.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE BangPatterns #-} + module Utils.Files ( sinkFile, sinkFiles , sinkFile', sinkFiles' @@ -35,6 +37,8 @@ import qualified Database.Esqueleto.Utils as E import Data.Conduit.Algorithms.FastCDC (fastCDC) +import Control.Monad.Trans.Cont + sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => Bool -- ^ Replace? Use only in serializable transaction @@ -43,14 +47,16 @@ sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnlif sinkFileDB doReplace fileContentContent = do chunkingParams <- getsYesod $ view _appFileChunkingParams - let sinkChunk fileContentChunkContent = do + let sinkChunk !fileContentChunkContent = do fileChunkLockTime <- liftIO getCurrentTime fileChunkLockInstance <- getsYesod appInstanceID observeSunkChunk StorageDB $ olength fileContentChunkContent tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. } + existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash] + let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased] if | existsChunk -> lift setContentBased | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $ @@ -144,7 +150,22 @@ sinkFile File{ fileContent = Nothing, .. } = return FileReference , fileReferenceModified = fileModified } sinkFile File{ fileContent = Just fileContentContent, .. } = do - (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE + chunk <- liftIO newEmptyTMVarIO + sourceAsync <- allocateLinkedAsync . runConduit $ fileContentContent .| C.mapM_ (atomically . putTMVar chunk) + + isEmpty <- atomically $ + False <$ readTMVar chunk + <|> True <$ waitSTM sourceAsync + + let fileContentContent' = evalContT . callCC $ \finishConsume -> forever $ do + inpChunk <- atomically $ + Right <$> takeTMVar chunk + <|> Left <$> waitCatchSTM sourceAsync + + case inpChunk of + Right inpChunk' -> lift $ yield inpChunk' + Left (Left exc) -> throwM exc + Left (Right res) -> finishConsume res fileContentHash <- if | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ sinkFileMinio fileContentContent'