{-# LANGUAGE BangPatterns #-}

module Utils.Files
  ( sinkFile, sinkFiles
  , sinkFile', sinkFiles'
  , FileUploads
  , replaceFileReferences, replaceFileReferences'
  , sinkFileDB, sinkFileMinio
  , isEmptyFileReference
  ) where

import Import.NoFoundation
import Utils.Metrics
import Foundation.Type
import Handler.Utils.Minio
import qualified Network.Minio as Minio

import qualified Crypto.Hash as Crypto (hash)
import qualified Crypto.Hash.Conduit as Crypto (sinkHash)

import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (unfoldM)

import qualified Data.Map.Lazy as Map
import qualified Data.Set as Set

import Control.Monad.State.Class (modify)

import qualified Data.Sequence as Seq

import Database.Persist.Sql (deleteWhereCount)

import Control.Monad.Trans.Resource (allocate)

import qualified Data.UUID.V4 as UUID

import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E

import Data.Conduit.Algorithms.FastCDC (fastCDC)

import Control.Monad.Trans.Cont


-- | Store a file's content in the database and return its content hash.
--
-- The content is split into content-defined chunks ('fastCDC' with the
-- configured '_appFileChunkingParams').  Each chunk is stored as a
-- 'FileContentChunk' keyed by its hash.  If a chunk with that hash already
-- exists, it is only marked as content-based.  While the upload runs, every
-- chunk is protected by a 'FileChunkLock' row.
--
-- Once the whole content has been hashed, a 'FileLock' row for the content
-- hash is inserted and registered via 'allocate'.  The surrounding resource
-- scope deletes it again, in its own serializable transaction, when it is
-- released.  The per-chunk locks are dropped afterwards.  Finally the
-- 'FileContentEntry' rows, mapping the content hash to its ordered chunk
-- hashes, are written.
sinkFileDB :: ( MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m
              , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
              , YesodPersist UniWorX
              )
           => Bool -- ^ Replace? Use only in serializable transaction
                   --   (when 'True', existing entries for the hash are deleted
                   --   and rewritten; otherwise entries are only written if
                   --   none exist yet)
           -> ConduitT () ByteString (SqlPersistT m) ()
           -> SqlPersistT m FileContentReference
sinkFileDB doReplace fileContentContent = do
  chunkingParams <- getsYesod $ view _appFileChunkingParams

  -- Store a single chunk, record the id of its lock in the writer, and return
  -- the chunk's key.  The record wildcards pick up the local bindings named
  -- after the respective fields.
  let sinkChunk !fileContentChunkContent = do
          fileChunkLockTime <- liftIO getCurrentTime
          fileChunkLockInstance <- getsYesod appInstanceID

          observeSunkChunk StorageDB $ olength fileContentChunkContent

          tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }

          existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]

          let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
          -- A concurrent insert of the same chunk can still win the race
          -- after the existence check; treat that like "already exists".
          if | existsChunk -> lift setContentBased
             | otherwise   -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
                 insert_ FileContentChunk{..}
          return $ FileContentChunkKey fileContentChunkHash
        where
          fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent

  -- Chunk and store the content.  In the same pass, hash the whole stream
  -- and collect the ordered chunk keys and the chunk lock ids.
  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $
       transPipe lift fileContentContent
    .| fastCDC chunkingParams
    .| C.mapM (\c -> (c, ) <$> sinkChunk c)
    .| transPipe lift (getZipConduit $ (,)
         <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash)
         <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))

  -- Lock the whole file (released with the resource scope) before dropping
  -- the per-chunk locks, so the chunks are never unprotected.
  void . withUnliftIO $ \UnliftIO{..} ->
    let takeLock = do
          fileLockTime <- liftIO getCurrentTime
          fileLockInstance <- getsYesod appInstanceID
          insert FileLock{ fileLockContent = fileContentHash, .. }
        releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ())
     in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock)

  deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ]

  let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash
      insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_
        [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
        | fileContentEntryChunkHash <- otoList fileContentChunks
        | fileContentEntryIx <- [0..]
        ]
  if | not doReplace -> unlessM entryExists insertEntries
     | otherwise -> do
         deleteWhere [ FileContentEntryHash ==. fileContentHash ]
         insertEntries

  return fileContentHash
  where
    fileContentChunkContentBased = True


-- | Upload a file's content to the Minio upload cache bucket and return its
-- content hash.
--
-- The content is streamed to a temporary object (@.tmp.<uuid>@) while a
-- separate thread computes its hash.  Afterwards the temporary object is
-- copied to the name derived from the hash via 'minioFileReference', unless
-- an object of that name already exists.  The temporary object is then
-- removed.
--
-- Yields 'Nothing' if 'runAppMinio' does not run its action (presumably
-- because no Minio connection is configured; confirm in
-- "Handler.Utils.Minio").  In that case no input has been consumed from the
-- source, so the caller can fall back to another sink reading the same
-- source.
sinkFileMinio :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m)
              => ConduitT () ByteString m ()
              -> MaybeT m FileContentReference
              -- ^ Cannot deal with zero length uploads
sinkFileMinio fileContentContent = do
  uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
  chunk <- liftIO newEmptyMVar
  -- The pipeline below runs in its own thread and would otherwise start
  -- draining @fileContentContent@ right away.  It first waits for
  -- @consumeStarted@, which is only filled once 'runAppMinio' actually runs
  -- our action.  If 'runAppMinio' yields 'Nothing', the source is left intact
  -- for the caller's fallback (see 'sinkFile').
  consumeStarted <- liftIO newEmptyMVar
  let putChunks = do
        nextChunk <- await
        case nextChunk of
          Nothing
            -> putMVar chunk Nothing
          Just nextChunk' -> do
            observeSunkChunk StorageMinio $ olength nextChunk'
            -- Hand the chunk to the uploader ('Minio.putObject' below) and
            -- also pass it on to the hash computation.
            putMVar chunk $ Just nextChunk'
            yield nextChunk'
            putChunks
  sinkAsync <- lift . allocateLinkedAsync . runConduit
    $ (takeMVar consumeStarted >> fileContentContent)
    .| putChunks
    .| Crypto.sinkHash

  runAppMinio $ do
    putMVar consumeStarted ()
    tmpUUID <- liftIO UUID.nextRandom
    let uploadName = ".tmp." <> toPathPiece tmpUUID
        pooOptions = Minio.defaultPutObjectOptions
          { Minio.pooCacheControl = Just "immutable"
          }
    -- Pull chunks from the MVar until the producer signals end of input.
    Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
    fileContentHash <- review _Wrapped <$> waitAsync sinkAsync
    let dstName = minioFileReference # fileContentHash
        copySrc = Minio.defaultSourceInfo
          { Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName
          }
        copyDst = Minio.defaultDestinationInfo
          { Minio.dstBucket = uploadBucket
          , Minio.dstObject = dstName
          }
    uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
    unless uploadExists $
      Minio.copyObject copyDst copySrc
    Minio.removeObject uploadBucket uploadName
    return fileContentHash


-- | Persist every incoming 'File' with 'sinkFile', emitting its reference.
sinkFiles :: ( MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m
             , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
             , YesodPersist UniWorX
             )
          => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
sinkFiles = C.mapM sinkFile

-- | Persist the content of a 'File' and return a 'FileReference' to it.
--
-- A file without content yields a reference without content.  Empty content
-- is not stored at all; it is represented by the constant @emptyHash@
-- reference.  Non-empty content is uploaded via 'sinkFileMinio'.  If that
-- yields 'Nothing', the content is stored via 'sinkFileDB' instead.
sinkFile :: ( MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m
            , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
            , YesodPersist UniWorX
            )
         => File (SqlPersistT m) -> SqlPersistT m FileReference
sinkFile File{ fileContent = Nothing, .. } = return FileReference
  { fileReferenceContent = Nothing
  , fileReferenceTitle = fileTitle
  , fileReferenceModified = fileModified
  }
sinkFile File{ fileContent = Just fileContentContent, .. } = do
  -- Run the source in its own thread and hand chunks over one at a time, so
  -- we can peek at the first chunk before choosing a sink.  Zero-length
  -- chunks are dropped.  Otherwise a source yielding only empty chunks would
  -- be treated as non-empty and could reach 'sinkFileMinio', which cannot
  -- handle zero-length uploads.
  chunk <- liftIO newEmptyTMVarIO
  sourceAsync <- allocateLinkedAsync . runConduit
    $ fileContentContent
    .| C.filter ((/= 0) . olength)
    .| C.mapM_ (atomically . putTMVar chunk)

  -- Empty iff the source finished without handing over any chunk.
  isEmpty <- atomically $
        False <$ readTMVar chunk
    <|> True  <$ waitSTM sourceAsync

  -- Re-expose the handed-over chunks as a source.  Pending chunks are
  -- preferred over the completion check, so none are lost.  Exceptions from
  -- the source thread are rethrown.
  let fileContentContent' = evalContT . callCC $ \finishConsume -> forever $ do
        inpChunk <- atomically $
              Right <$> takeTMVar chunk
          <|> Left  <$> waitCatchSTM sourceAsync
        case inpChunk of
          Right inpChunk'  -> lift $ yield inpChunk'
          Left (Left exc)  -> throwM exc
          Left (Right res) -> finishConsume res

  fileContentHash <- if
    | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ sinkFileMinio fileContentContent'
    | otherwise   -> return $$(liftTyped $ FileContentReference $$(emptyHash))

  return FileReference
    { fileReferenceContent = Just fileContentHash
    , fileReferenceTitle = fileTitle
    , fileReferenceModified = fileModified
    }


-- | Persist every incoming 'File' with 'sinkFile'' and its residual, emitting
-- the assembled record.
sinkFiles' :: ( MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m
              , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
              , YesodPersist UniWorX
              , HasFileReference record
              )
           => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) ()
sinkFiles' = C.mapM $ uncurry sinkFile'

-- | Persist a 'File' with 'sinkFile' and combine the resulting reference with
-- the given residual into a record.
sinkFile' :: ( MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m
             , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
             , YesodPersist UniWorX
             , HasFileReference record
             )
          => File (SqlPersistT m) -> FileReferenceResidual record -> SqlPersistT m record
sinkFile' file residual = do
  reference <- sinkFile file
  return $ _FileReference # (reference, residual)


-- | A handler-level stream of already persisted file references.
type FileUploads = ConduitT () FileReference (HandlerFor UniWorX) ()
-- | Synchronise the file-reference records matching a filter with a stream of
-- 'FileReference's.
--
-- The filter is @mkFilter residual@.
--
-- * An incoming reference that already occurs among the records matching the
--   filter (as of the start of the call) is left untouched.
-- * Any other incoming reference is inserted as a new record, built from the
--   reference and @residual@.
-- * If that new record shares a unique key with an existing record, the
--   existing record is deleted first.  That deletion is restricted to the
--   filter; if it does not delete exactly one row, an 'IOError' is thrown via
--   'throwM'.
-- * At the end, every record that matched initially but whose reference did
--   not reappear in the stream is deleted.
--
-- Returns @(oldFiles, changes)@:
--
-- * @oldFiles@: the keys of all records that matched the filter before the
--   call.
-- * @changes@: the keys of newly inserted records, of records deleted because
--   of unique-key collisions, and of initially matching records deleted at
--   the end.
replaceFileReferences' :: ( MonadIO m, MonadThrow m
                          , IsFileReference record
                          , PersistEntityBackend record ~ SqlBackend
                          )
                       => (FileReferenceResidual record -> [Filter record])
                       -> FileReferenceResidual record
                       -> ConduitT FileReference Void (SqlPersistT m) (Set (Key record), Set (Key record)) -- ^ @(oldFiles, changes)@
replaceFileReferences' mkFilter residual = do
  let resFilter = mkFilter residual

  -- Existing matching records, grouped by the reference they hold.
  oldFiles <- lift $ Map.fromListWith Set.union . map (\(Entity k v) -> (v ^. _FileReference . _1, Set.singleton k)) <$> selectList resFilter []
  let oldFiles' = setOf (folded . folded) oldFiles

  -- The state starts as @oldFiles@.  It ends up holding every key that is
  -- (to be) inserted or deleted; keys of records that are kept are removed
  -- from it.
  let finsert fRef
        | Just sfIds <- fRef `Map.lookup` oldFiles
        -- Reference already present: keep those records and drop their keys
        -- (and any entries left empty) from the state.
        = modify $ Map.mapMaybe (assertM' (not . Set.null) . (\\ sfIds))
        | otherwise = do
            let fRef' = _FileReference # (fRef, residual)
            -- Make room for the new record by deleting records that collide
            -- with it on any unique key.  Such records must lie inside the
            -- filter.
            forM_ (persistUniqueKeys fRef') $ \u -> maybeT (return ()) $ do
              Entity cKey cVal <- MaybeT . lift $ getBy u
              deleted <- lift . lift . deleteWhereCount $ resFilter <> [ persistIdField ==. cKey ]
              unless (deleted == 1) $
                throwM . userError $ "replaceFileReferences tried to delete outside of filter/database inconsistency: deleted=" <> show deleted
              lift . modify $ Map.alter (Just . maybe (Set.singleton cKey) (Set.insert cKey)) (cVal ^. _FileReference . _1)
            fId <- lift $ insert fRef'
            modify $ Map.alter (Just . maybe (Set.singleton fId) (Set.insert fId)) fRef

  changes <- fmap (setOf $ folded . folded) . execStateC oldFiles $
    C.mapM_ finsert

  -- Initially matching records still left in the state did not reappear in
  -- the stream; delete them.
  lift . deleteWhere $ resFilter <> [ persistIdField <-. Set.toList (oldFiles' `Set.intersection` changes) ]

  return (oldFiles', changes)

-- | 'replaceFileReferences'' fed from a 'FileUploads' stream.  The
-- handler-based upload source is lifted into the surrounding monad via
-- 'liftHandler'.
replaceFileReferences :: ( MonadHandler m, MonadThrow m
                         , HandlerSite m ~ UniWorX
                         , IsFileReference record
                         , PersistEntityBackend record ~ SqlBackend
                         )
                      => (FileReferenceResidual record -> [Filter record])
                      -> FileReferenceResidual record
                      -> FileUploads
                      -> SqlPersistT m (Set (Key record), Set (Key record)) -- ^ @(oldFiles, changes)@
replaceFileReferences mkFilter residual fs = runConduit $ transPipe liftHandler fs .| replaceFileReferences' mkFilter residual


-- | 'True' if the reference has no content, or if its content is the
-- constant @emptyHash@ reference (the one 'sinkFile' uses for empty files).
isEmptyFileReference :: HasFileReference ref => ref -> Bool
isEmptyFileReference = views (_FileReference . _1 . _fileReferenceContent) (maybe True (== $$(liftTyped $ FileContentReference $$(emptyHash))))