This repository has been archived on 2024-10-24. You can view files and clone it, but cannot push or open issues or pull requests.
fradrive-old/src/Utils/Files.hs
2020-09-09 13:44:01 +02:00

202 lines
9.7 KiB
Haskell

module Utils.Files
( sinkFile, sinkFiles
, sinkFile', sinkFiles'
, FileUploads
, replaceFileReferences, replaceFileReferences'
, sinkFileDB
) where
import Import.NoFoundation
import Foundation.Type
import Handler.Utils.Minio
import qualified Network.Minio as Minio
import qualified Crypto.Hash as Crypto (hash)
import qualified Crypto.Hash.Conduit as Crypto (sinkHash)
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (unfoldM)
import qualified Data.Map.Lazy as Map
import qualified Data.Set as Set
import Control.Monad.State.Class (modify)
import qualified Data.Sequence as Seq
import Database.Persist.Sql (deleteWhereCount)
import Control.Monad.Trans.Resource (allocate)
import qualified Data.UUID.V4 as UUID
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
import Data.Conduit.Algorithms.FastCDC (fastCDC)
-- | Store a stream of file content in the database, split into chunks, and
-- return the reference (hash) under which the content was stored.
--
-- The stream is cut into chunks by 'fastCDC' (parameters taken from
-- '_appFileChunkingParams'). Every chunk is stored as a 'FileContentChunk'
-- whose key is built from the hash of the chunk's content; the ordered list of
-- chunk keys is stored as 'FileContentEntry' rows under the hash computed over
-- all chunk contents in stream order.
sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
           => Bool -- ^ Replace? Use only in serializable transaction
           -> ConduitT () ByteString (SqlPersistT m) () -- ^ Source of the raw file content
           -> SqlPersistT m FileContentReference -- ^ Reference (hash) of the stored content
sinkFileDB doReplace fileContentContent = do
  chunkingParams <- getsYesod $ view _appFileChunkingParams
  -- Store one chunk and return its key. A 'FileChunkLock' row is inserted for
  -- the chunk first; the id of that row is reported through the writer layer
  -- ('tellM') so that all chunk locks can be deleted again further down.
  let sinkChunk fileContentChunkContent = do
          fileChunkLockTime <- liftIO getCurrentTime
          fileChunkLockInstance <- getsYesod appInstanceID
          tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
          existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
          let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
          -- An already stored chunk only gets its content-based flag updated.
          -- Otherwise the chunk is inserted; if that insert hits a unique
          -- constraint violation the flag is updated instead
          -- (NOTE(review): presumably covers a concurrent insert of the same chunk — confirm).
          if | existsChunk -> lift setContentBased
             | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
                 insert_ FileContentChunk{..}
          return $ FileContentChunkKey fileContentChunkHash
        where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent
  -- Chunk the content and store every chunk. The stream of
  -- (chunk content, chunk key) pairs is then consumed twice in parallel
  -- ('ZipConduit'): once to hash all chunk contents, once to collect the chunk
  -- keys in order. 'runWriterC' additionally returns the set of chunk lock ids.
  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $
    transPipe lift fileContentContent
      .| fastCDC chunkingParams
      .| C.mapM (\c -> (c, ) <$> sinkChunk c)
      .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))
  -- Insert a 'FileLock' for the complete content and register it with
  -- 'allocate'; the registered release action deletes the lock row again in its
  -- own 'runDB' call (wrapped in 'setSerializable').
  void . withUnliftIO $ \UnliftIO{..} ->
    let takeLock = do
          fileLockTime <- liftIO getCurrentTime
          fileLockInstance <- getsYesod appInstanceID
          insert FileLock{ fileLockContent = fileContentHash, .. }
        releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ())
    in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock)
  -- The per-chunk locks are removed once the lock on the whole content exists.
  deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ]
  -- 'insertEntries' writes one 'FileContentEntry' per chunk, pairing the chunk
  -- keys with consecutive indices starting at 0 (parallel list comprehension);
  -- a unique constraint violation during the insert is ignored.
  let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash
      insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_
        [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. }
        | fileContentEntryChunkHash <- otoList fileContentChunks
        | fileContentEntryIx <- [0..]
        ]
  -- Without replacement, entries are only written if none exist yet for this
  -- content hash; with replacement, existing entries are deleted first.
  if | not doReplace -> unlessM entryExists insertEntries
     | otherwise -> do
         deleteWhere [ FileContentEntryHash ==. fileContentHash ]
         insertEntries
  return fileContentHash
  -- Chunks written by this function are always flagged as content based.
  where fileContentChunkContentBased = True
-- | Conduit that stores every incoming 'File' with 'sinkFile' and yields the
-- resulting 'FileReference', one output per input.
sinkFiles
  :: ( MonadCatch m
     , MonadHandler m
     , HandlerSite m ~ UniWorX
     , MonadUnliftIO m
     , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
     , YesodPersist UniWorX
     )
  => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
sinkFiles = C.mapM $ \file -> sinkFile file
-- | Store the content of a single 'File' and return a 'FileReference' that
-- carries the file's title, its modification time and (if the file has
-- content) the reference of the stored content.
--
-- * A file without content ('fileContent' is 'Nothing') yields a reference
--   without content; nothing is stored.
-- * Empty content yields the statically known reference of the empty hash;
--   nothing is stored.
-- * Non-empty content is uploaded to the upload cache bucket via Minio. If
--   that 'MaybeT' computation produces no result, 'sinkFileDB' is used instead
--   (NOTE(review): presumably when no upload cache is configured — confirm
--   against 'runAppMinio').
sinkFile :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File (SqlPersistT m) -> SqlPersistT m FileReference
sinkFile File{ fileContent = Nothing, .. } = return FileReference
  { fileReferenceContent = Nothing
  , fileReferenceTitle = fileTitle
  , fileReferenceModified = fileModified
  }
sinkFile File{ fileContent = Just fileContentContent, .. } = do
  -- Peek at the stream to find out whether it produces any data at all; the
  -- rest of the stream is kept as a conduit for the actual storing below.
  (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE
  fileContentHash <- if
    | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ do
        uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
        -- 'chunk' hands the content over from the hashing task to the upload:
        -- 'Just' a piece of content, or 'Nothing' once the stream has ended.
        chunk <- liftIO newEmptyMVar
        -- Pass-through stage: every piece is put into 'chunk' and also yielded
        -- downstream, so the same data is both uploaded and hashed.
        let putChunks = do
              nextChunk <- await
              case nextChunk of
                Nothing
                  -> putMVar chunk Nothing
                Just nextChunk'
                  -> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks
        -- Run the content through 'putChunks' into the hash sink in an async
        -- task; its result is the hash of the whole content.
        sinkAsync <- lift . allocateLinkedAsync . runConduit
          $ fileContentContent'
          .| putChunks
          .| Crypto.sinkHash
        runAppMinio $ do
          -- The final object name depends on the hash, which is only known
          -- after the whole stream was read, so upload to a random temporary
          -- name first.
          tmpUUID <- liftIO UUID.nextRandom
          let uploadName = ".tmp." <> toPathPiece tmpUUID
              pooOptions = Minio.defaultPutObjectOptions
                { Minio.pooCacheControl = Just "immutable"
                }
          -- The upload source takes pieces out of 'chunk' until it sees 'Nothing'.
          Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
          fileContentHash <- review _Wrapped <$> waitAsync sinkAsync
          let dstName = minioFileReference # fileContentHash
              copySrc = Minio.defaultSourceInfo
                { Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName
                }
              copyDst = Minio.defaultDestinationInfo
                { Minio.dstBucket = uploadBucket
                , Minio.dstObject = dstName
                }
          -- Copy the temporary object to its hash-derived name unless an
          -- object with that name already exists, then remove the temporary
          -- object in either case.
          uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
          unless uploadExists $
            Minio.copyObject copyDst copySrc
          Minio.removeObject uploadBucket uploadName
          return fileContentHash
    | otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash))
  return FileReference
    { fileReferenceContent = Just fileContentHash
    , fileReferenceTitle = fileTitle
    , fileReferenceModified = fileModified
    }
-- | Conduit variant of 'sinkFile'': every incoming pair of a 'File' and a
-- residual is stored and turned into a complete @record@.
sinkFiles'
  :: ( MonadCatch m
     , MonadHandler m
     , HandlerSite m ~ UniWorX
     , MonadUnliftIO m
     , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
     , YesodPersist UniWorX
     , HasFileReference record
     )
  => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) ()
sinkFiles' = C.mapM $ \(file, residual) -> sinkFile' file residual
-- | Store a 'File' with 'sinkFile' and combine the resulting 'FileReference'
-- with the given residual into a @record@ (via '_FileReference').
sinkFile'
  :: ( MonadCatch m
     , MonadHandler m
     , HandlerSite m ~ UniWorX
     , MonadUnliftIO m
     , BackendCompatible SqlBackend (YesodPersistBackend UniWorX)
     , YesodPersist UniWorX
     , HasFileReference record
     )
  => File (SqlPersistT m)
  -> FileReferenceResidual record
  -> SqlPersistT m record
sinkFile' file residual = attachResidual <$> sinkFile file
  where
    -- Build the full record from the stored reference and the caller's residual.
    attachResidual reference = _FileReference # (reference, residual)
-- | A source of 'FileReference's running in @'HandlerFor' 'UniWorX'@; this is
-- the upload argument accepted by 'replaceFileReferences'.
type FileUploads = ConduitT () FileReference (HandlerFor UniWorX) ()
-- | Sink that replaces the records selected by @mkFilter residual@ with the
-- incoming stream of 'FileReference's.
--
-- * An incoming reference that is already present among the selected records
--   leaves those records untouched.
-- * Any other incoming reference is inserted as a new record (built from the
--   reference and @residual@). Before the insert, every existing record that
--   shares one of the new record's unique keys is deleted; that delete is
--   restricted to the filter and an error is thrown unless it removes exactly
--   one row.
-- * After the stream has ended, previously selected records that were not
--   matched by any incoming reference are deleted.
--
-- The result is the set of keys selected before any modification and the set
-- of changed keys (unmatched old records, records removed because of a unique
-- key clash, and newly inserted records).
replaceFileReferences'
  :: ( MonadIO m, MonadThrow m
     , IsFileReference record
     , PersistEntityBackend record ~ SqlBackend
     )
  => (FileReferenceResidual record -> [Filter record])
  -> FileReferenceResidual record
  -> ConduitT FileReference Void (SqlPersistT m) (Set (Key record), Set (Key record)) -- ^ @(oldFiles, changes)@
replaceFileReferences' mkFilter residual = do
  let scope = mkFilter residual

  -- Group the keys of all currently selected records by their file reference.
  existing <- lift $ selectList scope []
  let byReference = Map.fromListWith Set.union
        [ (val ^. _FileReference . _1, Set.singleton key)
        | Entity key val <- existing
        ]
      previousKeys = setOf (folded . folded) byReference

      -- Add a key to the set stored under the given reference.
      track key = Map.alter (Just . maybe (Set.singleton key) (Set.insert key))

      -- Process one incoming reference; the state is the map of keys that
      -- still count as changed.
      register upload = case Map.lookup upload byReference of
        Just keptKeys ->
          -- Already present: these keys are unchanged, so drop them from the
          -- state (and drop map entries whose key set becomes empty).
          modify $ Map.mapMaybe (assertM' (not . Set.null) . (\\ keptKeys))
        Nothing -> do
          let candidate = _FileReference # (upload, residual)
          -- Remove records that would violate a uniqueness constraint of the
          -- record about to be inserted.
          forM_ (persistUniqueKeys candidate) $ \uniq -> maybeT (return ()) $ do
            Entity clashKey clashVal <- MaybeT . lift $ getBy uniq
            deleted <- lift . lift . deleteWhereCount $ scope <> [ persistIdField ==. clashKey ]
            unless (deleted == 1) $
              throwM . userError $ "replaceFileReferences tried to delete outside of filter/database inconsistency: deleted=" <> show deleted
            lift . modify $ track clashKey (clashVal ^. _FileReference . _1)
          newKey <- lift $ insert candidate
          modify $ track newKey upload

  remaining <- execStateC byReference $ C.mapM_ register
  let changes = setOf (folded . folded) remaining

  -- Old records that are still listed as changed were not re-uploaded: delete them.
  lift . deleteWhere $ scope <> [ persistIdField <-. Set.toList (previousKeys `Set.intersection` changes) ]
  return (previousKeys, changes)
-- | Run 'replaceFileReferences'' on a 'FileUploads' source: the uploads are
-- lifted from the handler monad with 'liftHandler' and fed into the sink.
replaceFileReferences
  :: ( MonadHandler m, MonadThrow m
     , HandlerSite m ~ UniWorX
     , IsFileReference record
     , PersistEntityBackend record ~ SqlBackend
     )
  => (FileReferenceResidual record -> [Filter record])
  -> FileReferenceResidual record
  -> FileUploads
  -> SqlPersistT m (Set (Key record), Set (Key record)) -- ^ @(oldFiles, changes)@
replaceFileReferences mkFilter residual fs = runConduit (uploads .| sink)
  where
    -- The upload source, moved into the database monad.
    uploads = transPipe liftHandler fs
    -- The sink that performs the actual replacement.
    sink = replaceFileReferences' mkFilter residual