From 8f608c19552ef7bd6ce61af92496b3d5f5bf61b1 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 2 Sep 2020 21:25:20 +0200 Subject: [PATCH] feat(files): chunking BREAKING CHANGE: files now chunked --- config/settings.yml | 1 + models/files.model | 20 +++- src/Crypto/Hash/Instances.hs | 5 + src/Database/Esqueleto/Utils.hs | 47 +++++++- src/Foundation/Type.hs | 3 +- src/Handler/Admin/Test/Download.hs | 6 +- src/Handler/Sheet/PersonalisedFiles.hs | 5 +- src/Handler/Submission/Download.hs | 7 +- src/Handler/Utils.hs | 14 ++- src/Handler/Utils/Allocation.hs | 2 +- src/Handler/Utils/Files.hs | 86 +++++++++----- src/Handler/Utils/Form.hs | 27 +++-- src/Handler/Utils/Mail.hs | 9 +- src/Handler/Utils/Rating.hs | 14 +-- src/Handler/Utils/Rating/Format.hs | 7 +- src/Handler/Utils/Rating/Format/Legacy.hs | 6 +- src/Handler/Utils/Submission.hs | 4 +- src/Handler/Utils/Zip.hs | 102 ++++++++-------- src/Import/NoModel.hs | 2 + src/Jobs/Handler/Files.hs | 136 ++++++++++++++-------- src/Model/Migration.hs | 15 ++- src/Model/Types/Common.hs | 2 - src/Model/Types/File.hs | 52 ++++++++- src/Settings.hs | 2 + src/Utils.hs | 27 ++++- src/Utils/Files.hs | 122 ++++++++++++++----- src/Utils/Sql.hs | 40 ++++++- test/Database/Fill.hs | 5 +- 28 files changed, 546 insertions(+), 222 deletions(-) diff --git a/config/settings.yml b/config/settings.yml index 3824a9f2b..ee5a9a247 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -159,6 +159,7 @@ upload-cache: disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false" upload-cache-bucket: "uni2work-uploads" inject-files: 10 +file-upload-db-chunksize: 1048576 # 1MiB server-sessions: idle-timeout: 28807 diff --git a/models/files.model b/models/files.model index 428331b36..2a8656a3e 100644 --- a/models/files.model +++ b/models/files.model @@ -1,9 +1,20 @@ -FileContent +FileContentEntry hash FileContentReference + ix Natural + chunkHash FileContentChunkId + UniqueFileContentEntry hash ix + +FileContentChunk + hash 
FileContentChunkReference content ByteString - unreferencedSince UTCTime Maybe + contentBased Bool default=false -- For Migration Primary hash +FileContentChunkUnreferenced + hash FileContentChunkId + since UTCTime + UniqueFileContentChunkUnreferenced hash + SessionFile content FileContentReference Maybe touched UTCTime @@ -12,3 +23,8 @@ FileLock content FileContentReference instance InstanceId time UTCTime + +FileChunkLock + hash FileContentChunkReference + instance InstanceId + time UTCTime \ No newline at end of file diff --git a/src/Crypto/Hash/Instances.hs b/src/Crypto/Hash/Instances.hs index 93bf63516..0be90af18 100644 --- a/src/Crypto/Hash/Instances.hs +++ b/src/Crypto/Hash/Instances.hs @@ -18,6 +18,8 @@ import Data.Aeson as Aeson import Control.Monad.Fail +import Language.Haskell.TH.Syntax (Lift(liftTyped)) +import Instances.TH.Lift () instance HashAlgorithm hash => PersistField (Digest hash) where toPersistValue = PersistByteString . convert @@ -46,3 +48,6 @@ instance HashAlgorithm hash => FromJSON (Digest hash) where instance Hashable (Digest hash) where hashWithSalt s = (hashWithSalt s :: ByteString -> Int) . 
convert + +instance HashAlgorithm hash => Lift (Digest hash) where + liftTyped dgst = [||fromMaybe (error "Lifted digest has wrong length") $ digestFromByteString $$(liftTyped (convert dgst :: ByteString))||] diff --git a/src/Database/Esqueleto/Utils.hs b/src/Database/Esqueleto/Utils.hs index 23daf5679..7db0e3c39 100644 --- a/src/Database/Esqueleto/Utils.hs +++ b/src/Database/Esqueleto/Utils.hs @@ -6,6 +6,7 @@ module Database.Esqueleto.Utils , justVal, justValList , isJust , isInfixOf, hasInfix + , strConcat, substring , or, and , any, all , subSelectAnd, subSelectOr @@ -39,7 +40,8 @@ import qualified Data.Set as Set import qualified Data.List as List import qualified Data.Foldable as F import qualified Database.Esqueleto as E -import qualified Database.Esqueleto.Internal.Sql as E +import qualified Database.Esqueleto.PostgreSQL as E +import qualified Database.Esqueleto.Internal.Internal as E import Database.Esqueleto.Utils.TH import qualified Data.Text.Lazy as Lazy (Text) @@ -96,6 +98,42 @@ hasInfix :: ( E.SqlString s1 => E.SqlExpr (E.Value s2) -> E.SqlExpr (E.Value s1) -> E.SqlExpr (E.Value Bool) hasInfix = flip isInfixOf +infixl 6 `strConcat` + +strConcat :: E.SqlString s + => E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s) +strConcat = E.unsafeSqlBinOp " || " + +substring :: ( E.SqlString str + , Num from, Num for + ) + => E.SqlExpr (E.Value str) + -> E.SqlExpr (E.Value from) + -> E.SqlExpr (E.Value for) + -> E.SqlExpr (E.Value str) +substring (E.ERaw p1 f1) (E.ERaw p2 f2) (E.ERaw p3 f3) + = E.ERaw E.Never $ \info -> + let (strTLB, strVals) = f1 info + (fromiTLB, fromiVals) = f2 info + (foriTLB, foriVals) = f3 info + in ( "SUBSTRING" <> E.parens (E.parensM p1 strTLB <> " FROM " <> E.parensM p2 fromiTLB <> " FOR " <> E.parensM p3 foriTLB) + , strVals <> fromiVals <> foriVals + ) +substring a b c = substring (construct a) (construct b) (construct c) + where construct :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a) + construct (E.ERaw p f) 
= E.ERaw E.Parens $ \info -> + let (b1, vals) = f info + build ("?", [E.PersistList vals']) = + (E.uncommas $ replicate (length vals') "?", vals') + build expr = expr + in build (E.parensM p b1, vals) + construct (E.ECompositeKey f) = + E.ERaw E.Parens $ \info -> (E.uncommas $ f info, mempty) + construct (E.EAliasedValue i _) = + E.ERaw E.Never $ E.aliasedValueIdentToRawSql i + construct (E.EValueReference i i') = + E.ERaw E.Never $ E.valueReferenceToRawSql i i' + and, or :: Foldable f => f (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool) and = F.foldr (E.&&.) true or = F.foldr (E.||.) false @@ -111,8 +149,11 @@ all :: MonoFoldable f => (Element f -> E.SqlExpr (E.Value Bool)) -> f -> E.SqlEx all test = and . map test . otoList subSelectAnd, subSelectOr :: E.SqlQuery (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool) -subSelectAnd q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_and" <$> q -subSelectOr q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_or" <$> q +subSelectAnd q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_and" E.AggModeAll) [] <$> q +subSelectOr q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_or" E.AggModeAll) [] <$> q + +parens :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a) +parens = E.unsafeSqlFunction "" -- Allow usage of Tuples as DbtRowKey, i.e. 
SqlIn instances for tuples diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs index 5595127e8..5257f1c35 100644 --- a/src/Foundation/Type.hs +++ b/src/Foundation/Type.hs @@ -7,7 +7,7 @@ module Foundation.Type , _SessionStorageMemcachedSql, _SessionStorageAcid , SMTPPool , _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport - , DB, Form, MsgRenderer, MailM + , DB, Form, MsgRenderer, MailM, DBFile ) where import Import.NoFoundation @@ -81,3 +81,4 @@ type DB = YesodDB UniWorX type Form x = Html -> MForm (HandlerFor UniWorX) (FormResult x, WidgetFor UniWorX ()) type MsgRenderer = MsgRendererS UniWorX -- see Utils type MailM a = MailT (HandlerFor UniWorX) a +type DBFile = File (YesodDB UniWorX) diff --git a/src/Handler/Admin/Test/Download.hs b/src/Handler/Admin/Test/Download.hs index dc02ae8e0..9bf85419d 100644 --- a/src/Handler/Admin/Test/Download.hs +++ b/src/Handler/Admin/Test/Download.hs @@ -80,12 +80,12 @@ testDownload = do sourceDBChunks :: ConduitT () Int DB () sourceDBChunks = forever sourceDBFiles .| C.mapM (\x -> x <$ $logDebugS "testDownload.sourceDBChunks" (tshow $ entityKey x)) - .| C.map ((length $!!) . fileContentContent . entityVal) + .| C.map ((length $!!) . fileContentChunkContent . entityVal) .| takeLimit dlMaxSize where - sourceDBFiles = E.selectSource . E.from $ \fileContent -> do + sourceDBFiles = E.selectSource . 
E.from $ \fileContentChunk -> do E.orderBy [E.asc $ E.random_ @Int64] - return fileContent + return fileContentChunk takeLimit n | n <= 0 = return () takeLimit n = do diff --git a/src/Handler/Sheet/PersonalisedFiles.hs b/src/Handler/Sheet/PersonalisedFiles.hs index 362c73d29..dc29fb61f 100644 --- a/src/Handler/Sheet/PersonalisedFiles.hs +++ b/src/Handler/Sheet/PersonalisedFiles.hs @@ -195,7 +195,7 @@ sourcePersonalisedSheetFiles :: forall m. -> Maybe SheetId -> Maybe (Set UserId) -> PersonalisedSheetFilesDownloadAnonymous - -> ConduitT () (Either PersonalisedSheetFile File) (SqlPersistT m) () + -> ConduitT () (Either PersonalisedSheetFile DBFile) (SqlPersistT m) () sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do (mbIdx, cIDKey) <- lift . newPersonalisedFilesKey $ maybe (Left cId) Right mbsid let @@ -255,9 +255,10 @@ sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do , fileModified = courseParticipantRegistration } yieldM . fmap Right $ do - fileContent <- lift $ Just . toStrict <$> formatPersonalisedSheetFilesMeta anonMode cPart cID + fileContent' <- lift $ formatPersonalisedSheetFilesMeta anonMode cPart cID let fileTitle = (dirName ) . ensureExtension "yaml" . unpack . 
mr $ MsgPersonalisedSheetFilesMetaFilename cID fileModified = courseParticipantRegistration + fileContent = Just $ C.sourceLazy fileContent' return File{..} _dirCache %= Set.insert dirName whenIsJust mbPFile $ \(Entity _ pFile@PersonalisedSheetFile{..}) -> do diff --git a/src/Handler/Submission/Download.hs b/src/Handler/Submission/Download.hs index 99f2f4457..6d55dfa62 100644 --- a/src/Handler/Submission/Download.hs +++ b/src/Handler/Submission/Download.hs @@ -11,8 +11,6 @@ import Handler.Utils.Submission import qualified Data.Set as Set -import qualified Data.Text.Encoding as Text - import qualified Database.Esqueleto as E import qualified Data.Conduit.Combinators as Conduit @@ -32,9 +30,8 @@ getSubDownloadR tid ssh csh shn cID (submissionFileTypeIsUpdate -> isUpdate) pat case isRating of True - | isUpdate -> runDB $ do - file <- runMaybeT $ lift . ratingFile cID =<< MaybeT (getRating submissionID) - maybe notFound (return . toTypedContent . Text.decodeUtf8) $ fileContent =<< file + | isUpdate -> maybe notFound sendThisFile <=< runDB . runMaybeT $ + lift . ratingFile cID =<< MaybeT (getRating submissionID) | otherwise -> notFound False -> do let results = (.| Conduit.map entityVal) . E.selectSource . E.from $ \sf -> do diff --git a/src/Handler/Utils.hs b/src/Handler/Utils.hs index b2b99ac45..c4ddd7db3 100644 --- a/src/Handler/Utils.hs +++ b/src/Handler/Utils.hs @@ -34,11 +34,13 @@ import Control.Monad.Logger -- | Simply send a `File`-Value -sendThisFile :: File -> Handler TypedContent +sendThisFile :: DBFile -> Handler TypedContent sendThisFile File{..} | Just fileContent' <- fileContent = do setContentDisposition' . 
Just $ takeFileName fileTitle - return $ TypedContent (simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8") (toContent fileContent') + let cType = simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8" + respondSourceDB cType $ + fileContent' .| Conduit.map toFlushBuilder | otherwise = sendResponseStatus noContent204 () -- | Serve a single file, identified through a given DB query @@ -46,7 +48,7 @@ serveOneFile :: forall file. HasFileReference file => ConduitT () file (YesodDB serveOneFile source = do results <- runDB . runConduit $ source .| Conduit.take 2 -- We don't need more than two files to make a decision below case results of - [file] -> sendThisFile =<< runDB (sourceFile' file) + [file] -> sendThisFile $ sourceFile' file [] -> notFound _other -> do $logErrorS "SFileR" "Multiple matching files found." @@ -58,7 +60,7 @@ serveOneFile source = do serveSomeFiles :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent serveSomeFiles archiveName source = serveSomeFiles' archiveName $ source .| C.map Left -serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent +serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent serveSomeFiles' archiveName source = do (source', results) <- runDB $ runPeekN 2 source @@ -66,7 +68,7 @@ serveSomeFiles' archiveName source = do case results of [] -> notFound - [file] -> sendThisFile =<< either (runDB . sourceFile') return file + [file] -> sendThisFile $ either sourceFile' id file _moreFiles -> do setContentDisposition' $ Just archiveName respondSourceDB typeZip $ do @@ -79,7 +81,7 @@ serveSomeFiles' archiveName source = do serveZipArchive :: forall file. 
HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent serveZipArchive archiveName source = serveZipArchive' archiveName $ source .| C.map Left -serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent +serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent serveZipArchive' archiveName source = do (source', results) <- runDB $ runPeekN 1 source diff --git a/src/Handler/Utils/Allocation.hs b/src/Handler/Utils/Allocation.hs index 510e9fc70..db512e25a 100644 --- a/src/Handler/Utils/Allocation.hs +++ b/src/Handler/Utils/Allocation.hs @@ -276,7 +276,7 @@ storeAllocationResult :: AllocationId -> (AllocationFingerprint, Set (UserId, CourseId), Seq MatchingLogRun) -> DB () storeAllocationResult allocId now (allocFp, allocMatchings, ppMatchingLog -> allocLog) = do - FileReference{..} <- sinkFile $ File "matchings.log" (Just $ encodeUtf8 allocLog) now + FileReference{..} <- sinkFile $ File "matchings.log" (Just . yield $ encodeUtf8 allocLog) now insert_ . 
AllocationMatching allocId allocFp now $ fromMaybe (error "allocation result stored without fileReferenceContent") fileReferenceContent doAllocation allocId now allocMatchings diff --git a/src/Handler/Utils/Files.hs b/src/Handler/Utils/Files.hs index 9400a9a4b..2375bd4e7 100644 --- a/src/Handler/Utils/Files.hs +++ b/src/Handler/Utils/Files.hs @@ -7,12 +7,13 @@ module Handler.Utils.Files import Import import qualified Data.Conduit.Combinators as C +import qualified Data.Conduit.List as C (unfoldM) import Handler.Utils.Minio import qualified Network.Minio as Minio -import qualified Data.ByteString.Base64.URL as Base64 -import qualified Data.ByteArray as ByteArray +import qualified Database.Esqueleto as E +import qualified Database.Esqueleto.Utils as E data SourceFilesException @@ -22,36 +23,59 @@ data SourceFilesException deriving anyclass (Exception) -sourceFiles :: ConduitT FileReference File (YesodDB UniWorX) () -sourceFiles = C.mapM sourceFile +sourceFiles :: Monad m => ConduitT FileReference DBFile m () +sourceFiles = C.map sourceFile -sourceFile :: FileReference -> DB File -sourceFile FileReference{..} = do - mFileContent <- traverse get $ FileContentKey <$> fileReferenceContent - fileContent <- if - | is (_Just . _Nothing) mFileContent - , Just fileContentHash <- fileReferenceContent -- Not a restriction - -> maybeT (throwM SourceFilesContentUnavailable) $ do - let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash - uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket - fmap Just . hoistMaybe <=< runAppMinio . runMaybeT $ do - objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions - lift . 
runConduit $ Minio.gorObjectStream objRes .| C.fold - | fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent - -> throwM SourceFilesMismatchedHashes - | Just fileContent' <- fileContentContent <$> join mFileContent - -> return $ Just fileContent' - | otherwise - -> return Nothing +sourceFile :: FileReference -> DBFile +sourceFile FileReference{..} = File + { fileTitle = fileReferenceTitle + , fileModified = fileReferenceModified + , fileContent = toFileContent <$> fileReferenceContent + } + where + toFileContent fileReference + | fileReference == $$(liftTyped $ FileContentReference $$(emptyHash)) + = return () + toFileContent fileReference = do + inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference + if + | inDB -> do + dbChunksize <- getsYesod $ view _appFileUploadDBChunksize + let retrieveChunk chunkHash = \case + Nothing -> return Nothing + Just start -> do + chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do + E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash + return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize) + case chunk of + Nothing -> throwM SourceFilesContentUnavailable + Just (E.Value c) -> return . Just . (c, ) $ if + | olength c >= dbChunksize -> Just $ start + dbChunksize + | otherwise -> Nothing + chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference + E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ] + return $ fileContentEntry E.^. FileContentEntryChunkHash + chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int)) + | otherwise -> do + chunkVar <- newEmptyTMVarIO + minioAsync <- lift . 
allocateLinkedAsync $ + maybeT (throwM SourceFilesContentUnavailable) $ do + let uploadName = minioFileReference # fileReference + uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket + hoistMaybe <=< runAppMinio . runMaybeT $ do + objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions + lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just) + atomically $ putTMVar chunkVar Nothing + let go = do + mChunk <- atomically $ takeTMVar chunkVar {- take (not read): empties the TMVar so the producer's next putTMVar can proceed and no chunk is yielded twice -} + case mChunk of + Nothing -> waitAsync minioAsync + Just chunk -> yield chunk >> go + in go - return File - { fileTitle = fileReferenceTitle - , fileContent - , fileModified = fileReferenceModified - } +sourceFiles' :: forall file m. (HasFileReference file, Monad m) => ConduitT file DBFile m () +sourceFiles' = C.map sourceFile' -sourceFiles' :: forall file. HasFileReference file => ConduitT file File (YesodDB UniWorX) () -sourceFiles' = C.mapM sourceFile' - -sourceFile' :: forall file. HasFileReference file => file -> DB File +sourceFile' :: forall file. HasFileReference file => file -> DBFile sourceFile' = sourceFile . view (_FileReference . _1) diff --git a/src/Handler/Utils/Form.hs b/src/Handler/Utils/Form.hs index 68f9f68a0..4be16133b 100644 --- a/src/Handler/Utils/Form.hs +++ b/src/Handler/Utils/Form.hs @@ -32,7 +32,7 @@ import Yesod.Form.Bootstrap3 import Handler.Utils.Zip import qualified Data.Conduit.Combinators as C -import qualified Data.Conduit.List as C (mapMaybe) +import qualified Data.Conduit.List as C (mapMaybe, mapMaybeM) import qualified Database.Esqueleto as E import qualified Database.Esqueleto.Utils as E @@ -831,7 +831,10 @@ pseudonymWordField = checkMMap doCheck id $ ciField & addDatalist (return $ mkOp uploadContents :: (MonadHandler m, HandlerSite m ~ UniWorX) => ConduitT FileReference ByteString m () -uploadContents = transPipe (liftHandler . 
runDB) sourceFiles .| C.mapMaybe fileContent +uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybeM fileContent' + where fileContent' f = runMaybeT $ do + File{fileContent = Just fc} <- return f + liftHandler . runDB . runConduit $ fc .| C.fold data FileFieldUserOption a = FileFieldUserOption { fieldOptionForce :: Bool @@ -893,11 +896,21 @@ genericFileField mkOpts = Field{..} , Map.filter (views _3 $ (&&) <$> fieldOptionForce <*> fieldOptionDefault) fieldAdditionalFiles ] - handleUpload :: FileField -> Maybe Text -> ConduitT File FileReference (YesodDB UniWorX) () + handleUpload :: FileField -> Maybe Text -> ConduitT (File Handler) FileReference (YesodDB UniWorX) () handleUpload FileField{fieldMaxFileSize} mIdent - = C.filter (\File{..} -> maybe (const True) (>) fieldMaxFileSize $ maybe 0 (fromIntegral . olength) fileContent) - .| sinkFiles - .| C.mapM mkSessionFile + = C.map (transFile liftHandler) + .| C.mapMaybeM (\f@File{..} -> maybeT (return $ Just f) $ do + maxSize <- fromIntegral <$> hoistMaybe fieldMaxFileSize + fc <- hoistMaybe fileContent + let peekNE n = do + str <- C.takeE n .| C.fold + leftover str + yield str + (unsealConduitT -> fc', size) <- lift $ fc $$+ peekNE (succ maxSize) .| C.lengthE + return . guardOn (size <= maxSize) $ f { fileContent = Just fc' } + ) + .| sinkFiles + .| C.mapM mkSessionFile where mkSessionFile fRef@FileReference{..} = fRef <$ do now <- liftIO getCurrentTime @@ -924,7 +937,7 @@ genericFileField mkOpts = Field{..} doUnpack | fieldOptionForce fieldUnpackZips = fieldOptionDefault fieldUnpackZips | otherwise = unpackZips `elem` vals - handleFile :: FileInfo -> ConduitT () File Handler () + handleFile :: FileInfo -> ConduitT () (File Handler) Handler () handleFile | doUnpack = receiveFiles | otherwise = yieldM . 
acceptFile diff --git a/src/Handler/Utils/Mail.hs b/src/Handler/Utils/Mail.hs index 85038a894..18785044c 100644 --- a/src/Handler/Utils/Mail.hs +++ b/src/Handler/Utils/Mail.hs @@ -12,9 +12,7 @@ import Handler.Utils.Files import qualified Data.CaseInsensitive as CI -import qualified Data.ByteString.Lazy as LBS - -import qualified Data.Conduit.List as C +import qualified Data.Conduit.Combinators as C import qualified Text.Pandoc as P @@ -72,12 +70,13 @@ addFileDB :: ( MonadMail m , HandlerSite m ~ UniWorX ) => FileReference -> m (Maybe MailObjectId) addFileDB fRef = runMaybeT $ do - File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent} <- lift . liftHandler . runDB $ sourceFile fRef + File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent'} <- return $ sourceFile fRef + fileContent <- liftHandler . runDB . runConduit $ fileContent' .| C.sinkLazy lift . addPart $ do _partType .= decodeUtf8 (mimeLookup fileName) _partEncoding .= Base64 _partDisposition .= AttachmentDisposition fileName - _partContent .= PartContent (LBS.fromStrict fileContent) + _partContent .= PartContent fileContent setMailObjectIdPseudorandom (fileName, fileContent) :: StateT Part (HandlerFor UniWorX) MailObjectId diff --git a/src/Handler/Utils/Rating.hs b/src/Handler/Utils/Rating.hs index 5a82c8921..ef65892b0 100644 --- a/src/Handler/Utils/Rating.hs +++ b/src/Handler/Utils/Rating.hs @@ -16,11 +16,9 @@ import Handler.Utils.DateTime (getDateTimeFormatter) import qualified Data.Text as Text -import qualified Data.ByteString.Lazy as Lazy.ByteString - import qualified Database.Esqueleto as E -import qualified Data.Conduit.List as Conduit +import qualified Data.Conduit.Combinators as C import Handler.Utils.Rating.Format @@ -91,15 +89,16 @@ extensionRating = "txt" ratingFile :: ( MonadHandler m , HandlerSite m ~ UniWorX + , Monad m' ) - => CryptoFileNameSubmission -> Rating -> m File + => CryptoFileNameSubmission -> Rating -> m (File m') ratingFile cID 
rating@Rating{ ratingValues = Rating'{..} } = do mr'@(MsgRenderer mr) <- getMsgRenderer dtFmt <- getDateTimeFormatter fileModified <- maybe (liftIO getCurrentTime) return ratingTime let fileTitle = ensureExtension extensionRating . unpack . mr $ MsgRatingFileTitle cID - fileContent = Just . Lazy.ByteString.toStrict $ formatRating mr' dtFmt cID rating + fileContent = Just . C.sourceLazy $ formatRating mr' dtFmt cID rating return File{..} type SubmissionContent = Either FileReference (SubmissionId, Rating') @@ -107,13 +106,12 @@ type SubmissionContent = Either FileReference (SubmissionId, Rating') extractRatings :: ( MonadHandler m , HandlerSite m ~ UniWorX ) => ConduitT FileReference SubmissionContent m () -extractRatings = Conduit.mapM $ \fRef@FileReference{..} -> liftHandler $ do +extractRatings = C.mapM $ \fRef@FileReference{..} -> liftHandler $ do msId <- isRatingFile fileReferenceTitle if | Just sId <- msId , isJust fileReferenceContent -> do - f <- runDB $ sourceFile fRef - (rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) $ parseRating f + (rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) . runDB . 
parseRating $ sourceFile fRef sId' <- traverse decrypt cID unless (maybe (const True) (==) sId' sId) $ throwM $ RatingFileException fileReferenceTitle RatingSubmissionIDIncorrect diff --git a/src/Handler/Utils/Rating/Format.hs b/src/Handler/Utils/Rating/Format.hs index fd8ab2fb5..e7cd03568 100644 --- a/src/Handler/Utils/Rating/Format.hs +++ b/src/Handler/Utils/Rating/Format.hs @@ -35,6 +35,8 @@ import qualified System.FilePath.Cryptographic as Explicit import Control.Exception (ErrorCall(..)) +import qualified Data.Conduit.Combinators as C + data PrettifyState = PrettifyInitial @@ -195,8 +197,9 @@ instance ns ~ CryptoIDNamespace (CI FilePath) SubmissionId => YAML.FromYAML (May ) -parseRating :: MonadCatch m => File -> m (Rating', Maybe CryptoFileNameSubmission) -parseRating f@File{ fileContent = Just (fromStrict -> input), .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do +parseRating :: MonadCatch m => File m -> m (Rating', Maybe CryptoFileNameSubmission) +parseRating f@File{ fileContent = Just input', .. } = handle onFailure . handle (throwM . RatingParseException) . 
handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do + input <- runConduit $ input' .| C.sinkLazy let evStream = YAML.Event.parseEvents input delimitDocument = do ev <- maybe (throwM RatingYAMLStreamTerminatedUnexpectedly) return =<< await diff --git a/src/Handler/Utils/Rating/Format/Legacy.hs b/src/Handler/Utils/Rating/Format/Legacy.hs index bd523d06a..c027e1b07 100644 --- a/src/Handler/Utils/Rating/Format/Legacy.hs +++ b/src/Handler/Utils/Rating/Format/Legacy.hs @@ -16,6 +16,8 @@ import qualified Data.ByteString.Lazy as Lazy (ByteString) import qualified Data.CaseInsensitive as CI +import qualified Data.Conduit.Combinators as C + import Text.Read (readEither) @@ -55,9 +57,9 @@ formatRating cID Rating{ ratingValues = Rating'{..}, ..} = let ] in Lazy.Text.encodeUtf8 . (<> "\n") $ displayT doc -parseRating :: MonadCatch m => File -> m Rating' +parseRating :: MonadCatch m => File m -> m Rating' parseRating File{ fileContent = Just input, .. } = handle (throwM . RatingParseLegacyException) $ do - inputText <- either (throwM . RatingNotUnicode) return $ Text.decodeUtf8' input + inputText <- either (throwM . RatingNotUnicode) return . Text.decodeUtf8' =<< runConduit (input .| C.fold) let (headerLines', commentLines) = break (commentSep `Text.isInfixOf`) $ Text.lines inputText (reverse -> ratingLines, reverse -> _headerLines) = break (sep' `Text.isInfixOf`) $ reverse headerLines' diff --git a/src/Handler/Utils/Submission.hs b/src/Handler/Utils/Submission.hs index 13ac4d067..2dcac8f28 100644 --- a/src/Handler/Utils/Submission.hs +++ b/src/Handler/Utils/Submission.hs @@ -256,7 +256,7 @@ planSubmissions sid restriction = do maximumsBy f xs = flip Set.filter xs $ \x -> maybe True (((==) `on` f) x . 
maximumBy (comparing f)) $ fromNullable xs -submissionFileSource :: SubmissionId -> ConduitT () File (YesodDB UniWorX) () +submissionFileSource :: SubmissionId -> ConduitT () DBFile (YesodDB UniWorX) () submissionFileSource subId = E.selectSource (E.from $ submissionFileQuery subId) .| C.map entityVal .| sourceFiles' @@ -319,7 +319,7 @@ submissionMultiArchive anonymous (Set.toList -> ids) = do setContentDisposition' $ Just ((addExtension `on` unpack) (mr archiveName) extensionZip) respondSource typeZip . (<* lift cleanup) . transPipe (runDBRunner dbrunner) $ do let - fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () File (YesodDB UniWorX) () + fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () DBFile (YesodDB UniWorX) () fileEntitySource' (rating, Entity submissionID Submission{}, subTime, (shn,csh,ssh,tid,sheetAnonymous)) = do cID <- encrypt submissionID diff --git a/src/Handler/Utils/Zip.hs b/src/Handler/Utils/Zip.hs index 255d8f93b..0b469bcdb 100644 --- a/src/Handler/Utils/Zip.hs +++ b/src/Handler/Utils/Zip.hs @@ -17,11 +17,6 @@ import Codec.Archive.Zip.Conduit.Types import Codec.Archive.Zip.Conduit.UnZip import Codec.Archive.Zip.Conduit.Zip --- import qualified Data.ByteString.Lazy as Lazy (ByteString) -import qualified Data.ByteString.Lazy as Lazy.ByteString - -import qualified Data.ByteString as ByteString - import System.FilePath import Data.Time.LocalTime (localTimeToUTC, utcToLocalTime) @@ -38,6 +33,8 @@ import Data.Encoding ( decodeStrictByteStringExplicit import Data.Encoding.CP437 import qualified Data.Char as Char +import Control.Monad.Trans.Cont + typeZip :: ContentType typeZip = "application/zip" @@ -53,49 +50,54 @@ instance Default ZipInfo where } -consumeZip :: forall b m. +consumeZip :: forall b m m'. 
( MonadThrow b , MonadThrow m , MonadBase b m , PrimMonad b + , MonadUnliftIO m + , MonadResource m + , MonadIO m' ) - => ConduitT ByteString File m ZipInfo -consumeZip = transPipe liftBase unZipStream `fuseUpstream` consumeZip' - where - consumeZip' :: ConduitT (Either ZipEntry ByteString) File m () - consumeZip' = do - input <- await - case input of - Nothing -> return () - Just (Right _) -> throwM $ userError "Data chunk in unexpected place when parsing ZIP" - Just (Left ZipEntry{..}) -> do - contentChunks <- toConsumer accContents - zipEntryName' <- decodeZipEntryName zipEntryName - let - fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise $ makeValid zipEntryName' - fileModified = localTimeToUTC utc zipEntryTime - fileContent - | hasTrailingPathSeparator zipEntryName' = Nothing - | otherwise = Just $ mconcat contentChunks - yield File{..} - consumeZip' - accContents :: ConduitT (Either a b') Void m [b'] - accContents = do - input <- await - case input of - Just (Right x) -> (x :) <$> accContents - Just (Left x) -> [] <$ leftover (Left x) - _ -> return [] + => ConduitT () ByteString m () -> ConduitT () (File m') m ZipInfo +consumeZip inpBS = do + inps <- liftIO newBroadcastTMChanIO + let feedSingle inp = atomically $ do + guardM $ isEmptyTMChan inps + writeTMChan inps inp + zipAsync <- lift . allocateLinkedAsync . runConduit $ do + zipInfo <- (inpBS .| transPipe liftBase unZipStream) `fuseUpstream` C.mapM_ feedSingle + atomically $ closeTMChan inps + return zipInfo -produceZip :: forall b m. - ( MonadThrow b - , MonadThrow m - , MonadBase b m - , PrimMonad b + evalContT . 
callCC $ \finish -> forever $ do + (fileChan, fileDef) <- atomically $ do + fileChan <- dupTMChan inps + fileDef <- readTMChan fileChan + return (fileChan, fileDef) + case fileDef of + Nothing -> finish =<< waitAsync zipAsync + Just (Right _) -> return () + Just (Left ZipEntry{..}) -> do + zipEntryName' <- decodeZipEntryName zipEntryName + let + fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise $ makeValid zipEntryName' + fileModified = localTimeToUTC utc zipEntryTime + isDirectory = hasTrailingPathSeparator zipEntryName' + fileContent + | isDirectory = Nothing + | otherwise = Just . evalContT . callCC $ \finishContent -> forever $ do + nextVal <- atomically $ (preview _Right =<<) <$> readTMChan fileChan + maybe (finishContent ()) (lift . yield) nextVal + lift $ yield File{..} + +produceZip :: forall m. + ( MonadThrow m + , PrimMonad m ) => ZipInfo - -> ConduitT File ByteString m () -produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOptions) + -> ConduitT (File m) ByteString m () +produceZip info = C.map toZipData .| void (zipStream zipOptions) where zipOptions = ZipOptions { zipOpt64 = True @@ -103,13 +105,11 @@ produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOpt , zipOptInfo = info } - toZipData :: File -> (ZipEntry, ZipData b) - toZipData f@File{..} = - let zData = maybe mempty (ZipDataByteString . Lazy.ByteString.fromStrict) fileContent - zEntry = (toZipEntry f){ zipEntrySize = fromIntegral . 
ByteString.length <$> fileContent } - in (zEntry, zData) + toZipData :: File m -> (ZipEntry, ZipData m) + toZipData f@File{..} + = (toZipEntry f, ) $ maybe mempty ZipDataSource fileContent - toZipEntry :: File -> ZipEntry + toZipEntry :: File m -> ZipEntry toZipEntry File{..} = ZipEntry{..} where isDir = isNothing fileContent @@ -119,26 +119,26 @@ produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOpt zipEntrySize = Nothing zipEntryExternalAttributes = Nothing -modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT File File m () +modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT (File m') (File m') m () modifyFileTitle f = mapC $ \x@File{..} -> x{ fileTitle = f fileTitle } -- Takes FileInfo and if it is a ZIP-Archive, extract files, otherwiese yield fileinfo -receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, MonadBase IO m) => FileInfo -> ConduitT () File m () +receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, MonadBase IO m, MonadUnliftIO m, MonadResource m') => FileInfo -> ConduitT () (File m') m () receiveFiles fInfo | ((==) `on` simpleContentType) mimeType typeZip = do $logInfoS "sourceFiles" "Unpacking ZIP" - fileSource fInfo .| void consumeZip + void . consumeZip $ fileSource fInfo | otherwise = do $logDebugS "sourceFiles" [st|Not unpacking file of type #{decodeUtf8 mimeType}|] yieldM $ acceptFile fInfo where mimeType = mimeLookup $ fileName fInfo -acceptFile :: MonadResource m => FileInfo -> m File +acceptFile :: (MonadResource m, MonadResource m') => FileInfo -> m (File m') acceptFile fInfo = do let fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise . unpack $ fileName fInfo + fileContent = Just $ fileSource fInfo fileModified <- liftIO getCurrentTime - fileContent <- fmap Just . 
runConduit $ fileSource fInfo .| foldC return File{..} diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index ed39106a0..7c03533a4 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -128,6 +128,8 @@ import Data.Proxy as Import (Proxy(..)) import Data.List.PointedList as Import (PointedList) +import Language.Haskell.TH.Syntax as Import (Lift(liftTyped)) + import Language.Haskell.TH.Instances as Import () import Data.NonNull.Instances as Import () import Data.Monoid.Instances as Import () diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 77e6337e2..48c4af882 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -9,20 +9,15 @@ import Import hiding (matching) import Database.Persist.Sql (deleteWhereCount) import qualified Database.Esqueleto as E +import qualified Database.Esqueleto.PostgreSQL as E import qualified Database.Esqueleto.Utils as E -import qualified Database.Esqueleto.Internal.Sql as E (unsafeSqlCastAs) import qualified Data.Conduit.Combinators as C -import qualified Data.Conduit.List as C (mapMaybe) +import qualified Data.Conduit.List as C (mapMaybe, unfoldM) import Handler.Utils.Minio import qualified Network.Minio as Minio -import qualified Crypto.Hash as Crypto -import qualified Data.ByteString.Base64.URL as Base64 - -import Control.Monad.Memo (startEvalMemoT, memo) - dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do @@ -44,6 +39,9 @@ fileReferences (E.just -> fHash) , E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash , E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash , E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash + , E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. fHash + E.&&. chunkLock E.^. FileChunkLockHash E.==. 
E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash) ] @@ -53,63 +51,111 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do now <- liftIO getCurrentTime interval <- fmap (fmap $ max 0) . getsYesod $ view _appPruneUnreferencedFiles keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles + + E.insertSelectWithConflict + (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint") + (E.from $ \fileContentChunk -> do + E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId + return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash + return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now + ) + (\current excluded -> + [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ] + ) - E.update $ \fileContent -> do - let isReferenced = E.any E.exists . fileReferences $ fileContent E.^. FileContentHash - now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now - shouldBe = E.bool (E.just . E.maybe now' (E.min now') $ fileContent E.^. FileContentUnreferencedSince) E.nothing isReferenced - E.set fileContent [ FileContentUnreferencedSince E.=. shouldBe ] + E.delete . E.from $ \fileContentChunkUnreferenced -> + E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash let - getCandidates = E.selectSource . E.from $ \fileContent -> do - E.where_ . E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) $ fileContent E.^. 
FileContentUnreferencedSince - return ( fileContent E.^. FileContentHash - , E.length_ $ fileContent E.^. FileContentContent + getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do + let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do + E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash + return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince + E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince + + E.groupBy $ fileContentEntry E.^. FileContentEntryHash + E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ] + + return $ fileContentEntry E.^. FileContentEntryHash + + deleteEntry :: _ -> DB (Sum Natural) + deleteEntry (E.Value fRef) = + bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef] + + Sum deletedEntries <- runConduit $ + getEntryCandidates + .| maybe (C.map id) (takeWhileTime . (/ 3)) interval + .| C.mapM deleteEntry + .| C.fold + + when (deletedEntries > 0) $ + $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|] + + let + getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do + E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now) + E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + + return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. 
FileContentChunkContent) ) - Sum deleted <- runConduit $ - getCandidates - .| maybe (C.map id) (takeWhileTime . (/ 2)) interval + deleteChunk :: _ -> DB (Sum Natural, Sum Word64) + deleteChunk (E.Value cRef, E.Value size) = do + deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ] + (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef] + + (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $ + getChunkCandidates + .| maybe (C.map id) (takeWhileTime . (/ 3)) interval .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64) - .| C.map (view $ _1 . _Value) - .| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef]) + .| C.mapM deleteChunk .| C.fold - when (deleted > 0) $ - $logInfoS "PruneUnreferencedFiles" [st|Deleted #{deleted} long-unreferenced files|] + + when (deletedChunks > 0) $ + $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|] dispatchJobInjectFiles :: JobHandler UniWorX dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do uploadBucket <- getsYesod $ view _appUploadCacheBucket interval <- getsYesod $ view _appInjectFiles - now <- liftIO getCurrentTime let - extractReference (Minio.ListItemObject oi) - | Right bs <- Base64.decodeUnpadded . encodeUtf8 $ Minio.oiObject oi - , Just fRef <- Crypto.digestFromByteString bs - = Just (oi, fRef) + extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference extractReference _ = Nothing injectOrDelete :: (Minio.Object, FileContentReference) - -> Handler (Sum Int64, Sum Int64) -- ^ Injected, Already existed - injectOrDelete (obj, fRef) = maybeT (return mempty) $ do - res <- hoist (startEvalMemoT . hoistStateCache (runDB . setSerializable)) $ do - alreadyInjected <- lift . lift $ exists [ FileContentHash ==. 
fRef ] - if | alreadyInjected -> return (mempty, Sum 1) - | otherwise -> do - content <- flip memo obj $ \obj' -> hoistMaybeM . runAppMinio . runMaybeT $ do - objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj' Minio.defaultGetObjectOptions - lift . runConduit $ Minio.gorObjectStream objRes .| C.fold + -> Handler (Sum Int64) -- ^ Injected + injectOrDelete (obj, fRef) = do + fRef' <- runDB . setSerializable $ do + chunkVar <- newEmptyTMVarIO + dbAsync <- allocateLinkedAsync $ do + atomically $ isEmptyTMVar chunkVar >>= guard . not + sinkFileDB $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () - fmap ((, mempty) . Sum) . lift. lift . E.insertSelectCount $ - let isReferenced = E.any E.exists $ fileReferences (E.val fRef) - now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now - in return $ FileContent E.<# E.val fRef E.<&> E.val content E.<&> E.bool (E.just now') E.nothing isReferenced - runAppMinio . maybeT (return ()) . catchIfMaybeT minioIsDoesNotExist $ Minio.removeObject uploadBucket obj - return res + didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do + objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions + lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just) + return True + if + | not didSend -> Nothing <$ cancel dbAsync + | otherwise -> do + atomically $ putTMVar chunkVar Nothing + Just <$> waitAsync dbAsync + let matchesFRef = is _Just $ assertM (== fRef) fRef' + if | matchesFRef -> + maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj + | otherwise -> + $logErrorS "InjectFiles" [st|Minio object “#{obj}”'s content does not match it's name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|] + return . 
bool mempty (Sum 1) $ is _Just fRef' - (Sum inj, Sum exc) <- + Sum inj <- runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True) .| C.mapMaybe extractReference .| maybe (C.map id) (takeWhileTime . (/ 2)) interval @@ -118,7 +164,5 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do .| transPipe lift (C.mapM injectOrDelete) .| C.fold - when (exc > 0) $ - $logInfoS "InjectFiles" [st|Deleted #{exc} files from upload cache because they were already injected|] when (inj > 0) $ $logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|] diff --git a/src/Model/Migration.hs b/src/Model/Migration.hs index 711fffde2..e52ecd577 100644 --- a/src/Model/Migration.hs +++ b/src/Model/Migration.hs @@ -915,13 +915,26 @@ customMigrations = Map.fromListWith (>>) ) , ( AppliedMigrationKey [migrationVersion|39.0.0|] [version|40.0.0|] - , whenM (tableExists "study_features") $ do + , whenM (tableExists "study_features") $ [executeQQ| ALTER TABLE study_features RENAME updated TO last_observed; ALTER TABLE study_features ADD COLUMN first_observed timestamp with time zone; UPDATE study_features SET first_observed = (SELECT MAX(last_observed) FROM study_features as other WHERE other."user" = study_features."user" AND other.degree = study_features.degree AND other.field = study_features.field AND other.type = study_features.type AND other.semester = study_features.semester - 1); |] ) + , ( AppliedMigrationKey [migrationVersion|40.0.0|] [version|41.0.0|] + , whenM (tableExists "file_content") $ + [executeQQ| + ALTER TABLE file_content RENAME TO file_content_chunk; + + CREATE TABLE file_content_chunk_unreferenced (id bigserial, hash bytea NOT NULL, since timestamp with time zone NOT NULL); + INSERT INTO file_content_chunk_unreferenced (since, hash) (SELECT unreferenced_since as since, hash FROM file_content_chunk WHERE NOT (unreferenced_since IS NULL)); + ALTER TABLE file_content_chunk DROP COLUMN unreferenced_since; +
+ CREATE TABLE file_content_entry (hash bytea NOT NULL, ix bigint NOT NULL, chunk_hash bytea NOT NULL); + INSERT INTO file_content_entry (hash, chunk_hash, ix) (SELECT hash, hash as chunk_hash, 0 as ix FROM file_content_chunk); + |] + ) ] diff --git a/src/Model/Types/Common.hs b/src/Model/Types/Common.hs index ceb97f2a2..08783669c 100644 --- a/src/Model/Types/Common.hs +++ b/src/Model/Types/Common.hs @@ -54,5 +54,3 @@ type InstanceId = UUID type ClusterId = UUID type TokenId = UUID type TermCandidateIncidence = UUID - -type FileContentReference = Digest SHA3_512 diff --git a/src/Model/Types/File.hs b/src/Model/Types/File.hs index 33bdadbb7..27855ce2b 100644 --- a/src/Model/Types/File.hs +++ b/src/Model/Types/File.hs @@ -1,23 +1,65 @@ module Model.Types.File - ( File(..), _fileTitle, _fileContent, _fileModified + ( FileContentChunkReference(..), FileContentReference(..) + , File(..), _fileTitle, _fileContent, _fileModified + , transFile + , minioFileReference , FileReference(..), _fileReferenceTitle, _fileReferenceContent, _fileReferenceModified , HasFileReference(..), IsFileReference(..), FileReferenceResidual(..) 
) where import Import.NoModel -import Model.Types.Common (FileContentReference) + +import Database.Persist.Sql (PersistFieldSql) +import Web.HttpApiData (ToHttpApiData, FromHttpApiData) +import Data.ByteArray (ByteArrayAccess) + +import qualified Data.ByteString.Base64.URL as Base64 +import qualified Data.ByteArray as ByteArray +import qualified Network.Minio as Minio (Object) +import qualified Crypto.Hash as Crypto (digestFromByteString) import Utils.Lens.TH -data File = File + +newtype FileContentChunkReference = FileContentChunkReference (Digest SHA3_512) + deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable) + deriving newtype ( PersistField, PersistFieldSql + , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON + , Hashable, NFData + , ByteArrayAccess + ) + +makeWrapped ''FileContentChunkReference + +newtype FileContentReference = FileContentReference (Digest SHA3_512) + deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable) + deriving newtype ( PersistField, PersistFieldSql + , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON + , Hashable, NFData + , ByteArrayAccess + ) + +makeWrapped ''FileContentReference + + +minioFileReference :: Prism' Minio.Object FileContentReference +minioFileReference = prism' toObjectName fromObjectName + where toObjectName = decodeUtf8 . Base64.encodeUnpadded . ByteArray.convert + fromObjectName = fmap (review _Wrapped) . Crypto.digestFromByteString <=< preview _Right . Base64.decodeUnpadded . encodeUtf8 + + +data File m = File { fileTitle :: FilePath - , fileContent :: Maybe ByteString + , fileContent :: Maybe (ConduitT () ByteString m ()) , fileModified :: UTCTime - } deriving (Eq, Ord, Read, Show, Generic, Typeable) + } deriving (Generic, Typeable) makeLenses_ ''File +transFile :: Monad m => (forall a. m a -> n a) -> (File m -> File n) +transFile l File{..} = File{ fileContent = transPipe l <$> fileContent, .. 
} + data FileReference = FileReference { fileReferenceTitle :: FilePath , fileReferenceContent :: Maybe FileContentReference diff --git a/src/Settings.hs b/src/Settings.hs index 56bead944..3dffc78b4 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -172,6 +172,7 @@ data AppSettings = AppSettings , appUploadCacheConf :: Maybe Minio.ConnectInfo , appUploadCacheBucket :: Minio.Bucket , appInjectFiles :: Maybe NominalDiffTime + , appFileUploadDBChunksize :: Int , appFavouritesQuickActionsBurstsize , appFavouritesQuickActionsAvgInverseRate :: Word64 @@ -474,6 +475,7 @@ instance FromJSON AppSettings where appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files" appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0 appInjectFiles <- o .:? "inject-files" + appFileUploadDBChunksize <- o .: "file-upload-db-chunksize" appMaximumContentLength <- o .: "maximum-content-length" diff --git a/src/Utils.hs b/src/Utils.hs index 3a517d231..5f25e1ac3 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -56,7 +56,8 @@ import Control.Arrow as Utils ((>>>)) import Control.Monad.Trans.Except (ExceptT(..), throwE, runExceptT) import Control.Monad.Except (MonadError(..)) import Control.Monad.Trans.Maybe as Utils (MaybeT(..)) -import Control.Monad.Trans.Writer.Lazy (WriterT, execWriterT, tell) +import Control.Monad.Trans.Writer.Strict (execWriterT) +import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Catch import Control.Monad.Morph (hoist) import Control.Monad.Fail @@ -83,6 +84,9 @@ import qualified Crypto.Saltine.Class as Saltine import qualified Crypto.Data.PKCS7 as PKCS7 import Crypto.MAC.KMAC (KMAC, HashSHAKE) import qualified Crypto.MAC.KMAC as KMAC +import qualified Crypto.Hash as Crypto +import Crypto.Hash (HashAlgorithm, Digest) +import Crypto.Hash.Instances () import Data.ByteArray (ByteArrayAccess) @@ -843,7 +847,7 @@ diffTimeout timeoutLength timeoutRes act = fromMaybe timeoutRes <$> timeout time = let (MkFixed micro :: Micro) = realToFrac 
timeoutLength in fromInteger micro -tellM :: (Monad m, Monoid x) => m x -> WriterT x m () +tellM :: (MonadTrans t, MonadWriter x (t m), Monad m) => m x -> t m () tellM = tell <=< lift ------------- @@ -856,6 +860,19 @@ peekN n = do mapM_ leftover peeked return peeked +peekWhile :: forall a o m. Monad m => (a -> Bool) -> ConduitT a o m [a] +peekWhile p = do + let go acc = do + next <- await + case next of + Nothing -> return (reverse acc, Nothing) + Just x + | p x -> go $ x : acc + | otherwise -> return (reverse acc, Just x) + (peeked, failed) <- go [] + mapM_ leftover $ peeked ++ hoistMaybe failed + return peeked + anyMC, allMC :: forall a o m. Monad m => (a -> m Bool) -> ConduitT a o m Bool anyMC f = C.mapM f .| orC allMC f = C.mapM f .| andC @@ -1057,6 +1074,12 @@ kmaclazy :: forall a string key ba chunk. -> KMAC a kmaclazy str k = KMAC.finalize . KMAC.updates (KMAC.initialize @a str k) . toChunks +emptyHash :: forall a. HashAlgorithm a => Q (TExp (Digest a)) +-- ^ Hash of `mempty` +-- +-- Computationally preferable to computing the hash at runtime +emptyHash = TH.liftTyped $ Crypto.hashFinalize Crypto.hashInit + ------------- -- Caching -- ------------- diff --git a/src/Utils/Files.hs b/src/Utils/Files.hs index 0138a93c4..fadc7f23d 100644 --- a/src/Utils/Files.hs +++ b/src/Utils/Files.hs @@ -3,6 +3,7 @@ module Utils.Files , sinkFile', sinkFiles' , FileUploads , replaceFileReferences, replaceFileReferences' + , sinkFileDB ) where import Import.NoFoundation @@ -11,31 +12,46 @@ import Handler.Utils.Minio import qualified Network.Minio as Minio import qualified Crypto.Hash as Crypto (hash) +import qualified Crypto.Hash.Conduit as Crypto (sinkHash) import qualified Data.Conduit.Combinators as C - -import qualified Data.ByteString.Base64.URL as Base64 -import qualified Data.ByteArray as ByteArray +import qualified Data.Conduit.List as C (unfoldM) import qualified Data.Map.Lazy as Map import qualified Data.Set as Set import Control.Monad.State.Class (modify) +import
qualified Data.Sequence as Seq + import Database.Persist.Sql (deleteWhereCount) import Control.Monad.Trans.Resource (allocate) +import qualified Data.UUID.V4 as UUID -sinkFiles :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT File FileReference (SqlPersistT m) () -sinkFiles = C.mapM sinkFile +import qualified Database.Esqueleto as E +import qualified Database.Esqueleto.Utils as E + + +sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) + => ConduitT () ByteString (SqlPersistT m) () -> SqlPersistT m FileContentReference +sinkFileDB fileContentContent = do + dbChunksize <- getsYesod $ view _appFileUploadDBChunksize + + let sinkChunk fileContentChunkContent = do + fileChunkLockTime <- liftIO getCurrentTime + fileChunkLockInstance <- getsYesod appInstanceID + + tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. } + lift . handleIfSql isUniqueConstraintViolation (const $ return ()) $ + insert_ FileContentChunk{..} + return $ FileContentChunkKey fileContentChunkHash + where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent + ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $ transPipe lift fileContentContent + .| C.chunksOfE dbChunksize + .| C.mapM (\c -> (c, ) <$> sinkChunk c) + .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton)) -sinkFile :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File -> SqlPersistT m FileReference -sinkFile File{ fileContent = Nothing, .. 
} = return FileReference - { fileReferenceContent = Nothing - , fileReferenceTitle = fileTitle - , fileReferenceModified = fileModified - } -sinkFile File{ fileContent = Just fileContentContent, .. } = do void . withUnliftIO $ \UnliftIO{..} -> let takeLock = do fileLockTime <- liftIO getCurrentTime @@ -44,35 +60,81 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ()) in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock) - inDB <- exists [ FileContentHash ==. fileContentHash ] + deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ] - let sinkFileDB = unless inDB $ repsert (FileContentKey fileContentHash) FileContent{ fileContentUnreferencedSince = Nothing, .. } - maybeT sinkFileDB $ do - let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash - uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket - unless inDB . runAppMinio $ do - uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket uploadName Minio.defaultGetObjectOptions - unless uploadExists $ do - let - pooOptions = Minio.defaultPutObjectOptions - { Minio.pooCacheControl = Just "immutable" - } - Minio.putObject uploadBucket uploadName (C.sourceLazy $ fromStrict fileContentContent) (Just . fromIntegral $ olength fileContentContent) pooOptions - -- Note that MinIO does not accept length zero uploads without an explicit length specification (not `Nothing` in the line above for the api we use) + let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash + unlessM entryExists . void $ + insertMany_ [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. 
} + | fileContentEntryChunkHash <- otoList fileContentChunks + | fileContentEntryIx <- [0..] + ] + + return fileContentHash + where fileContentChunkContentBased = False + + +sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) () +sinkFiles = C.mapM sinkFile + +sinkFile :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File (SqlPersistT m) -> SqlPersistT m FileReference +sinkFile File{ fileContent = Nothing, .. } = return FileReference + { fileReferenceContent = Nothing + , fileReferenceTitle = fileTitle + , fileReferenceModified = fileModified + } +sinkFile File{ fileContent = Just fileContentContent, .. } = do + (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE + + fileContentHash <- if + | not isEmpty -> maybeT (sinkFileDB fileContentContent') $ do + uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket + chunk <- liftIO newEmptyMVar + let putChunks = do + nextChunk <- await + case nextChunk of + Nothing + -> putMVar chunk Nothing + Just nextChunk' + -> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks + sinkAsync <- lift . allocateLinkedAsync . runConduit + $ fileContentContent' + .| putChunks + .| Crypto.sinkHash + + runAppMinio $ do + tmpUUID <- liftIO UUID.nextRandom + let uploadName = ".tmp." 
<> toPathPiece tmpUUID + pooOptions = Minio.defaultPutObjectOptions + { Minio.pooCacheControl = Just "immutable" + } + Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions + fileContentHash <- review _Wrapped <$> waitAsync sinkAsync + let dstName = minioFileReference # fileContentHash + copySrc = Minio.defaultSourceInfo + { Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName + } + copyDst = Minio.defaultDestinationInfo + { Minio.dstBucket = uploadBucket + , Minio.dstObject = dstName + } + uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions + unless uploadExists $ + Minio.copyObject copyDst copySrc + Minio.removeObject uploadBucket uploadName + return fileContentHash + | otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash)) return FileReference { fileReferenceContent = Just fileContentHash , fileReferenceTitle = fileTitle , fileReferenceModified = fileModified } - where - fileContentHash = Crypto.hash fileContentContent -sinkFiles' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File, FileReferenceResidual record) record (SqlPersistT m) () +sinkFiles' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) () sinkFiles' = C.mapM $ uncurry sinkFile' -sinkFile' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File -> FileReferenceResidual record -> SqlPersistT m record +sinkFile' :: (MonadCatch m, 
MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File (SqlPersistT m) -> FileReferenceResidual record -> SqlPersistT m record sinkFile' file residual = do reference <- sinkFile file return $ _FileReference # (reference, residual) diff --git a/src/Utils/Sql.hs b/src/Utils/Sql.hs index 2a03a76da..ad51820d0 100644 --- a/src/Utils/Sql.hs +++ b/src/Utils/Sql.hs @@ -1,22 +1,32 @@ module Utils.Sql ( setSerializable, setSerializable' + , catchSql, handleSql + , isUniqueConstraintViolation + , catchIfSql, handleIfSql ) where -import ClassyPrelude.Yesod +import ClassyPrelude.Yesod hiding (handle) import Numeric.Natural import Settings.Log -import Database.PostgreSQL.Simple (SqlError) +import Database.PostgreSQL.Simple (SqlError(..)) import Database.PostgreSQL.Simple.Errors (isSerializationError) -import Control.Monad.Catch (MonadMask) +import Control.Monad.Catch import Database.Persist.Sql import Database.Persist.Sql.Raw.QQ +import qualified Data.ByteString as ByteString + import Control.Retry import Control.Lens ((&)) +import qualified Data.UUID as UUID +import Control.Monad.Random.Class (MonadRandom(getRandom)) + +import Text.Shakespeare.Text (st) + setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 @@ -54,5 +64,29 @@ setSerializable' policy act = do transactionSaveWithIsolation ReadCommitted return res +catchSql :: forall m a. (MonadCatch m, MonadIO m) => SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a +catchSql = flip handleSql +handleSql :: forall m a. 
(MonadCatch m, MonadIO m) => (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a +handleSql recover act = do + savepointName <- liftIO $ UUID.toString <$> getRandom + let recover' :: SqlError -> SqlPersistT m a + recover' exc = do + rawExecute [st|ROLLBACK TO SAVEPOINT "#{savepointName}"|] [] + recover exc + + handle recover' $ do + rawExecute [st|SAVEPOINT "#{savepointName}"|] [] + res <- act + rawExecute [st|RELEASE SAVEPOINT "#{savepointName}"|] [] + return res + +catchIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a +catchIfSql p = flip $ handleIfSql p + +handleIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a +handleIfSql p recover = handleSql (\err -> bool throwM recover (p err) err) + +isUniqueConstraintViolation :: SqlError -> Bool +isUniqueConstraintViolation SqlError{..} = "duplicate key value violates unique constraint" `ByteString.isPrefixOf` sqlErrorMsg diff --git a/test/Database/Fill.hs b/test/Database/Fill.hs index 4c1d4a3ee..0292308da 100644 --- a/test/Database/Fill.hs +++ b/test/Database/Fill.hs @@ -5,7 +5,6 @@ module Database.Fill import "uniworx" Import hiding (Option(..), currentYear) import Handler.Utils.Form (SheetGrading'(..), SheetType'(..), SheetGroup'(..)) -import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import qualified Data.Text as Text -- import Data.Text.IO (hPutStrLn) @@ -30,6 +29,8 @@ import qualified Data.Csv as Csv import Crypto.Random (getRandomBytes) import Data.List (genericLength) +import qualified Data.Conduit.Combinators as C + testdataDir :: FilePath testdataDir = "testdata" @@ -37,7 +38,7 @@ testdataDir = "testdata" insertFile :: ( HasFileReference fRef, PersistRecordBackend fRef SqlBackend ) => FileReferenceResidual fRef -> FilePath -> DB (Key fRef) insertFile residual fileTitle = do - 
fileContent <- liftIO . fmap Just . BS.readFile $ testdataDir fileTitle + let fileContent = Just . C.sourceFile $ testdataDir fileTitle fileModified <- liftIO getCurrentTime sinkFile' File{..} residual >>= insert