274 lines
12 KiB
Haskell
274 lines
12 KiB
Haskell
-- SPDX-FileCopyrightText: 2022 Gregor Kleen <gregor.kleen@ifi.lmu.de>
|
|
--
|
|
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
{-# LANGUAGE BangPatterns #-}
|
|
|
|
module Utils.Files
|
|
( sinkFile, sinkFiles
|
|
, sinkFile', sinkFiles'
|
|
, FileUploads
|
|
, replaceFileReferences, replaceFileReferences'
|
|
, sinkFileDB, sinkFileMinio
|
|
, isEmptyFileReference
|
|
, sinkMinio
|
|
) where
|
|
|
|
import Import.NoFoundation
|
|
import Utils.Metrics
|
|
import Foundation.Type
|
|
import Handler.Utils.Minio
|
|
import qualified Network.Minio as Minio
|
|
|
|
import qualified Crypto.Hash as Crypto (hash)
|
|
import qualified Crypto.Hash.Conduit as Crypto (sinkHash)
|
|
|
|
import qualified Data.Conduit.Combinators as C
|
|
import qualified Data.Conduit.List as C (unfoldM)
|
|
|
|
import qualified Data.Map.Lazy as Map
|
|
import qualified Data.Set as Set
|
|
import Control.Monad.State.Class (modify)
|
|
|
|
import qualified Data.Sequence as Seq
|
|
|
|
import Database.Persist.Sql (deleteWhereCount)
|
|
|
|
import Control.Monad.Trans.Resource (allocate, register, release)
|
|
|
|
import qualified Database.Esqueleto.Legacy as E
|
|
import qualified Database.Esqueleto.Utils as E
|
|
|
|
import Data.Conduit.Algorithms.FastCDC (fastCDC)
|
|
|
|
import Control.Monad.Trans.Cont
|
|
|
|
import qualified Crypto.Nonce as Nonce
|
|
import System.IO.Unsafe
|
|
|
|
import Data.Typeable (eqT)
|
|
|
|
|
|
-- | Store a file's content in the database and return the hash of the whole
-- content.
--
-- The stream is split into chunks with 'fastCDC' (parameters taken from
-- '_appFileChunkingParams'). Each chunk is stored as a 'FileContentChunk'
-- keyed by the hash of its bytes. The order of the chunks is recorded as
-- 'FileContentEntry' rows: content hash, chunk hash and index, starting at 0.
--
-- While chunks are written, one 'FileChunkLock' row is inserted per chunk.
-- After the whole content has been hashed:
--
-- * a 'FileLock' for the content hash is inserted via 'allocate', so its
--   deletion (in a separate serializable DB transaction) runs when the
--   surrounding resource scope is released;
-- * the chunk locks are then deleted.
--
-- NOTE(review): the locks presumably keep concurrent cleanup from removing
-- chunks or content referenced by this still-open transaction — confirm
-- against the cleanup job.
sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
           => Bool -- ^ Replace? Use only in serializable transaction
           -> ConduitT () ByteString (SqlPersistT m) ()
           -> SqlPersistT m FileContentReference
sinkFileDB doReplace fileContentContent = do
  chunkingParams <- getsYesod $ view _appFileChunkingParams

  -- Store one chunk and return its key. The id of the 'FileChunkLock' inserted
  -- for it is emitted via 'tellM' and collected by 'runWriterC' below.
  let sinkChunk !fileContentChunkContent = do
        fileChunkLockTime <- liftIO getCurrentTime
        fileChunkLockInstance <- getsYesod appInstanceID

        observeSunkChunk StorageDB $ olength fileContentChunkContent

        tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }

        existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]

        -- An existing chunk only gets its content-based flag updated. A unique
        -- violation on insert means another writer inserted the same chunk
        -- concurrently; that case is treated like "already exists".
        let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
        if | existsChunk -> lift setContentBased
           | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
               insert_ FileContentChunk{..}
        return $ FileContentChunkKey fileContentChunkHash
        where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent

  -- Chunk the stream and store every chunk. In parallel, hash the concatenated
  -- chunk bytes (= the whole content) and collect the chunk keys in order.
  -- The writer output is the set of 'FileChunkLock' ids inserted above.
  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $
    transPipe lift fileContentContent
    .| fastCDC chunkingParams
    .| C.mapM (\c -> (c, ) <$> sinkChunk c)
    .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))

  -- Insert a 'FileLock' for the whole content. 'allocate' registers its
  -- deletion as a release action; 'withUnliftIO' is needed because 'allocate'
  -- takes plain IO actions. The release key is discarded, so the lock lives
  -- until the surrounding resource scope is released.
  void . withUnliftIO $ \UnliftIO{..} ->
    let takeLock = do
          fileLockTime <- liftIO getCurrentTime
          fileLockInstance <- getsYesod appInstanceID
          insert FileLock{ fileLockContent = fileContentHash, .. }
        -- Deleted in its own serializable DB transaction (separate 'runDB').
        releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ())
     in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock)

  -- The per-chunk locks are no longer needed once the file lock is held.
  deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ]

  -- One entry per chunk. The two generators are zipped (parallel list
  -- comprehension), pairing each chunk hash with its index 0, 1, 2, ...
  -- Unique violations on insert are ignored.
  let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash
      insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_
        [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
        | fileContentEntryChunkHash <- fileContentChunks ^.. traverse . to unFileContentChunkKey
        | fileContentEntryIx <- [0..]
        ]
  -- Without replace: keep existing entries for this hash and insert only if
  -- none exist. With replace: drop existing entries and insert afresh.
  if | not doReplace -> unlessM entryExists insertEntries
     | otherwise -> do
         deleteWhere [ FileContentEntryHash ==. fileContentHash ]
         insertEntries

  return fileContentHash
  -- Picked up by the record wildcard in 'FileContentChunk{..}' and by
  -- 'setContentBased'. Chunks written here come from 'fastCDC'; presumably
  -- that is what the @ContentBased@ flag records — confirm.
  where fileContentChunkContentBased = True
|
|
|
|
|
|
-- | Process-wide nonce generator used to name the temporary upload objects
-- created by 'sinkMinio' in the tmp bucket.
--
-- 'unsafePerformIO' creates the generator once, as a top-level value. The
-- @NOINLINE@ pragma stops GHC from inlining the definition, which could
-- otherwise create a fresh generator at each use site. Do not remove it.
minioTmpGenerator :: Nonce.Generator
minioTmpGenerator = unsafePerformIO Nonce.new
{-# NOINLINE minioTmpGenerator #-}
|
|
|
|
-- | Result types that 'sinkMinio' can return.
--
-- '_sinkMinioRet' views a result as the SHA3-512 digest of the uploaded
-- content. By default it goes through the type's 'Wrapped' instance, whose
-- unwrapped type must be exactly that digest.
class Typeable ret => SinkMinio ret where
  _sinkMinioRet :: Iso' ret (Digest SHA3_512)
  default _sinkMinioRet :: (Rewrapping ret ret, Unwrapped ret ~ Digest SHA3_512) => Iso' ret (Digest SHA3_512)
  _sinkMinioRet = _Wrapped

-- | Whole-file content references.
instance SinkMinio FileContentReference where
  _sinkMinioRet = _Wrapped

-- | Single-chunk content references.
instance SinkMinio FileContentChunkReference where
  _sinkMinioRet = _Wrapped
|
|
|
|
-- | Upload a byte stream to Minio, content-addressed by its SHA3-512 hash.
--
-- How the upload proceeds:
--
-- 1. The stream is written to a randomly named object in the tmp bucket
--    ('appUploadTmpBucket'). Meanwhile a linked thread pulls @content@ and
--    hashes it.
-- 2. The tmp object is copied to its hash-derived name in the upload bucket
--    ('appUploadCacheBucket'), unless an object with that name already
--    exists there.
-- 3. The tmp object is removed.
--
-- The result is the content hash viewed as @ret@ (see 'SinkMinio'). @ret@
-- must be 'FileContentReference' or 'FileContentChunkReference'; any other
-- type is a runtime 'error'.
--
-- Yields 'Nothing' when 'runAppMinio' does. That is checked once before any
-- of @content@ is pulled. A caller that falls back to another sink on
-- 'Nothing', as 'sinkFile' does, therefore still gets the whole stream.
-- Previously the hashing thread could already have consumed chunks at that
-- point, truncating the fallback's input.
--
-- NOTE(review): this assumes 'runAppMinio' only short-circuits for reasons
-- that do not change between its two calls here (e.g. no Minio connection
-- configured) — confirm in "Handler.Utils.Minio".
sinkMinio :: forall ret m.
             ( MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m
             , SinkMinio ret
             )
          => ConduitT () ByteString m ()
          -> MaybeT m ret
sinkMinio content = do
  -- Short-circuit before touching @content@. Once the hashing thread below is
  -- running it pulls chunks from @content@. A 'Nothing' after that point
  -- would leave a falling-back caller with a truncated stream.
  runAppMinio $ return ()

  uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
  tmpBucket <- getsYesod $ views appSettings appUploadTmpBucket
  -- One-slot hand-off from the hashing thread to 'Minio.putObject'.
  -- 'Nothing' marks the end of the stream.
  chunk <- liftIO newEmptyMVar
  let putChunks = do
        nextChunk <- await
        case nextChunk of
          Nothing
            -> putMVar chunk Nothing
          Just nextChunk' -> do
            observeSunkChunk StorageMinio $ olength nextChunk'
            putMVar chunk $ Just nextChunk'
            yield nextChunk'
            putChunks
  -- Consume @content@ on a linked thread. Every chunk goes both to the upload
  -- (via @chunk@) and into the hash.
  sinkAsync <- lift . allocateLinkedAsync . runConduit
    $ content
    .| putChunks
    .| Crypto.sinkHash

  runAppMinio $ do
    uploadName <- Nonce.nonce128urlT minioTmpGenerator
    let pooOptions = Minio.defaultPutObjectOptions
          { Minio.pooCacheControl = Just "immutable"
          }
    -- Registered so the tmp object is removed even if anything below throws.
    -- It is released explicitly once the copy is done.
    removeObject <- withRunInIO $ \runInIO -> runInIO . register . runInIO $ Minio.removeObject tmpBucket uploadName
    -- The object's size is not known in advance; chunks are read from the
    -- hand-off MVar until the end marker.
    Minio.putObject tmpBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
    contentHash <- waitAsync sinkAsync
    let dstName | Just Refl <- eqT @ret @FileContentReference
                = minioFileReference . _sinkMinioRet # contentHash
                | Just Refl <- eqT @ret @FileContentChunkReference
                = minioFileChunkReference . _sinkMinioRet # contentHash
                | otherwise
                = error "sinkMinio called for return type other than FileContentReference or FileContentChunkReference"
        copySrc = Minio.defaultSourceInfo
          { Minio.srcBucket = tmpBucket
          , Minio.srcObject = uploadName
          }
        copyDst = Minio.defaultDestinationInfo
          { Minio.dstBucket = uploadBucket
          , Minio.dstObject = dstName
          }
    -- Content-addressed: an existing object with this name already holds the
    -- same content, so the copy can be skipped.
    uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
    unless uploadExists $
      Minio.copyObject copyDst copySrc
    release removeObject
    return $ _sinkMinioRet # contentHash
|
|
|
|
-- | Upload a whole file's content to Minio via 'sinkMinio', producing a
-- 'FileContentReference'.
--
-- Cannot deal with zero length uploads.
sinkFileMinio :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m)
              => ConduitT () ByteString m ()
              -> MaybeT m FileContentReference
sinkFileMinio content = sinkMinio content
|
|
|
|
|
|
-- | Store every incoming 'File' with 'sinkFile', emitting one 'FileReference'
-- per file in input order. Stops when upstream is exhausted.
sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
sinkFiles = go
  where
    go = await >>= maybe (return ()) (\file -> lift (sinkFile file) >>= yield >> go)
|
|
|
|
-- | Store a single 'File' and return a 'FileReference' to it.
--
-- A file without content ('fileContent' is 'Nothing') yields a reference
-- without content and touches neither store. Otherwise the content goes to
-- Minio ('sinkFileMinio'), falling back to the database ('sinkFileDB') when
-- that yields 'Nothing'. A source that produces no chunks at all gets the
-- precomputed 'emptyHash' reference without storing anything.
sinkFile :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File (SqlPersistT m) -> SqlPersistT m FileReference
sinkFile File{ fileContent = Nothing, .. } = return FileReference
  { fileReferenceContent = Nothing
  , fileReferenceTitle = fileTitle
  , fileReferenceModified = fileModified
  }
sinkFile File{ fileContent = Just fileContentContent, .. } = do
  -- Run the source on a linked thread, handing chunks over one at a time
  -- through a single-slot TMVar.
  -- NOTE(review): the source runs in @SqlPersistT m@ on that other thread,
  -- sharing this function's SQL backend. If a caller's source issues DB
  -- queries, the connection is used concurrently — confirm callers don't.
  chunk <- liftIO newEmptyTMVarIO
  sourceAsync <- allocateLinkedAsync . runConduit $ fileContentContent .| C.mapM_ (atomically . putTMVar chunk)

  -- Peek without consuming: 'readTMVar' leaves the first chunk in place.
  -- The content is empty only if the source finishes before producing any
  -- chunk. 'waitSTM' rethrows a failure of the source.
  isEmpty <- atomically $
        False <$ readTMVar chunk
    <|> True <$ waitSTM sourceAsync

  -- Re-emit the handed-over chunks as a conduit. 'takeTMVar' is tried first,
  -- so a pending chunk is always emitted before the source's completion is
  -- observed. A source exception is rethrown here. 'callCC' escapes the
  -- 'forever' loop once the source has finished.
  let fileContentContent' = evalContT . callCC $ \finishConsume -> forever $ do
        inpChunk <- atomically $
              Right <$> takeTMVar chunk
          <|> Left <$> waitCatchSTM sourceAsync

        case inpChunk of
          Right inpChunk' -> lift $ yield inpChunk'
          Left (Left exc) -> throwM exc
          Left (Right res) -> finishConsume res

  -- Prefer Minio; fall back to the database when Minio yields 'Nothing'.
  -- NOTE(review): both sinks read the same one-shot stream
  -- (@fileContentContent'@ drains @chunk@). The fallback only sees the
  -- complete content if 'sinkFileMinio' consumed nothing before returning
  -- 'Nothing' — confirm 'sinkMinio' guarantees that.
  fileContentHash <- if
    | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ sinkFileMinio fileContentContent'
    | otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash))

  return FileReference
    { fileReferenceContent = Just fileContentHash
    , fileReferenceTitle = fileTitle
    , fileReferenceModified = fileModified
    }
|
|
|
|
|
|
-- | Like 'sinkFiles', but each 'File' arrives with its residual, and the
-- stored reference is combined with it into a complete record via
-- 'sinkFile''.
sinkFiles' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) ()
sinkFiles' = C.mapM $ \(file, residual) -> sinkFile' file residual
|
|
|
|
-- | Store a 'File' with 'sinkFile' and build a complete record from the
-- resulting 'FileReference' plus the given residual.
sinkFile' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File (SqlPersistT m) -> FileReferenceResidual record -> SqlPersistT m record
sinkFile' file residual = review _FileReference . (, residual) <$> sinkFile file
|
|
|
|
|
|
-- | A source of 'FileReference's running in @HandlerFor UniWorX@, as consumed
-- by 'replaceFileReferences'.
type FileUploads = ConduitT () FileReference (HandlerFor UniWorX) ()
|
|
|
|
-- | Make the records selected by @mkFilter residual@ hold exactly the file
-- references streamed in.
--
-- Behaviour per kind of record:
--
-- * Existing records whose reference is streamed in again are kept.
-- * A streamed reference without such a record gets a new record, built from
--   the reference and @residual@.
-- * A pre-existing record that would violate a uniqueness constraint of the
--   new record is deleted first. Throws a 'userError' if that record is not
--   matched by the filter.
-- * Existing records not streamed in again are deleted at the end.
--
-- Returns @(oldFiles, changes)@:
--
-- * @oldFiles@ — the keys of all records that matched the filter beforehand;
-- * @changes@ — the keys left in the bookkeeping state at the end: old
--   records not streamed in again, plus records inserted or displaced during
--   this run.
replaceFileReferences' :: ( MonadIO m, MonadThrow m
                          , IsFileReference record
                          , PersistEntityBackend record ~ SqlBackend
                          )
                       => (FileReferenceResidual record -> [Filter record])
                       -> FileReferenceResidual record
                       -> ConduitT FileReference Void (SqlPersistT m) (Set (Key record), Set (Key record)) -- ^ @(oldFiles, changes)@
replaceFileReferences' mkFilter residual = do
  let resFilter = mkFilter residual

  -- Existing records under the filter, grouped by their file reference.
  oldFiles <- lift $ Map.fromListWith Set.union . map (\(Entity k v) -> (v ^. _FileReference . _1, Set.singleton k)) <$> selectList resFilter []
  let oldFiles' = setOf (folded . folded) oldFiles

  -- The state starts as @oldFiles@ and tracks the keys that count as changed.
  let
    finsert fRef
      -- Reference already present: those records are kept, so their keys are
      -- removed from every entry of the state. Entries left empty are
      -- presumably dropped by 'assertM''.
      | Just sfIds <- fRef `Map.lookup` oldFiles
      = modify $ Map.mapMaybe (assertM' (not . Set.null) . (\\ sfIds))
      | otherwise = do
          let fRef' = _FileReference # (fRef, residual)
          -- Delete any record that would clash with @fRef'@ on a unique key.
          -- The delete is restricted to the filter; anything else is treated
          -- as an inconsistency and thrown. The displaced key is recorded as
          -- changed.
          forM_ (persistUniqueKeys fRef') $ \u -> maybeT_ $ do
            Entity cKey cVal <- MaybeT . lift $ getBy u
            deleted <- lift . lift . deleteWhereCount $ resFilter <> [ persistIdField ==. cKey ]
            unless (deleted == 1) $
              throwM . userError $ "replaceFileReferences tried to delete outside of filter/database inconsistency: deleted=" <> show deleted
            lift . modify $ Map.alter (Just . maybe (Set.singleton cKey) (Set.insert cKey)) (cVal ^. _FileReference . _1)
          fId <- lift $ insert fRef'
          modify $ Map.alter (Just . maybe (Set.singleton fId) (Set.insert fId)) fRef

  changes <- fmap (setOf $ folded . folded) . execStateC oldFiles $ C.mapM_ finsert

  -- Old records still marked as changed were not streamed in again: delete
  -- them. Records displaced above are already gone, so this is a no-op for
  -- them.
  lift . deleteWhere $ resFilter <> [ persistIdField <-. Set.toList (oldFiles' `Set.intersection` changes) ]

  return (oldFiles', changes)
|
|
|
|
-- | Replace the file references selected by @mkFilter residual@ with those
-- produced by the given 'FileUploads'.
--
-- The upload source is lifted from the handler monad and fed into
-- 'replaceFileReferences''; its @(oldFiles, changes)@ result is returned.
replaceFileReferences :: ( MonadHandler m, MonadThrow m
                         , HandlerSite m ~ UniWorX
                         , IsFileReference record
                         , PersistEntityBackend record ~ SqlBackend
                         )
                      => (FileReferenceResidual record -> [Filter record])
                      -> FileReferenceResidual record
                      -> FileUploads
                      -> SqlPersistT m (Set (Key record), Set (Key record)) -- ^ @(oldFiles, changes)@
replaceFileReferences mkFilter residual fs =
  let uploads = transPipe liftHandler fs
      replaceSink = replaceFileReferences' mkFilter residual
  in runConduit $ uploads .| replaceSink
|
|
|
|
-- | A file reference counts as empty when it has no content at all, or when
-- its content is the precomputed 'emptyHash' reference. That is the
-- reference 'sinkFile' assigns to a source producing no chunks.
isEmptyFileReference :: HasFileReference ref => ref -> Bool
isEmptyFileReference ref = case ref ^. _FileReference . _1 . _fileReferenceContent of
  Nothing -> True
  Just contentRef -> contentRef == $$(liftTyped $ FileContentReference $$(emptyHash))
|