diff --git a/config/settings.yml b/config/settings.yml
index 3824a9f2b..d8b8b0330 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -35,7 +35,8 @@ bearer-expiration: 604800
 bearer-encoding: HS256
 maximum-content-length: "_env:MAX_UPLOAD_SIZE:134217728"
 session-files-expire: 3600
-prune-unreferenced-files: 28800
+prune-unreferenced-files-within: 57600
+prune-unreferenced-files-interval: 3600
 keep-unreferenced-files: 86400
 health-check-interval:
   matching-cluster-config: "_env:HEALTHCHECK_INTERVAL_MATCHING_CLUSTER_CONFIG:600"
@@ -158,7 +159,13 @@ upload-cache:
   auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true"
   disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
 upload-cache-bucket: "uni2work-uploads"
-inject-files: 10
+
+inject-files: 307
+rechunk-files: 601
+
+file-upload-db-chunksize: 4194304 # 4MiB
+file-chunking-target-exponent: 21 # 2MiB
+file-chunking-hash-window: 4096
 
 server-sessions:
   idle-timeout: 28807
@@ -229,6 +236,9 @@ token-buckets:
     depth: 1572864000 # 1500MiB
     inv-rate: 1.9e-6 # 2MiB/s
     initial-value: 0
-
+  rechunk-files:
+    depth: 20971520 # 20MiB
+    inv-rate: 9.5e-7 # 1MiB/s
+    initial-value: 0
 
 fallback-personalised-sheet-files-keys-expire: 2419200
diff --git a/config/test-settings.yml b/config/test-settings.yml
index 7ba4552eb..ab9bd7f5a 100644
--- a/config/test-settings.yml
+++ b/config/test-settings.yml
@@ -10,4 +10,5 @@ log-settings:
 
 auth-dummy-login: true
 server-session-acid-fallback: true
+job-cron-interval: null
 job-workers: 1
diff --git a/models/files.model b/models/files.model
index 428331b36..2a8656a3e 100644
--- a/models/files.model
+++ b/models/files.model
@@ -1,9 +1,20 @@
-FileContent
+FileContentEntry
   hash FileContentReference
+  ix Natural
+  chunkHash FileContentChunkId
+  UniqueFileContentEntry hash ix
+
+FileContentChunk
+  hash FileContentChunkReference
   content ByteString
-  unreferencedSince UTCTime Maybe
+  contentBased Bool default=false -- For Migration
   Primary hash
+
+FileContentChunkUnreferenced
+  hash FileContentChunkId
+  since UTCTime
+  UniqueFileContentChunkUnreferenced hash
+
 SessionFile
   content FileContentReference Maybe
   touched UTCTime
@@ -12,3 +23,8 @@ FileLock
   content FileContentReference
   instance InstanceId
   time UTCTime
+
+FileChunkLock
+  hash FileContentChunkReference
+  instance InstanceId
+  time UTCTime
\ No newline at end of file
diff --git a/package.yaml b/package.yaml
index 04808b215..c7ccdf75b 100644
--- a/package.yaml
+++ b/package.yaml
@@ -151,6 +151,7 @@ dependencies:
 - minio-hs
 - network-ip
 - data-textual
+- fastcdc
 
 other-extensions:
 - GeneralizedNewtypeDeriving
diff --git a/src/Application.hs b/src/Application.hs
index 490040eed..d4dd082fb 100644
--- a/src/Application.hs
+++ b/src/Application.hs
@@ -227,13 +227,15 @@ makeFoundation appSettings'@AppSettings{..} = do
   forM_ ldapPool $ registerFailoverMetrics "ldap"
 
   -- Perform database migration using our application's logging settings.
-  if
-    | appAutoDbMigrate -> do
-        $logDebugS "setup" "Migration"
-        migrateAll `runSqlPool` sqlPool
-    | otherwise -> whenM (requiresMigration `runSqlPool` sqlPool) $ do
-        $logErrorS "setup" "Migration required"
-        liftIO . exitWith $ ExitFailure 2
+  flip runReaderT tempFoundation $
+    if
+      | appAutoDbMigrate -> do
+          $logDebugS "setup" "Migration"
+          migrateAll `runSqlPool` sqlPool
+      | otherwise -> whenM (requiresMigration `runSqlPool` sqlPool) $ do
+          $logErrorS "setup" "Migration required"
+          liftIO . exitWith $ ExitFailure 2
+
   $logDebugS "setup" "Cluster-Config"
   appCryptoIDKey <- clusterSetting (Proxy :: Proxy 'ClusterCryptoIDKey) `runSqlPool` sqlPool
   appSecretBoxKey <- clusterSetting (Proxy :: Proxy 'ClusterSecretBoxKey) `runSqlPool` sqlPool
diff --git a/src/Crypto/Hash/Instances.hs b/src/Crypto/Hash/Instances.hs
index 93bf63516..0be90af18 100644
--- a/src/Crypto/Hash/Instances.hs
+++ b/src/Crypto/Hash/Instances.hs
@@ -18,6 +18,8 @@ import Data.Aeson as Aeson
 
 import Control.Monad.Fail
 
+import Language.Haskell.TH.Syntax (Lift(liftTyped))
+import Instances.TH.Lift ()
 
 instance HashAlgorithm hash => PersistField (Digest hash) where
   toPersistValue = PersistByteString . convert
@@ -46,3 +48,6 @@ instance HashAlgorithm hash => FromJSON (Digest hash) where
 
 instance Hashable (Digest hash) where
   hashWithSalt s = (hashWithSalt s :: ByteString -> Int) . convert
+
+instance HashAlgorithm hash => Lift (Digest hash) where
+  liftTyped dgst = [||fromMaybe (error "Lifted digest has wrong length") $ digestFromByteString $$(liftTyped (convert dgst :: ByteString))||]
diff --git a/src/Database/Esqueleto/Utils.hs b/src/Database/Esqueleto/Utils.hs
index 23daf5679..7db0e3c39 100644
--- a/src/Database/Esqueleto/Utils.hs
+++ b/src/Database/Esqueleto/Utils.hs
@@ -6,6 +6,7 @@ module Database.Esqueleto.Utils
   , justVal, justValList
   , isJust
   , isInfixOf, hasInfix
+  , strConcat, substring
   , or, and
   , any, all
   , subSelectAnd, subSelectOr
@@ -39,7 +40,8 @@ import qualified Data.Set as Set
 import qualified Data.List as List
 import qualified Data.Foldable as F
 import qualified Database.Esqueleto as E
-import qualified Database.Esqueleto.Internal.Sql as E
+import qualified Database.Esqueleto.PostgreSQL as E
+import qualified Database.Esqueleto.Internal.Internal as E
 import Database.Esqueleto.Utils.TH
 
 import qualified Data.Text.Lazy as Lazy (Text)
@@ -96,6 +98,42 @@ hasInfix :: ( E.SqlString s1
          => E.SqlExpr (E.Value s2) -> E.SqlExpr (E.Value s1) -> E.SqlExpr (E.Value Bool)
 hasInfix = flip isInfixOf
 
+infixl 6 `strConcat`
+
+strConcat :: E.SqlString s
+          => E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s) -> E.SqlExpr (E.Value s)
+strConcat = E.unsafeSqlBinOp " || "
+
+substring :: ( E.SqlString str
+             , Num from, Num for
+             )
+          => E.SqlExpr (E.Value str)
+          -> E.SqlExpr (E.Value from)
+          -> E.SqlExpr (E.Value for)
+          -> E.SqlExpr (E.Value str)
+substring (E.ERaw p1 f1) (E.ERaw p2 f2) (E.ERaw p3 f3)
+  = E.ERaw E.Never $ \info ->
+      let (strTLB, strVals) = f1 info
+          (fromiTLB, fromiVals) = f2 info
+          (foriTLB, foriVals) = f3 info
+       in ( "SUBSTRING" <> E.parens (E.parensM p1 strTLB <> " FROM " <> E.parensM p2 fromiTLB <> " FOR " <> E.parensM p3 foriTLB)
+          , strVals <> fromiVals <> foriVals
+          )
+substring a b c = substring (construct a) (construct b) (construct c)
+  where construct :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a)
+        construct (E.ERaw p f) = E.ERaw E.Parens $ \info ->
+          let (b1, vals) = f info
+              build ("?", [E.PersistList vals']) =
+                (E.uncommas $ replicate (length vals') "?", vals')
+              build expr = expr
+           in build (E.parensM p b1, vals)
+        construct (E.ECompositeKey f) =
+          E.ERaw E.Parens $ \info -> (E.uncommas $ f info, mempty)
+        construct (E.EAliasedValue i _) =
+          E.ERaw E.Never $ E.aliasedValueIdentToRawSql i
+        construct (E.EValueReference i i') =
+          E.ERaw E.Never $ E.valueReferenceToRawSql i i'
+
 and, or :: Foldable f => f (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool)
 and = F.foldr (E.&&.) true
 or = F.foldr (E.||.) false
@@ -111,8 +149,11 @@ all :: MonoFoldable f => (Element f -> E.SqlExpr (E.Value Bool)) -> f -> E.SqlEx
 all test = and . map test . otoList
 
 subSelectAnd, subSelectOr :: E.SqlQuery (E.SqlExpr (E.Value Bool)) -> E.SqlExpr (E.Value Bool)
-subSelectAnd q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_and" <$> q
-subSelectOr q = E.subSelectUnsafe $ E.unsafeSqlFunction "bool_or" <$> q
+subSelectAnd q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_and" E.AggModeAll) [] <$> q
+subSelectOr q = parens . E.subSelectUnsafe $ flip (E.unsafeSqlAggregateFunction "bool_or" E.AggModeAll) [] <$> q
+
+parens :: E.SqlExpr (E.Value a) -> E.SqlExpr (E.Value a)
+parens = E.unsafeSqlFunction ""
 
 
 -- Allow usage of Tuples as DbtRowKey, i.e. SqlIn instances for tuples
diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs
index 5595127e8..5257f1c35 100644
--- a/src/Foundation/Type.hs
+++ b/src/Foundation/Type.hs
@@ -7,7 +7,7 @@ module Foundation.Type
   , _SessionStorageMemcachedSql, _SessionStorageAcid
   , SMTPPool
   , _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport
-  , DB, Form, MsgRenderer, MailM
+  , DB, Form, MsgRenderer, MailM, DBFile
   ) where
 
 import Import.NoFoundation
@@ -81,3 +81,4 @@ type DB = YesodDB UniWorX
 type Form x = Html -> MForm (HandlerFor UniWorX) (FormResult x, WidgetFor UniWorX ())
 type MsgRenderer = MsgRendererS UniWorX -- see Utils
 type MailM a = MailT (HandlerFor UniWorX) a
+type DBFile = File (YesodDB UniWorX)
diff --git a/src/Handler/Admin/Test/Download.hs b/src/Handler/Admin/Test/Download.hs
index dc02ae8e0..9bf85419d 100644
--- a/src/Handler/Admin/Test/Download.hs
+++ b/src/Handler/Admin/Test/Download.hs
@@ -80,12 +80,12 @@ testDownload = do
     sourceDBChunks :: ConduitT () Int DB ()
     sourceDBChunks = forever sourceDBFiles
       .| C.mapM (\x -> x <$ $logDebugS "testDownload.sourceDBChunks" (tshow $ entityKey x))
-      .| C.map ((length $!!) . fileContentContent . entityVal)
+      .| C.map ((length $!!) . fileContentChunkContent . entityVal)
       .| takeLimit dlMaxSize
       where
-        sourceDBFiles = E.selectSource . E.from $ \fileContent -> do
+        sourceDBFiles = E.selectSource . E.from $ \fileContentChunk -> do
           E.orderBy [E.asc $ E.random_ @Int64]
-          return fileContent
+          return fileContentChunk
 
         takeLimit n | n <= 0 = return ()
         takeLimit n = do
diff --git a/src/Handler/Sheet/PersonalisedFiles.hs b/src/Handler/Sheet/PersonalisedFiles.hs
index 362c73d29..91da0bcc2 100644
--- a/src/Handler/Sheet/PersonalisedFiles.hs
+++ b/src/Handler/Sheet/PersonalisedFiles.hs
@@ -55,7 +55,7 @@ data PersonalisedSheetFileUnresolved a
   = PSFUnresolvedDirectory a
   | PSFUnresolvedCollatable Text a
   | PSFUnresolved a
-  deriving (Eq, Ord, Read, Show, Generic, Typeable)
+  deriving (Eq, Ord, Read, Show, Functor, Foldable, Traversable, Generic, Typeable)
 
 makePrisms ''PersonalisedSheetFileUnresolved
 
@@ -195,7 +195,7 @@ sourcePersonalisedSheetFiles :: forall m.
                              -> Maybe SheetId
                              -> Maybe (Set UserId)
                              -> PersonalisedSheetFilesDownloadAnonymous
-                             -> ConduitT () (Either PersonalisedSheetFile File) (SqlPersistT m) ()
+                             -> ConduitT () (Either PersonalisedSheetFile DBFile) (SqlPersistT m) ()
 sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do
   (mbIdx, cIDKey) <- lift . newPersonalisedFilesKey $ maybe (Left cId) Right mbsid
   let
@@ -255,9 +255,10 @@ sourcePersonalisedSheetFiles cId mbsid mbuids anonMode = do
               , fileModified = courseParticipantRegistration
               }
         yieldM . fmap Right $ do
-          fileContent <- lift $ Just . toStrict <$> formatPersonalisedSheetFilesMeta anonMode cPart cID
+          fileContent' <- lift $ formatPersonalisedSheetFilesMeta anonMode cPart cID
           let fileTitle = (dirName </>) . ensureExtension "yaml" . unpack . mr $ MsgPersonalisedSheetFilesMetaFilename cID
               fileModified = courseParticipantRegistration
+              fileContent = Just $ C.sourceLazy fileContent'
           return File{..}
         _dirCache %= Set.insert dirName
       whenIsJust mbPFile $ \(Entity _ pFile@PersonalisedSheetFile{..}) -> do
diff --git a/src/Handler/Submission/Download.hs b/src/Handler/Submission/Download.hs
index 99f2f4457..6d55dfa62 100644
--- a/src/Handler/Submission/Download.hs
+++ b/src/Handler/Submission/Download.hs
@@ -11,8 +11,6 @@ import Handler.Utils.Submission
 
 import qualified Data.Set as Set
 
-import qualified Data.Text.Encoding as Text
-
 import qualified Database.Esqueleto as E
 
 import qualified Data.Conduit.Combinators as Conduit
@@ -32,9 +30,8 @@ getSubDownloadR tid ssh csh shn cID (submissionFileTypeIsUpdate -> isUpdate) pat
 
   case isRating of
     True
-      | isUpdate -> runDB $ do
-          file <- runMaybeT $ lift . ratingFile cID =<< MaybeT (getRating submissionID)
-          maybe notFound (return . toTypedContent . Text.decodeUtf8) $ fileContent =<< file
+      | isUpdate -> maybe notFound sendThisFile <=< runDB . runMaybeT $
+          lift . ratingFile cID =<< MaybeT (getRating submissionID)
      | otherwise -> notFound
    False -> do
      let results = (.| Conduit.map entityVal) . E.selectSource . E.from $ \sf -> do
diff --git a/src/Handler/Utils.hs b/src/Handler/Utils.hs
index b2b99ac45..c4ddd7db3 100644
--- a/src/Handler/Utils.hs
+++ b/src/Handler/Utils.hs
@@ -34,11 +34,13 @@ import Control.Monad.Logger
 
 
 -- | Simply send a `File`-Value
-sendThisFile :: File -> Handler TypedContent
+sendThisFile :: DBFile -> Handler TypedContent
 sendThisFile File{..}
   | Just fileContent' <- fileContent = do
       setContentDisposition' . Just $ takeFileName fileTitle
-      return $ TypedContent (simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8") (toContent fileContent')
+      let cType = simpleContentType (mimeLookup $ pack fileTitle) <> "; charset=utf-8"
+      respondSourceDB cType $
+        fileContent' .| Conduit.map toFlushBuilder
   | otherwise = sendResponseStatus noContent204 ()
 
 -- | Serve a single file, identified through a given DB query
@@ -46,7 +48,7 @@ serveOneFile :: forall file. HasFileReference file => ConduitT () file (YesodDB
 serveOneFile source = do
   results <- runDB . runConduit $ source .| Conduit.take 2 -- We don't need more than two files to make a decision below
   case results of
-    [file] -> sendThisFile =<< runDB (sourceFile' file)
+    [file] -> sendThisFile $ sourceFile' file
     [] -> notFound
     _other -> do
       $logErrorS "SFileR" "Multiple matching files found."
@@ -58,7 +60,7 @@ serveOneFile source = do
 serveSomeFiles :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent
 serveSomeFiles archiveName source = serveSomeFiles' archiveName $ source .| C.map Left
 
-serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent
+serveSomeFiles' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent
 serveSomeFiles' archiveName source = do
   (source', results) <- runDB $ runPeekN 2 source
 
@@ -66,7 +68,7 @@ serveSomeFiles' archiveName source = do
 
   case results of
     [] -> notFound
-    [file] -> sendThisFile =<< either (runDB . sourceFile') return file
+    [file] -> sendThisFile $ either sourceFile' id file
     _moreFiles -> do
       setContentDisposition' $ Just archiveName
       respondSourceDB typeZip $ do
@@ -79,7 +81,7 @@ serveSomeFiles' archiveName source = do
 serveZipArchive :: forall file. HasFileReference file => FilePath -> ConduitT () file (YesodDB UniWorX) () -> Handler TypedContent
 serveZipArchive archiveName source = serveZipArchive' archiveName $ source .| C.map Left
 
-serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file File) (YesodDB UniWorX) () -> Handler TypedContent
+serveZipArchive' :: forall file. HasFileReference file => FilePath -> ConduitT () (Either file DBFile) (YesodDB UniWorX) () -> Handler TypedContent
 serveZipArchive' archiveName source = do
   (source', results) <- runDB $ runPeekN 1 source
 
diff --git a/src/Handler/Utils/Allocation.hs b/src/Handler/Utils/Allocation.hs
index 510e9fc70..db512e25a 100644
--- a/src/Handler/Utils/Allocation.hs
+++ b/src/Handler/Utils/Allocation.hs
@@ -276,7 +276,7 @@ storeAllocationResult :: AllocationId
                       -> (AllocationFingerprint, Set (UserId, CourseId), Seq MatchingLogRun)
                       -> DB ()
 storeAllocationResult allocId now (allocFp, allocMatchings, ppMatchingLog -> allocLog) = do
-  FileReference{..} <- sinkFile $ File "matchings.log" (Just $ encodeUtf8 allocLog) now
+  FileReference{..} <- sinkFile $ File "matchings.log" (Just . yield $ encodeUtf8 allocLog) now
   insert_ . AllocationMatching allocId allocFp now $ fromMaybe (error "allocation result stored without fileReferenceContent") fileReferenceContent
   doAllocation allocId now allocMatchings
diff --git a/src/Handler/Utils/DateTime.hs b/src/Handler/Utils/DateTime.hs
index 7fd3c4c54..a321bebff 100644
--- a/src/Handler/Utils/DateTime.hs
+++ b/src/Handler/Utils/DateTime.hs
@@ -2,7 +2,7 @@
 module Handler.Utils.DateTime
   ( utcToLocalTime, utcToZonedTime
-  , localTimeToUTC, TZ.LocalToUTCResult(..)
+  , localTimeToUTC, TZ.LocalToUTCResult(..), localTimeToUTCSimple
   , toTimeOfDay
   , toMidnight, beforeMidnight, toMidday, toMorning
   , formatDiffDays
@@ -47,6 +47,9 @@ utcToZonedTime = ZonedTime <$> TZ.utcToLocalTimeTZ appTZ <*> TZ.timeZoneForUTCTi
 localTimeToUTC :: LocalTime -> LocalToUTCResult
 localTimeToUTC = TZ.localTimeToUTCFull appTZ
 
+localTimeToUTCSimple :: LocalTime -> UTCTime
+localTimeToUTCSimple = TZ.localTimeToUTCTZ appTZ
+
 -- | Local midnight of given day
 toMidnight :: Day -> UTCTime
 toMidnight = toTimeOfDay 0 0 0
diff --git a/src/Handler/Utils/Files.hs b/src/Handler/Utils/Files.hs
index 9400a9a4b..d53cc24ec 100644
--- a/src/Handler/Utils/Files.hs
+++ b/src/Handler/Utils/Files.hs
@@ -2,17 +2,22 @@ module Handler.Utils.Files
   ( sourceFile, sourceFile'
   , sourceFiles, sourceFiles'
   , SourceFilesException(..)
+  , sourceFileDB
+  , acceptFile
   ) where
 
 import Import
 
 import qualified Data.Conduit.Combinators as C
+import qualified Data.Conduit.List as C (unfoldM)
 
 import Handler.Utils.Minio
 import qualified Network.Minio as Minio
 
-import qualified Data.ByteString.Base64.URL as Base64
-import qualified Data.ByteArray as ByteArray
+import qualified Database.Esqueleto as E
+import qualified Database.Esqueleto.Utils as E
+
+import System.FilePath (normalise)
 
 
 data SourceFilesException
@@ -22,36 +27,72 @@ data SourceFilesException
   deriving anyclass (Exception)
 
 
-sourceFiles :: ConduitT FileReference File (YesodDB UniWorX) ()
-sourceFiles = C.mapM sourceFile
+sourceFileDB :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX)
+             => FileContentReference -> ConduitT () ByteString (SqlPersistT m) ()
+sourceFileDB fileReference = do
+  dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
+  let retrieveChunk chunkHash = \case
+        Nothing -> return Nothing
+        Just start -> do
+          chunk <- E.selectMaybe . E.from $ \fileContentChunk -> do
+            E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash
+            return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
+          case chunk of
+            Nothing -> throwM SourceFilesContentUnavailable
+            Just (E.Value c) -> return . Just . (c, ) $ if
+              | olength c >= dbChunksize -> Just $ start + dbChunksize
+              | otherwise -> Nothing
+      chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
+        E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
+        E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
+        return $ fileContentEntry E.^. FileContentEntryChunkHash
+  chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))
 
-sourceFile :: FileReference -> DB File
-sourceFile FileReference{..} = do
-  mFileContent <- traverse get $ FileContentKey <$> fileReferenceContent
-  fileContent <- if
-    | is (_Just . _Nothing) mFileContent
-    , Just fileContentHash <- fileReferenceContent -- Not a restriction
-    -> maybeT (throwM SourceFilesContentUnavailable) $ do
-        let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
-        uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
-        fmap Just . hoistMaybe <=< runAppMinio . runMaybeT $ do
-          objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
-          lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
-    | fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent
-    -> throwM SourceFilesMismatchedHashes
-    | Just fileContent' <- fileContentContent <$> join mFileContent
-    -> return $ Just fileContent'
-    | otherwise
-    -> return Nothing
-  return File
-    { fileTitle = fileReferenceTitle
-    , fileContent
-    , fileModified = fileReferenceModified
-    }
+sourceFiles :: Monad m => ConduitT FileReference DBFile m ()
+sourceFiles = C.map sourceFile
 
-sourceFiles' :: forall file. HasFileReference file => ConduitT file File (YesodDB UniWorX) ()
-sourceFiles' = C.mapM sourceFile'
+sourceFile :: FileReference -> DBFile
+sourceFile FileReference{..} = File
+  { fileTitle = fileReferenceTitle
+  , fileModified = fileReferenceModified
+  , fileContent = toFileContent <$> fileReferenceContent
+  }
+  where
+    toFileContent fileReference
+      | fileReference == $$(liftTyped $ FileContentReference $$(emptyHash))
+      = return ()
+    toFileContent fileReference = do
+      inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
+      if
+        | inDB -> sourceFileDB fileReference
+        | otherwise -> do
+            chunkVar <- newEmptyTMVarIO
+            minioAsync <- lift . allocateLinkedAsync $
+              maybeT (throwM SourceFilesContentUnavailable) $ do
+                let uploadName = minioFileReference # fileReference
+                uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
+                hoistMaybe <=< runAppMinio . runMaybeT $ do
+                  objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
+                  lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
+                atomically $ putTMVar chunkVar Nothing
+            let go = do
+                  mChunk <- atomically $ readTMVar chunkVar
+                  case mChunk of
+                    Nothing -> waitAsync minioAsync
+                    Just chunk -> yield chunk >> go
+             in go
 
-sourceFile' :: forall file. HasFileReference file => file -> DB File
+sourceFiles' :: forall file m. (HasFileReference file, Monad m) => ConduitT file DBFile m ()
+sourceFiles' = C.map sourceFile'
+
+sourceFile' :: forall file. HasFileReference file => file -> DBFile
 sourceFile' = sourceFile . view (_FileReference . _1)
+
+
+acceptFile :: (MonadResource m, MonadResource m') => FileInfo -> m (File m')
+acceptFile fInfo = do
+  let fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise . unpack $ fileName fInfo
+      fileContent = Just $ fileSource fInfo
+  fileModified <- liftIO getCurrentTime
+  return File{..}
diff --git a/src/Handler/Utils/Form.hs b/src/Handler/Utils/Form.hs
index 68f9f68a0..4be16133b 100644
--- a/src/Handler/Utils/Form.hs
+++ b/src/Handler/Utils/Form.hs
@@ -32,7 +32,7 @@ import Yesod.Form.Bootstrap3
 import Handler.Utils.Zip
 
 import qualified Data.Conduit.Combinators as C
-import qualified Data.Conduit.List as C (mapMaybe)
+import qualified Data.Conduit.List as C (mapMaybe, mapMaybeM)
 
 import qualified Database.Esqueleto as E
 import qualified Database.Esqueleto.Utils as E
@@ -831,7 +831,10 @@ pseudonymWordField = checkMMap doCheck id $ ciField & addDatalist (return $ mkOp
 
 uploadContents :: (MonadHandler m, HandlerSite m ~ UniWorX)
                => ConduitT FileReference ByteString m ()
-uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybe fileContent
+uploadContents = transPipe (liftHandler . runDB) sourceFiles .| C.mapMaybeM fileContent'
+  where fileContent' f = runMaybeT $ do
+          File{fileContent = Just fc} <- return f
+          liftHandler . runDB . runConduit $ fc .| C.fold
 
 data FileFieldUserOption a = FileFieldUserOption
   { fieldOptionForce :: Bool
@@ -893,11 +896,21 @@ genericFileField mkOpts = Field{..}
             , Map.filter (views _3 $ (&&) <$> fieldOptionForce <*> fieldOptionDefault) fieldAdditionalFiles
             ]
 
-        handleUpload :: FileField -> Maybe Text -> ConduitT File FileReference (YesodDB UniWorX) ()
+        handleUpload :: FileField -> Maybe Text -> ConduitT (File Handler) FileReference (YesodDB UniWorX) ()
         handleUpload FileField{fieldMaxFileSize} mIdent
-          = C.filter (\File{..} -> maybe (const True) (>) fieldMaxFileSize $ maybe 0 (fromIntegral . olength) fileContent)
-         .| sinkFiles
-         .| C.mapM mkSessionFile
+          = C.map (transFile liftHandler)
+         .| C.mapMaybeM (\f@File{..} -> maybeT (return $ Just f) $ do
+              maxSize <- fromIntegral <$> hoistMaybe fieldMaxFileSize
+              fc <- hoistMaybe fileContent
+              let peekNE n = do
+                    str <- C.takeE n .| C.fold
+                    leftover str
+                    yield str
+              (unsealConduitT -> fc', size) <- lift $ fc $$+ peekNE (succ maxSize) .| C.lengthE
+              return . guardOn (size <= maxSize) $ f { fileContent = Just fc' }
+            )
+         .| sinkFiles
+         .| C.mapM mkSessionFile
          where
            mkSessionFile fRef@FileReference{..} = fRef <$ do
              now <- liftIO getCurrentTime
@@ -924,7 +937,7 @@ genericFileField mkOpts = Field{..}
            doUnpack
              | fieldOptionForce fieldUnpackZips = fieldOptionDefault fieldUnpackZips
              | otherwise = unpackZips `elem` vals
-        handleFile :: FileInfo -> ConduitT () File Handler ()
+        handleFile :: FileInfo -> ConduitT () (File Handler) Handler ()
         handleFile
           | doUnpack = receiveFiles
          | otherwise = yieldM . acceptFile
diff --git a/src/Handler/Utils/Mail.hs b/src/Handler/Utils/Mail.hs
index 85038a894..18785044c 100644
--- a/src/Handler/Utils/Mail.hs
+++ b/src/Handler/Utils/Mail.hs
@@ -12,9 +12,7 @@ import Handler.Utils.Files
 
 import qualified Data.CaseInsensitive as CI
 
-import qualified Data.ByteString.Lazy as LBS
-
-import qualified Data.Conduit.List as C
+import qualified Data.Conduit.Combinators as C
 
 import qualified Text.Pandoc as P
 
@@ -72,12 +70,13 @@ addFileDB :: ( MonadMail m
              , HandlerSite m ~ UniWorX
              ) => FileReference -> m (Maybe MailObjectId)
 addFileDB fRef = runMaybeT $ do
-  File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent} <- lift . liftHandler . runDB $ sourceFile fRef
+  File{fileTitle = pack . takeBaseName -> fileName, fileContent = Just fileContent'} <- return $ sourceFile fRef
+  fileContent <- liftHandler . runDB . runConduit $ fileContent' .| C.sinkLazy
   lift . addPart $ do
     _partType .= decodeUtf8 (mimeLookup fileName)
    _partEncoding .= Base64
    _partDisposition .= AttachmentDisposition fileName
-    _partContent .= PartContent (LBS.fromStrict fileContent)
+    _partContent .= PartContent fileContent
 
     setMailObjectIdPseudorandom (fileName, fileContent) :: StateT Part (HandlerFor UniWorX) MailObjectId
diff --git a/src/Handler/Utils/Rating.hs b/src/Handler/Utils/Rating.hs
index 5a82c8921..ef65892b0 100644
--- a/src/Handler/Utils/Rating.hs
+++ b/src/Handler/Utils/Rating.hs
@@ -16,11 +16,9 @@ import Handler.Utils.DateTime (getDateTimeFormatter)
 
 import qualified Data.Text as Text
 
-import qualified Data.ByteString.Lazy as Lazy.ByteString
-
 import qualified Database.Esqueleto as E
 
-import qualified Data.Conduit.List as Conduit
+import qualified Data.Conduit.Combinators as C
 
 import Handler.Utils.Rating.Format
 
@@ -91,15 +89,16 @@ extensionRating = "txt"
 
 ratingFile :: ( MonadHandler m
               , HandlerSite m ~ UniWorX
+              , Monad m'
               )
-           => CryptoFileNameSubmission -> Rating -> m File
+           => CryptoFileNameSubmission -> Rating -> m (File m')
 ratingFile cID rating@Rating{ ratingValues = Rating'{..} } = do
   mr'@(MsgRenderer mr) <- getMsgRenderer
   dtFmt <- getDateTimeFormatter
 
   fileModified <- maybe (liftIO getCurrentTime) return ratingTime
   let fileTitle = ensureExtension extensionRating . unpack . mr $ MsgRatingFileTitle cID
-      fileContent = Just . Lazy.ByteString.toStrict $ formatRating mr' dtFmt cID rating
+      fileContent = Just . C.sourceLazy $ formatRating mr' dtFmt cID rating
 
   return File{..}
 
@@ -107,13 +106,12 @@ type SubmissionContent = Either FileReference (SubmissionId, Rating')
 extractRatings :: ( MonadHandler m
                   , HandlerSite m ~ UniWorX
                   ) => ConduitT FileReference SubmissionContent m ()
-extractRatings = Conduit.mapM $ \fRef@FileReference{..} -> liftHandler $ do
+extractRatings = C.mapM $ \fRef@FileReference{..} -> liftHandler $ do
   msId <- isRatingFile fileReferenceTitle
   if
     | Just sId <- msId
     , isJust fileReferenceContent -> do
-        f <- runDB $ sourceFile fRef
-        (rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) $ parseRating f
+        (rating, cID) <- handle (throwM . RatingFileException fileReferenceTitle) . runDB . parseRating $ sourceFile fRef
        sId' <- traverse decrypt cID
        unless (maybe (const True) (==) sId' sId) $
          throwM $ RatingFileException fileReferenceTitle RatingSubmissionIDIncorrect
diff --git a/src/Handler/Utils/Rating/Format.hs b/src/Handler/Utils/Rating/Format.hs
index fd8ab2fb5..e7cd03568 100644
--- a/src/Handler/Utils/Rating/Format.hs
+++ b/src/Handler/Utils/Rating/Format.hs
@@ -35,6 +35,8 @@ import qualified System.FilePath.Cryptographic as Explicit
 
 import Control.Exception (ErrorCall(..))
 
+import qualified Data.Conduit.Combinators as C
+
 
 data PrettifyState
   = PrettifyInitial
@@ -195,8 +197,9 @@ instance ns ~ CryptoIDNamespace (CI FilePath) SubmissionId => YAML.FromYAML (May
       )
 
 
-parseRating :: MonadCatch m => File -> m (Rating', Maybe CryptoFileNameSubmission)
-parseRating f@File{ fileContent = Just (fromStrict -> input), .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do
+parseRating :: MonadCatch m => File m -> m (Rating', Maybe CryptoFileNameSubmission)
+parseRating f@File{ fileContent = Just input', .. } = handle onFailure . handle (throwM . RatingParseException) . handleIf isYAMLUnicodeError (\(ErrorCall msg) -> throwM $ RatingYAMLNotUnicode msg) $ do
+  input <- runConduit $ input' .| C.sinkLazy
   let evStream = YAML.Event.parseEvents input
       delimitDocument = do
         ev <- maybe (throwM RatingYAMLStreamTerminatedUnexpectedly) return =<< await
diff --git a/src/Handler/Utils/Rating/Format/Legacy.hs b/src/Handler/Utils/Rating/Format/Legacy.hs
index bd523d06a..c027e1b07 100644
--- a/src/Handler/Utils/Rating/Format/Legacy.hs
+++ b/src/Handler/Utils/Rating/Format/Legacy.hs
@@ -16,6 +16,8 @@ import qualified Data.ByteString.Lazy as Lazy (ByteString)
 
 import qualified Data.CaseInsensitive as CI
 
+import qualified Data.Conduit.Combinators as C
+
 
 import Text.Read (readEither)
 
@@ -55,9 +57,9 @@ formatRating cID Rating{ ratingValues = Rating'{..}, ..} = let
       ]
   in Lazy.Text.encodeUtf8 . (<> "\n") $ displayT doc
 
-parseRating :: MonadCatch m => File -> m Rating'
+parseRating :: MonadCatch m => File m -> m Rating'
 parseRating File{ fileContent = Just input, .. } = handle (throwM . RatingParseLegacyException) $ do
-  inputText <- either (throwM . RatingNotUnicode) return $ Text.decodeUtf8' input
+  inputText <- either (throwM . RatingNotUnicode) return . Text.decodeUtf8' =<< runConduit (input .| C.fold)
   let (headerLines', commentLines) = break (commentSep `Text.isInfixOf`) $ Text.lines inputText
       (reverse -> ratingLines, reverse -> _headerLines) = break (sep' `Text.isInfixOf`) $ reverse headerLines'
diff --git a/src/Handler/Utils/Submission.hs b/src/Handler/Utils/Submission.hs
index 13ac4d067..2dcac8f28 100644
--- a/src/Handler/Utils/Submission.hs
+++ b/src/Handler/Utils/Submission.hs
@@ -256,7 +256,7 @@ planSubmissions sid restriction = do
       maximumsBy f xs = flip Set.filter xs $ \x -> maybe True (((==) `on` f) x . maximumBy (comparing f)) $ fromNullable xs
 
 
-submissionFileSource :: SubmissionId -> ConduitT () File (YesodDB UniWorX) ()
+submissionFileSource :: SubmissionId -> ConduitT () DBFile (YesodDB UniWorX) ()
 submissionFileSource subId = E.selectSource (E.from $ submissionFileQuery subId) .| C.map entityVal .| sourceFiles'
 
@@ -319,7 +319,7 @@ submissionMultiArchive anonymous (Set.toList -> ids) = do
   setContentDisposition' $ Just ((addExtension `on` unpack) (mr archiveName) extensionZip)
   respondSource typeZip . (<* lift cleanup) . transPipe (runDBRunner dbrunner) $ do
     let
-      fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () File (YesodDB UniWorX) ()
+      fileEntitySource' :: (Rating, Entity Submission, Maybe UTCTime, (SheetName,CourseShorthand,SchoolId,TermId,Bool)) -> ConduitT () DBFile (YesodDB UniWorX) ()
       fileEntitySource' (rating, Entity submissionID Submission{}, subTime, (shn,csh,ssh,tid,sheetAnonymous)) = do
         cID <- encrypt submissionID
diff --git a/src/Handler/Utils/Zip.hs b/src/Handler/Utils/Zip.hs
index 255d8f93b..70147cbd2 100644
--- a/src/Handler/Utils/Zip.hs
+++ b/src/Handler/Utils/Zip.hs
@@ -13,17 +13,14 @@ module Handler.Utils.Zip
 
 import Import
 
+import Handler.Utils.Files (acceptFile)
+import Handler.Utils.DateTime (localTimeToUTCSimple, utcToLocalTime)
+
 import Codec.Archive.Zip.Conduit.Types
 import Codec.Archive.Zip.Conduit.UnZip
 import Codec.Archive.Zip.Conduit.Zip
 
--- import qualified Data.ByteString.Lazy as Lazy (ByteString)
-import qualified Data.ByteString.Lazy as Lazy.ByteString
-
-import qualified Data.ByteString as ByteString
-
 import System.FilePath
-import Data.Time.LocalTime (localTimeToUTC, utcToLocalTime)
 
 import Data.List (dropWhileEnd)
 
@@ -38,6 +35,10 @@ import Data.Encoding ( decodeStrictByteStringExplicit
 import Data.Encoding.CP437
 import qualified Data.Char as Char
 
+import Control.Monad.Trans.Cont
+import Control.Monad.Trans.State.Strict (evalStateT)
+import qualified Control.Monad.State.Class as State
+
 
 typeZip :: ContentType
 typeZip = "application/zip"
@@ -53,94 +54,157 @@ instance Default ZipInfo where
     }
 
 
-consumeZip :: forall b m.
-              ( MonadThrow b
-              , MonadThrow m
-              , MonadBase b m
-              , PrimMonad b
-              )
-           => ConduitT ByteString File m ZipInfo
-consumeZip = transPipe liftBase unZipStream `fuseUpstream` consumeZip'
-  where
-    consumeZip' :: ConduitT (Either ZipEntry ByteString) File m ()
-    consumeZip' = do
-      input <- await
-      case input of
-        Nothing -> return ()
-        Just (Right _) -> throwM $ userError "Data chunk in unexpected place when parsing ZIP"
-        Just (Left ZipEntry{..}) -> do
-          contentChunks <- toConsumer accContents
-          zipEntryName' <- decodeZipEntryName zipEntryName
-          let
-            fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise $ makeValid zipEntryName'
-            fileModified = localTimeToUTC utc zipEntryTime
-            fileContent
-              | hasTrailingPathSeparator zipEntryName' = Nothing
-              | otherwise = Just $ mconcat contentChunks
-          yield File{..}
-          consumeZip'
-    accContents :: ConduitT (Either a b') Void m [b']
-    accContents = do
-      input <- await
-      case input of
-        Just (Right x) -> (x :) <$> accContents
-        Just (Left x) -> [] <$ leftover (Left x)
-        _ -> return []
+data ConsumeZipException
+  = ConsumeZipUnZipException SomeException
+  | ConsumeZipUnexpectedContent
+  deriving (Show, Generic, Typeable)
+  deriving anyclass (Exception)
 
-produceZip :: forall b m.
-              ( MonadThrow b
-              , MonadThrow m
-              , MonadBase b m
-              , PrimMonad b
+
+consumeZip :: forall m m'.
+              ( MonadThrow m
+              , PrimMonad m
+              , MonadUnliftIO m
+              , MonadResource m
+              , MonadIO m'
+              , MonadThrow m'
+              )
+           => ConduitT () ByteString m () -> ConduitT () (File m') m ZipInfo
+consumeZip inpBS = do
+  inpChunk <- liftIO newEmptyTMVarIO
+  zipAsync <- lift . allocateLinkedAsync $
+    runConduit $ (inpBS .| unZipStream) `fuseUpstream` C.mapM_ (atomically . putTMVar inpChunk)
+
+  flip evalStateT Nothing . evalContT . callCC $ \finishConsume -> forever $ do
+    inpChunk' <- atomically $
+          Right <$> takeTMVar inpChunk
+      <|> Left <$> waitCatchSTM zipAsync
+
+    fileSink <- State.get
+    case (fileSink, inpChunk') of
+      (mFSink    , Left (Left unzipExc)   ) -> do
+        for_ mFSink $ \fSink' -> atomically $ do
+          writeTMChan fSink' $ Left unzipExc
+          closeTMChan fSink'
+        throwM unzipExc
+
+      (mFSink    , Left (Right zInfo)     ) -> do
+        for_ mFSink $ atomically . closeTMChan
+        finishConsume zInfo
+
+      (Just fSink, Right (Right bs)       ) ->
+        atomically . writeTMChan fSink $ Right bs
+
+      (Nothing   , Right (Right _)        ) ->
+        throwM ConsumeZipUnexpectedContent
+
+      (mFSink    , Right (Left ZipEntry{..})) -> do
+        for_ mFSink $ atomically . closeTMChan
+        State.put Nothing
+
+        zipEntryName' <- decodeZipEntryName zipEntryName
+        let
+          fileTitle = "." </> zipEntryName'
+                    & normalise
+                    & makeValid
+                    & dropWhile isPathSeparator
+                    & dropWhileEnd isPathSeparator
+                    & normalise
+                    & makeValid
+          fileModified = localTimeToUTCSimple zipEntryTime
+          isDirectory = hasTrailingPathSeparator zipEntryName'
+        fileContent <- if
+          | isDirectory -> return Nothing
+          | otherwise -> do
+              fileChan <- liftIO newTMChanIO
+              State.put $ Just fileChan
+              return . Just . evalContT . callCC $ \finishFileContent -> forever $ do
+                nextVal <- atomically $ asum
+                  [ readTMChan fileChan
+                  , do
+                      inpChunk'' <-     Right <$> takeTMVar inpChunk
+                                    <|> Left <$> waitCatchSTM zipAsync
+                      case inpChunk'' of
+                        Left (Left unzipExc) -> return . Just $ Left unzipExc
+                        Left (Right _      ) -> return Nothing
+                        Right (Left zInfo  ) -> Nothing <$ putTMVar inpChunk (Left zInfo)
+                        Right (Right bs    ) -> return . Just $ Right bs
+                  ]
+                case nextVal of
+                  Nothing -> finishFileContent ()
+                  Just (Right bs) -> lift $ yield bs
+                  Just (Left exc) -> throwM $ ConsumeZipUnZipException exc
+        lift . lift $ yield File{..}
+
+produceZip :: forall m.
+ ( MonadThrow m + , PrimMonad m ) => ZipInfo - -> ConduitT File ByteString m () -produceZip info = C.map toZipData .| transPipe liftBase (void $ zipStream zipOptions) + -> ConduitT (File m) ByteString m () +produceZip info = C.map toZipData .| void (zipStream zipOptions) where zipOptions = ZipOptions - { zipOpt64 = True - , zipOptCompressLevel = -1 -- This is passed through all the way to the C zlib, where it means "default level" + { zipOpt64 = False + , zipOptCompressLevel = defaultCompression , zipOptInfo = info } - toZipData :: File -> (ZipEntry, ZipData b) - toZipData f@File{..} = - let zData = maybe mempty (ZipDataByteString . Lazy.ByteString.fromStrict) fileContent - zEntry = (toZipEntry f){ zipEntrySize = fromIntegral . ByteString.length <$> fileContent } - in (zEntry, zData) + -- toZipData :: forall v. File m -> ConduitT v (ZipEntry, ZipData m) m () + -- toZipData f + -- | Just fc <- fileContent f = do + -- outpChunk <- newEmptyTMVarIO + -- outpAsync <- lift . allocateLinkedAsync $ + -- runConduit $ fc .| C.mapM_ (atomically . putTMVar outpChunk) + -- yield ( toZipEntry f + -- , ZipDataSource . evalContT . callCC $ \finishContent -> forever $ do + -- nextVal <- atomically $ + -- Right <$> takeTMVar outpChunk + -- <|> Left <$> waitCatchSTM outpAsync + -- case nextVal of + -- Right chunk -> lift $ yield chunk + -- Left (Right () ) -> finishContent () + -- Left (Left exc) -> throwM exc + -- ) + -- | otherwise = yield (toZipEntry f, mempty) - toZipEntry :: File -> ZipEntry + toZipData :: File m -> (ZipEntry, ZipData m) + toZipData f@File{..} + = (toZipEntry f, maybe mempty ZipDataSource fileContent) + + toZipEntry :: File m -> ZipEntry toZipEntry File{..} = ZipEntry{..} where - isDir = isNothing fileContent + isDir = is _Nothing fileContent - zipEntryName = encodeZipEntryName . bool (dropWhileEnd isPathSeparator) addTrailingPathSeparator isDir . normalise $ makeValid fileTitle - zipEntryTime = utcToLocalTime utc fileModified + zipEntryName = "." fileTitle + & normalise + & makeValid + & dropWhile isPathSeparator + & dropWhileEnd isPathSeparator + & bool id addTrailingPathSeparator isDir + & normalise + & makeValid + & encodeZipEntryName + zipEntryTime = utcToLocalTime fileModified zipEntrySize = Nothing zipEntryExternalAttributes = Nothing -modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT File File m () +modifyFileTitle :: Monad m => (FilePath -> FilePath) -> ConduitT (File m') (File m') m () modifyFileTitle f = mapC $ \x@File{..} -> x{ fileTitle = f fileTitle } -- Takes FileInfo and if it is a ZIP-Archive, extract files, otherwiese yield fileinfo -receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, MonadBase IO m) => FileInfo -> ConduitT () File m () +receiveFiles :: (MonadLogger m, MonadResource m, MonadThrow m, PrimMonad m, MonadUnliftIO m, MonadResource m', MonadThrow m') => FileInfo -> ConduitT () (File m') m () receiveFiles fInfo | ((==) `on` simpleContentType) mimeType typeZip = do $logInfoS "sourceFiles" "Unpacking ZIP" - fileSource fInfo .| void consumeZip + void . consumeZip $ fileSource fInfo | otherwise = do $logDebugS "sourceFiles" [st|Not unpacking file of type #{decodeUtf8 mimeType}|] yieldM $ acceptFile fInfo where mimeType = mimeLookup $ fileName fInfo -acceptFile :: MonadResource m => FileInfo -> m File -acceptFile fInfo = do - let fileTitle = dropWhile isPathSeparator . dropTrailingPathSeparator . normalise . unpack $ fileName fInfo - fileModified <- liftIO getCurrentTime - fileContent <- fmap Just . 
runConduit $ fileSource fInfo .| foldC - return File{..} - decodeZipEntryName :: MonadThrow m => Either Text ByteString -> m FilePath -- ^ Extract the filename from a 'ZipEntry' doing decoding along the way. diff --git a/src/Import.hs b/src/Import.hs index b387f44ad..b18a9fe1e 100644 --- a/src/Import.hs +++ b/src/Import.hs @@ -4,6 +4,7 @@ module Import import Foundation as Import import Import.NoFoundation as Import +import Model.Migration as Import import Utils.SystemMessage as Import import Utils.Metrics as Import diff --git a/src/Import/NoFoundation.hs b/src/Import/NoFoundation.hs index 0fdedc192..b86de7350 100644 --- a/src/Import/NoFoundation.hs +++ b/src/Import/NoFoundation.hs @@ -4,7 +4,6 @@ module Import.NoFoundation import Import.NoModel as Import import Model as Import -import Model.Migration as Import import Model.Rating as Import import Model.Submission as Import import Model.Tokens as Import diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index ed39106a0..7c03533a4 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -128,6 +128,8 @@ import Data.Proxy as Import (Proxy(..)) import Data.List.PointedList as Import (PointedList) +import Language.Haskell.TH.Syntax as Import (Lift(liftTyped)) + import Language.Haskell.TH.Instances as Import () import Data.NonNull.Instances as Import () import Data.Monoid.Instances as Import () diff --git a/src/Jobs.hs b/src/Jobs.hs index 320b9f56a..7e849400f 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -5,7 +5,7 @@ module Jobs , stopJobCtl ) where -import Import +import Import hiding (StateT) import Jobs.Types as Types hiding (JobCtl(JobCtlQueue)) import Jobs.Queue import Jobs.Crontab @@ -30,6 +30,7 @@ import qualified Data.Map.Strict as Map import Data.Map.Strict ((!)) import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST) +import Control.Monad.Trans.State.Strict (StateT, evalStateT) import qualified Control.Monad.State.Class as State import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Trans.Cont (ContT(..), callCC) @@ -99,6 +100,7 @@ handleJobs foundation@UniWorX{..} jobConfirm <- liftIO $ newTVarIO HashMap.empty jobShutdown <- liftIO newEmptyTMVarIO jobCurrentCrontab <- liftIO $ newTVarIO Nothing + jobHeldLocks <- liftIO $ newTVarIO Set.empty atomically $ putTMVar appJobState JobState { jobContext = JobContext{..} , .. @@ -114,8 +116,9 @@ manageCrontab foundation@UniWorX{..} unmask = do jState <- atomically $ readTMVar appJobState liftIO . unsafeHandler foundation . void $ do atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState - runReaderT ?? foundation $ - writeJobCtlBlock JobCtlDetermineCrontab + when (has (_appJobCronInterval . _Just) foundation) $ + runReaderT ?? foundation $ + writeJobCtlBlock JobCtlDetermineCrontab void $ evalRWST (forever execCrontab) jState HashMap.empty let awaitTermination = guardM $ @@ -414,13 +417,14 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do handleCmd JobCtlTest = return () handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) handleCmd (JobCtlQueue job) = lift $ queueJob' job - handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do + handleCmd (JobCtlPerform jId) = handle handleQueueException . 
jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do content <- case fromJSON queuedJobContent of Aeson.Success c -> return c Aeson.Error t -> do $logErrorS logIdent $ "Aeson decoding error: " <> pack t throwM $ JInvalid jId j + $logInfoS logIdent $ tshow content $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content instanceID' <- getsYesod $ view instanceID @@ -466,40 +470,45 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do . Set.filter (((/=) `on` classifyHealthReport) newReport . snd) atomically . modifyTVar' hrStorage $ force . updateReports -jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a -jLocked jId act = do - hasLock <- liftIO $ newTVarIO False - +jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a +jLocked jId act = flip evalStateT False $ do let - lock = runDB . setSerializable $ do - qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId + lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob) + lock = hoist (hoist $ runDB . setSerializable) $ do + qj@QueuedJob{..} <- lift . lift $ maybe (throwM $ JNonexistant jId) return =<< get jId instanceID' <- getsYesod $ view instanceID threshold <- getsYesod $ view _appJobStaleThreshold now <- liftIO getCurrentTime + heldLocks <- asks jobHeldLocks + isHeld <- (jId `Set.member`) <$> readTVarIO heldLocks hadStale <- maybeT (return False) $ do lockTime <- MaybeT $ return queuedJobLockTime lockInstance <- MaybeT $ return queuedJobLockInstance if | lockInstance == instanceID' , diffUTCTime now lockTime >= threshold + , not isHeld -> return True | otherwise -> throwM $ JLocked jId lockInstance lockTime when hadStale . $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj) - val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID' - , QueuedJobLockTime =. Just now - ] - liftIO . atomically $ writeTVar hasLock True - return val + State.put True + val <- lift . lift $ updateGet jId [ QueuedJobLockInstance =. Just instanceID' + , QueuedJobLockTime =. Just now + ] + atomically . modifyTVar' heldLocks $ Set.insert jId + return $ Entity jId val - unlock = whenM (readTVarIO hasLock) $ - runDB . setSerializable $ - update jId [ QueuedJobLockInstance =. Nothing - , QueuedJobLockTime =. Nothing - ] + unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) () + unlock (Entity jId' _) = whenM State.get $ do + atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks + lift . lift . runDB . setSerializable $ + update jId' [ QueuedJobLockInstance =. Nothing + , QueuedJobLockTime =. Nothing + ] - bracket lock (const unlock) act + bracket lock unlock $ lift . 
act pruneLastExecs :: Crontab JobCtl -> DB () diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index 1624060c2..8127e16e8 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -40,22 +40,14 @@ determineCrontab = execWriterT $ do } Nothing -> return () - tell $ HashMap.singleton - JobCtlDetermineCrontab - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = appJobCronInterval - , cronNotAfter = Right CronNotScheduled - } - whenIsJust appPruneUnreferencedFiles $ \pInterval -> + whenIsJust appJobCronInterval $ \interval -> tell $ HashMap.singleton - (JobCtlQueue JobPruneUnreferencedFiles) + JobCtlDetermineCrontab Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = pInterval - , cronNotAfter = Right CronNotScheduled + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = interval + , cronNotAfter = Right CronNotScheduled } oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1] @@ -98,6 +90,15 @@ determineCrontab = execWriterT $ do , cronRateLimit = iInterval , cronNotAfter = Right CronNotScheduled } + whenIsJust appRechunkFiles $ \rInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobRechunkFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = rInterval + , cronNotAfter = Right CronNotScheduled + } tell . flip foldMap universeF $ \kind -> case appHealthCheckInterval kind of @@ -138,33 +139,31 @@ determineCrontab = execWriterT $ do , cronNotAfter = Right CronNotScheduled } + let + getNextIntervals within interval cInterval = do + now <- liftIO getPOSIXTime + return $ do + let + epochInterval = within / 2 + (currEpoch, epochNow) = now `divMod'` epochInterval + currInterval = epochNow `div'` interval + numIntervals = floor $ epochInterval / interval + n = ceiling $ 4 * cInterval / interval + i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ] + let + ((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals + nextIntervalTime + = posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval + return (nextEpoch, nextInterval, nextIntervalTime, numIntervals) if | is _Just appLdapConf - , is _Just appLdapConf , Just syncWithin <- appSynchroniseLdapUsersWithin + , Just cInterval <- appJobCronInterval -> do - now <- liftIO getPOSIXTime - let - epochInterval = syncWithin / 2 - interval = appSynchroniseLdapUsersInterval + nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval - (ldapEpoch, epochNow) = now `divMod'` epochInterval - ldapInterval = epochNow `div'` interval - numIntervals = floor $ epochInterval / interval - - nextIntervals = do - let - n = ceiling $ 4 * appJobCronInterval / appSynchroniseLdapUsersInterval - i <- [negate (ceiling $ n % 2) .. 
ceiling $ n % 2] - let - ((+ ldapEpoch) -> nextEpoch, nextInterval) = (ldapInterval + i) `divMod` numIntervals - nextIntervalTime - = posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval - return (nextEpoch, nextInterval, nextIntervalTime) - - forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime) -> do - $logDebugS "SynchroniseLdap" [st|currentTime: #{tshow ldapEpoch}.#{tshow epochNow}; upcomingSync: #{tshow nextEpoch}.#{tshow (fromInteger nextInterval * interval)}; upcomingData: #{tshow (numIntervals, nextEpoch, nextInterval)}|] + forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do tell $ HashMap.singleton (JobCtlQueue JobSynchroniseLdap { jEpoch = fromInteger nextEpoch @@ -180,6 +179,22 @@ determineCrontab = execWriterT $ do | otherwise -> return () + whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do + nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval + forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do + tell $ HashMap.singleton + (JobCtlQueue JobPruneUnreferencedFiles + { jEpoch = fromInteger nextEpoch + , jNumIterations = fromInteger numIntervals + , jIteration = fromInteger nextInterval + } + ) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime + , cronRepeat = CronRepeatNever + , cronRateLimit = appPruneUnreferencedFilesInterval + , cronNotAfter = Left within + } let sheetJobs (Entity nSheet Sheet{..}) = do diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 77e6337e2..55218e170 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -1,27 +1,36 @@ module Jobs.Handler.Files ( dispatchJobPruneSessionFiles , dispatchJobPruneUnreferencedFiles - , dispatchJobInjectFiles + , dispatchJobInjectFiles, dispatchJobRechunkFiles ) where -import Import hiding (matching) +import Import hiding (matching, maximumBy, init) import Database.Persist.Sql (deleteWhereCount) import qualified Database.Esqueleto as E +import qualified Database.Esqueleto.PostgreSQL as E import qualified Database.Esqueleto.Utils as E -import qualified Database.Esqueleto.Internal.Sql as E (unsafeSqlCastAs) import qualified Data.Conduit.Combinators as C -import qualified Data.Conduit.List as C (mapMaybe) +import qualified Data.Conduit.List as C (mapMaybe, unfoldM) import Handler.Utils.Minio import qualified Network.Minio as Minio -import qualified Crypto.Hash as Crypto -import qualified Data.ByteString.Base64.URL as Base64 +import Crypto.Hash (hashDigestSize, digestFromByteString) -import Control.Monad.Memo (startEvalMemoT, memo) +import Data.List ((!!), unfoldr, maximumBy, init, genericLength) +import qualified Data.ByteString as ByteString +import Data.Bits (Bits(shiftR)) + +import qualified Data.Map.Strict as Map + +import Control.Monad.Random.Lazy +import System.Random.Shuffle (shuffleM) +import System.IO.Unsafe + +import Handler.Utils.Files (sourceFileDB) dispatchJobPruneSessionFiles :: JobHandler UniWorX @@ -44,72 +53,190 @@ fileReferences (E.just -> fHash) , E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash , E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash , E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash + , E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ E.just (fileContentEntry E.^. 
FileContentEntryHash) E.==. fHash + E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash) ] +{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-} +pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)]) +pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty -dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX -dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do +dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX +dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomic . hoist lift $ do now <- liftIO getCurrentTime - interval <- fmap (fmap $ max 0) . getsYesod $ view _appPruneUnreferencedFiles + interval <- getsYesod $ view _appPruneUnreferencedFilesInterval keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles - E.update $ \fileContent -> do - let isReferenced = E.any E.exists . fileReferences $ fileContent E.^. FileContentHash - now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now - shouldBe = E.bool (E.just . E.maybe now' (E.min now') $ fileContent E.^. FileContentUnreferencedSince) E.nothing isReferenced - E.set fileContent [ FileContentUnreferencedSince E.=. shouldBe ] + let + chunkHashBytes :: forall h. + ( Unwrapped FileContentChunkReference ~ Digest h ) + => Integer + chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h)) + chunkHashBits = chunkHashBytes * 8 + base :: Integer + base = 2 ^ chunkHashBits + intervals :: [Integer] + -- | Exclusive upper bounds + intervals + | numIterations <= 0 = pure base + | otherwise = go protoIntervals ^.. folded . _1 + where + go [] = [] + go ints + | maximumOf (folded . _1) ints == Just base = ints + | otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts + where + closest = maximumBy (comparing $ view _2) ints + (lts, geqs) = partition (((>) `on` view _1) closest) ints + gts = filter (((<) `on` view _1) closest) geqs + -- | Exclusive upper bounds + protoIntervals :: [(Integer, Integer)] + protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations + | i <- [1 .. toInteger numIterations] + ] + + intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals + + toDigest :: Integer -> Maybe FileContentChunkReference + toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step + where step i + | i <= 0 = Nothing + | otherwise = Just (fromIntegral i, i `shiftR` 8) + pad bs + | toInteger (ByteString.length bs) >= chunkHashBytes = bs + | otherwise = pad $ ByteString.cons 0 bs + + intervalsDgsts <- atomically $ do + cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache + case Map.lookup numIterations cachedDgsts of + Just c -> return c + Nothing -> do + modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts' + return intervalsDgsts' let - getCandidates = E.selectSource . E.from $ \fileContent -> do - E.where_ . E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) $ fileContent E.^. FileContentUnreferencedSince - return ( fileContent E.^. FileContentHash - , E.length_ $ fileContent E.^. 
FileContentContent + permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch) + + (minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts) + chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool) + chunkIdFilter cRef = E.and $ catMaybes + [ minBoundDgst <&> \b -> cRef E.>=. E.val b + , maxBoundDgst <&> \b -> cRef E.<. E.val b + ] + + $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst) + + E.insertSelectWithConflict + (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint") + (E.from $ \fileContentChunk -> do + E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId + return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash + E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash + return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now + ) + (\current excluded -> + [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ] + ) + + E.delete . E.from $ \fileContentChunkUnreferenced -> do + E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash + E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) + + let + getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do + let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do + E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash + E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) + return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince + E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince + + E.groupBy $ fileContentEntry E.^. FileContentEntryHash + E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ] + + return $ fileContentEntry E.^. FileContentEntryHash + + deleteEntry :: _ -> DB (Sum Natural) + deleteEntry (E.Value fRef) = + bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef] + + Sum deletedEntries <- runConduit $ + getEntryCandidates + .| takeWhileTime (interval / 3) + .| C.mapM deleteEntry + .| C.fold + + when (deletedEntries > 0) $ + $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|] + + let + getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do + E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now) + E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. 
FileContentChunkUnreferencedHash + + E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) + + return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent) ) - Sum deleted <- runConduit $ - getCandidates - .| maybe (C.map id) (takeWhileTime . (/ 2)) interval + deleteChunk :: _ -> DB (Sum Natural, Sum Word64) + deleteChunk (E.Value cRef, E.Value size) = do + deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ] + (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef] + + (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $ + getChunkCandidates + .| takeWhileTime (interval / 3) .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64) - .| C.map (view $ _1 . _Value) - .| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef]) + .| C.mapM deleteChunk .| C.fold - when (deleted > 0) $ - $logInfoS "PruneUnreferencedFiles" [st|Deleted #{deleted} long-unreferenced files|] + + when (deletedChunks > 0) $ + $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|] dispatchJobInjectFiles :: JobHandler UniWorX dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do uploadBucket <- getsYesod $ view _appUploadCacheBucket interval <- getsYesod $ view _appInjectFiles - now <- liftIO getCurrentTime let - extractReference (Minio.ListItemObject oi) - | Right bs <- Base64.decodeUnpadded . encodeUtf8 $ Minio.oiObject oi - , Just fRef <- Crypto.digestFromByteString bs - = Just (oi, fRef) + extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference extractReference _ = Nothing injectOrDelete :: (Minio.Object, FileContentReference) - -> Handler (Sum Int64, Sum Int64) -- ^ Injected, Already existed - injectOrDelete (obj, fRef) = maybeT (return mempty) $ do - res <- hoist (startEvalMemoT . hoistStateCache (runDB . setSerializable)) $ do - alreadyInjected <- lift . lift $ exists [ FileContentHash ==. fRef ] - if | alreadyInjected -> return (mempty, Sum 1) - | otherwise -> do - content <- flip memo obj $ \obj' -> hoistMaybeM . runAppMinio . runMaybeT $ do - objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj' Minio.defaultGetObjectOptions - lift . runConduit $ Minio.gorObjectStream objRes .| C.fold + -> Handler (Sum Int64) -- ^ Injected + injectOrDelete (obj, fRef) = do + fRef' <- runDB . setSerializable $ do + chunkVar <- newEmptyTMVarIO + dbAsync <- allocateLinkedAsync $ do + atomically $ isEmptyTMVar chunkVar >>= guard . not + sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () - fmap ((, mempty) . Sum) . lift. lift . E.insertSelectCount $ - let isReferenced = E.any E.exists $ fileReferences (E.val fRef) - now' = E.unsafeSqlCastAs "TIMESTAMP WITH TIME ZONE" $ E.val now - in return $ FileContent E.<# E.val fRef E.<&> E.val content E.<&> E.bool (E.just now') E.nothing isReferenced - runAppMinio . maybeT (return ()) . catchIfMaybeT minioIsDoesNotExist $ Minio.removeObject uploadBucket obj - return res + didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do + objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions + lift . 
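
Each of the three passes (marking, entry deletion, chunk deletion) gets a third of the configured interval via takeWhileTime. Roughly, such a combinator forwards elements only until a wall-clock budget is spent; a sketch of the idea (the project's own takeWhileTime may differ in detail):

    import Conduit (ConduitT, await, yield)
    import Control.Monad (when)
    import Control.Monad.IO.Class (MonadIO, liftIO)
    import Data.Time (NominalDiffTime, addUTCTime, getCurrentTime)

    -- Pass elements downstream until the time budget is exhausted.
    takeWhileTimeSketch :: MonadIO m => NominalDiffTime -> ConduitT a a m ()
    takeWhileTimeSketch budget = do
      deadline <- addUTCTime budget <$> liftIO getCurrentTime
      let go = do
            mx <- await
            case mx of
              Nothing -> pure ()
              Just x  -> do
                now <- liftIO getCurrentTime
                when (now < deadline) $ yield x >> go
      go
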
runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
+          return True
+        if
+          | not didSend -> Nothing <$ cancel dbAsync
+          | otherwise -> do
+              atomically $ putTMVar chunkVar Nothing
+              Just <$> waitAsync dbAsync
+      let matchesFRef = is _Just $ assertM (== fRef) fRef'
+      if | matchesFRef ->
+             maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj
+         | otherwise ->
+             $logErrorS "InjectFiles" [st|Minio object “#{obj}”'s content does not match its name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|]
+      return . bool mempty (Sum 1) $ is _Just fRef'

-  (Sum inj, Sum exc) <-
+  Sum inj <-
     runConduit $
       transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
    .| C.mapMaybe extractReference
    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
@@ -118,7 +245,49 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
    .| transPipe lift (C.mapM injectOrDelete)
    .| C.fold

-  when (exc > 0) $
-    $logInfoS "InjectFiles" [st|Deleted #{exc} files from upload cache because they were already injected|]
   when (inj > 0) $
     $logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]
+
+
+data RechunkFileException
+  = RechunkFileExceptionHashMismatch
+      { oldHash, newHash :: FileContentReference }
+  deriving (Eq, Ord, Show, Generic, Typeable)
+  deriving anyclass (Exception)
+
+dispatchJobRechunkFiles :: JobHandler UniWorX
+dispatchJobRechunkFiles = JobHandlerAtomic . hoist lift $ do
+  interval <- getsYesod $ view _appRechunkFiles
+  let
+    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
+      E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+        E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+        E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+        E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
+
+      let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+            return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent :: E.SqlExpr (E.Value Word64))
+
+      return ( fileContentEntry E.^. FileContentEntryHash
+             , size
+             )
+
+    rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
+    rechunkFile fRef sz = do
+      fRef' <- sinkFileDB True $ sourceFileDB fRef
+      unless (fRef == fRef') $
+        throwM $ RechunkFileExceptionHashMismatch fRef fRef'
+      return (Sum 1, Sum sz)
+
+  (Sum rechunkedEntries, Sum rechunkedSize) <- runConduit $
+    getEntryCandidates
+    .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
+    .| maybe (C.map id) (takeWhileTime . 
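
dispatchJobRechunkFiles relies on re-chunking being content-preserving: streaming a file back out of the chunk store and sinking it again with doReplace = True must reproduce the identical whole-file hash, or the job aborts with RechunkFileExceptionHashMismatch. Expressed as a property (a sketch; uses the sinkFileDB and sourceFileDB from this patch):

    -- Round-trip invariant behind the rechunk job: re-chunking never
    -- changes the whole-file reference, only the chunk boundaries.
    rechunkPreservesHash :: FileContentReference -> DB Bool
    rechunkPreservesHash fRef = (== fRef) <$> sinkFileDB True (sourceFileDB fRef)
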
(/ 2)) interval + .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64) + .| C.mapM (uncurry rechunkFile) + .| C.fold + + when (rechunkedEntries > 0 || rechunkedSize > 0) $ + $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedEntries} files in database (#{tshow rechunkedSize} bytes)|] diff --git a/src/Jobs/Handler/TransactionLog.hs b/src/Jobs/Handler/TransactionLog.hs index fb856ba2c..131ba2491 100644 --- a/src/Jobs/Handler/TransactionLog.hs +++ b/src/Jobs/Handler/TransactionLog.hs @@ -27,5 +27,5 @@ dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime let cutoff = addUTCTime (- retentionTime) now - n <- updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ] + n <- updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ] $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|] diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index b5483a8c6..bb2faf762 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -82,7 +82,7 @@ writeJobCtlBlock = writeJobCtlBlock' writeJobCtl queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId) queueJobUnsafe queuedJobWriteLastExec job = do - $logInfoS "queueJob" $ tshow job + $logDebugS "queueJob" $ tshow job doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ] diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 065c806cc..59a15c99a 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -86,9 +86,13 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica , jDisplayEmail :: UserEmail } | JobPruneSessionFiles - | JobPruneUnreferencedFiles + | JobPruneUnreferencedFiles { jNumIterations + , jEpoch + , jIteration :: Natural + } | JobInjectFiles | JobPruneFallbackPersonalisedSheetFilesKeys + | JobRechunkFiles deriving (Eq, Ord, Show, Read, Generic, Typeable) data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId } | NotificationSheetActive { nSheet :: SheetId } @@ -223,6 +227,7 @@ newWorkerId = JobWorkerId <$> liftIO newUnique data JobContext = JobContext { jobCrontab :: TVar (Crontab JobCtl) , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException)))) + , jobHeldLocks :: TVar (Set QueuedJobId) } @@ -251,6 +256,8 @@ jobNoQueueSame = \case JobPruneSessionFiles{} -> True JobPruneUnreferencedFiles{} -> True JobInjectFiles{} -> True + JobPruneFallbackPersonalisedSheetFilesKeys{} -> True + JobRechunkFiles{} -> True _ -> False diff --git a/src/Model/Migration.hs b/src/Model/Migration.hs index 711fffde2..0d77f6da3 100644 --- a/src/Model/Migration.hs +++ b/src/Model/Migration.hs @@ -7,6 +7,8 @@ module Model.Migration import Import.NoModel hiding (Max(..), Last(..)) import Model +import Settings +import Foundation.Type import Jobs.Types import Audit.Types import Model.Migration.Version @@ -40,6 +42,8 @@ import qualified Data.CaseInsensitive as CI import qualified Data.Aeson as Aeson import Web.ServerSession.Backend.Persistent.Memcached (migrateMemcachedSqlStorage) + +import Data.Conduit.Algorithms.FastCDC (FastCDCParameters(fastCDCMinBlockSize)) -- Database versions must follow https://pvp.haskell.org: -- - Breaking changes are instances where manual migration is necessary (via customMigrations; i.e. 
changing a column's format)
@@ -80,6 +84,7 @@ migrateAll' = sequence_
 migrateAll :: ( MonadLogger m
               , MonadResource m
               , MonadUnliftIO m
+              , MonadReader UniWorX m
               )
            => ReaderT SqlBackend m ()
 migrateAll = do
@@ -112,7 +117,7 @@ requiresMigration = mapReaderT (exceptT return return) $ do
     $logInfoS "Migration" $ intercalate "; " initial
     throwError True

-  customs <- mapReaderT lift $ getMissingMigrations @_ @m
+  customs <- mapReaderT lift $ getMissingMigrations @_ @(ReaderT UniWorX m)
   unless (Map.null customs) $ do
     $logInfoS "Migration" . intercalate ", " . map tshow $ Map.keys customs
     throwError True
@@ -134,6 +139,7 @@ getMissingMigrations :: forall m m'.
                         ( MonadLogger m
                         , MonadIO m
                         , MonadResource m'
+                        , MonadReader UniWorX m'
                         )
                      => ReaderT SqlBackend m (Map (Key AppliedMigration) (ReaderT SqlBackend m' ()))
 getMissingMigrations = do
@@ -180,7 +186,9 @@ migrateManual = do
 -}
 customMigrations :: forall m.
-                    MonadResource m
+                    ( MonadResource m
+                    , MonadReader UniWorX m
+                    )
                  => Map (Key AppliedMigration) (ReaderT SqlBackend m ())
 customMigrations = Map.fromListWith (>>)
   [ ( AppliedMigrationKey [migrationVersion|initial|] [version|0.0.0|]
@@ -915,13 +923,32 @@ customMigrations = Map.fromListWith (>>)
     )
   , ( AppliedMigrationKey [migrationVersion|39.0.0|] [version|40.0.0|]
-    , whenM (tableExists "study_features") $ do
+    , whenM (tableExists "study_features")
         [executeQQ|
           ALTER TABLE study_features RENAME updated TO last_observed;
           ALTER TABLE study_features ADD COLUMN first_observed timestamp with time zone;
           UPDATE study_features SET first_observed = (SELECT MAX(last_observed) FROM study_features as other WHERE other."user" = study_features."user" AND other.degree = study_features.degree AND other.field = study_features.field AND other.type = study_features.type AND other.semester = study_features.semester - 1);
         |]
     )
+  , ( AppliedMigrationKey [migrationVersion|40.0.0|] [version|41.0.0|]
+    , whenM (tableExists "file_content") $ do
+        chunkingParams <- lift $ view _appFileChunkingParams
+
+        [executeQQ|
+          ALTER TABLE file_content RENAME TO file_content_chunk;
+          ALTER INDEX file_content_pkey RENAME TO file_content_chunk_pkey;
+
+          CREATE TABLE file_content_chunk_unreferenced (id bigserial, hash bytea NOT NULL, since timestamp with time zone NOT NULL);
+          INSERT INTO file_content_chunk_unreferenced (since, hash) (SELECT unreferenced_since as since, hash FROM file_content_chunk WHERE NOT (unreferenced_since IS NULL));
+          ALTER TABLE file_content_chunk DROP COLUMN unreferenced_since;
+
+          ALTER TABLE file_content_chunk ADD COLUMN content_based boolean NOT NULL DEFAULT false;
+          UPDATE file_content_chunk SET content_based = true WHERE length(content) <= #{fastCDCMinBlockSize chunkingParams};
+
+          CREATE TABLE file_content_entry (hash bytea NOT NULL, ix bigint NOT NULL, chunk_hash bytea NOT NULL);
+          INSERT INTO file_content_entry (hash, chunk_hash, ix) (SELECT hash, hash as chunk_hash, 0 as ix FROM file_content_chunk);
+        |]
+    )
   ]
diff --git a/src/Model/Types/Common.hs b/src/Model/Types/Common.hs
index ceb97f2a2..08783669c 100644
--- a/src/Model/Types/Common.hs
+++ b/src/Model/Types/Common.hs
@@ -54,5 +54,3 @@ type InstanceId = UUID
 type ClusterId = UUID
 type TokenId = UUID
 type TermCandidateIncidence = UUID
-
-type FileContentReference = Digest SHA3_512
diff --git a/src/Model/Types/File.hs b/src/Model/Types/File.hs
index 33bdadbb7..5da04921b 100644
--- a/src/Model/Types/File.hs
+++ b/src/Model/Types/File.hs
@@ -1,23 +1,129 @@
 module Model.Types.File
-  ( File(..), _fileTitle, _fileContent, _fileModified
+  ( 
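
After the 40.0.0 to 41.0.0 migration a file body is no longer a single row: it is the concatenation of its chunks in entry-index order. A hedged sketch of reassembly against the new tables, with persistent's selectList/getJust assumed in scope via the project prelude (the patch's real sourceFileDB may well stream this differently):

    import Conduit (ConduitT, yield)
    import Control.Monad (forM_)
    import Control.Monad.Trans.Class (lift)

    -- Reassemble a file: entries ordered by ix, each naming one chunk.
    sourceFileSketch :: FileContentReference -> ConduitT () ByteString DB ()
    sourceFileSketch fRef = do
      entries <- lift $ selectList [FileContentEntryHash ==. fRef] [Asc FileContentEntryIx]
      forM_ entries $ \(Entity _ entry) -> do
        chunk <- lift . getJust $ fileContentEntryChunkHash entry
        yield $ fileContentChunkContent chunk
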
FileContentChunkReference(..), FileContentReference(..) + , File(..), _fileTitle, _fileContent, _fileModified + , PureFile, toPureFile, fromPureFile, pureFileToFileReference, _pureFileContent + , transFile + , minioFileReference , FileReference(..), _fileReferenceTitle, _fileReferenceContent, _fileReferenceModified - , HasFileReference(..), IsFileReference(..), FileReferenceResidual(..) + , HasFileReference(..), IsFileReference(..), FileReferenceResidual(FileReferenceResidual, FileReferenceResidualEither, unFileReferenceResidualEither, FileReferenceResidualEntity, fileReferenceResidualEntityKey, fileReferenceResidualEntityResidual, unPureFileResidual) ) where import Import.NoModel -import Model.Types.Common (FileContentReference) + +import Database.Persist.Sql (PersistFieldSql) +import Web.HttpApiData (ToHttpApiData, FromHttpApiData) +import Data.ByteArray (ByteArrayAccess) + +import qualified Data.ByteString.Base64.URL as Base64 +import qualified Data.ByteArray as ByteArray +import qualified Network.Minio as Minio (Object) +import qualified Crypto.Hash as Crypto (digestFromByteString) +import qualified Crypto.Hash.Conduit as Crypto (sinkHash) import Utils.Lens.TH +import qualified Data.Conduit.Combinators as C -data File = File +import Text.Show + + + +newtype FileContentChunkReference = FileContentChunkReference (Digest SHA3_512) + deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable) + deriving newtype ( PersistField, PersistFieldSql + , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON + , Hashable, NFData + , ByteArrayAccess + ) + +makeWrapped ''FileContentChunkReference + +newtype FileContentReference = FileContentReference (Digest SHA3_512) + deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable) + deriving newtype ( PersistField, PersistFieldSql + , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON + , Hashable, NFData + , ByteArrayAccess + ) + +makeWrapped ''FileContentReference + + +minioFileReference :: Prism' Minio.Object FileContentReference +minioFileReference = prism' toObjectName fromObjectName + where toObjectName = decodeUtf8 . Base64.encodeUnpadded . ByteArray.convert + fromObjectName = fmap (review _Wrapped) . Crypto.digestFromByteString <=< preview _Right . Base64.decodeUnpadded . encodeUtf8 + + +data File m = File { fileTitle :: FilePath - , fileContent :: Maybe ByteString + , fileContent :: Maybe (ConduitT () ByteString m ()) , fileModified :: UTCTime - } deriving (Eq, Ord, Read, Show, Generic, Typeable) + } deriving (Generic, Typeable) makeLenses_ ''File +type PureFile = File Identity + +_pureFileContent :: forall bs. + ( IsSequence bs + , Element bs ~ Word8 + ) + => Lens' PureFile (Maybe bs) +_pureFileContent = lens getPureFileContent setPureFileContent + where + getPureFileContent = fmap (repack . runIdentity . runConduit . (.| C.fold)) . fileContent + setPureFileContent f bs = f { fileContent = yield . repack <$> bs } + +toPureFile :: Monad m => File m -> m PureFile +toPureFile File{..} = do + c <- for fileContent $ runConduit . (.| C.fold) + return File + { fileContent = fmap yield c + , .. + } + +fromPureFile :: Monad m => PureFile -> File m +fromPureFile = transFile generalize + +pureFileToFileReference :: PureFile -> FileReference +pureFileToFileReference File{..} = FileReference + { fileReferenceTitle = fileTitle + , fileReferenceContent = review _Wrapped . runIdentity . runConduit . 
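
File is now parametrised over the monad its content streams in, so large bodies never have to be materialised. For instance, a file backed by on-disk content (as the test fixtures below construct) can be built with conduit's sourceFile:

    import qualified Data.Conduit.Combinators as C
    import Control.Monad.Trans.Resource (MonadResource)
    import Data.Time (UTCTime)
    import System.FilePath (takeFileName)

    -- A streaming file backed by the filesystem; bytes are read only
    -- when the conduit is run, never held in memory as a whole.
    diskFile :: MonadResource m => FilePath -> UTCTime -> File m
    diskFile path modified = File
      { fileTitle    = takeFileName path
      , fileContent  = Just $ C.sourceFile path
      , fileModified = modified
      }
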
(.| Crypto.sinkHash) <$> fileContent + , fileReferenceModified = fileModified + } + +instance Eq PureFile where + a == b = all (\f -> f a b) + [ (==) `on` fileTitle + , (==) `on` fileModified + , (==) `on` (view _pureFileContent :: PureFile -> Maybe ByteString) + ] +instance Ord PureFile where + compare = mconcat + [ comparing fileTitle + , comparing (view _pureFileContent :: PureFile -> Maybe ByteString) + , comparing fileModified + ] +instance Show PureFile where + showsPrec _ f@File{..} + = showString "File{" + . showString "fileTitle = " + . shows fileTitle + . showString ", " + . showString "fileContent = " + . (case f ^. _pureFileContent of + Nothing -> showString "Nothing" + Just c -> showString "Just $ yield " . showsPrec 11 (c :: ByteString) + ) + . showString ", " + . showString "fileModified = " + . shows fileModified + . showString "}" + +transFile :: Monad m => (forall a. m a -> n a) -> (File m -> File n) +transFile l File{..} = File{ fileContent = transPipe l <$> fileContent, .. } + data FileReference = FileReference { fileReferenceTitle :: FilePath , fileReferenceContent :: Maybe FileContentReference @@ -36,6 +142,24 @@ instance HasFileReference FileReference where data FileReferenceResidual FileReference = FileReferenceResidual _FileReference = iso (, FileReferenceResidual) $ view _1 +instance HasFileReference PureFile where + newtype FileReferenceResidual PureFile = PureFileResidual { unPureFileResidual :: Maybe ByteString } + deriving (Eq, Ord, Read, Show, Generic, Typeable) + + _FileReference = iso toFileReference fromFileReference + where + toFileReference File{..} = (FileReference{..}, PureFileResidual{..}) + where + fileReferenceTitle = fileTitle + (fileReferenceContent, unPureFileResidual) = ((,) <$> preview (_Just . _1) <*> preview (_Just . _2)) $ + over _1 (review _Wrapped) . runIdentity . runConduit . (.| getZipConduit ((,) <$> ZipConduit Crypto.sinkHash <*> ZipConduit C.fold)) <$> fileContent + fileReferenceModified = fileModified + fromFileReference (FileReference{..}, PureFileResidual{..}) = File + { fileTitle = fileReferenceTitle + , fileContent = yield <$> unPureFileResidual + , fileModified = fileReferenceModified + } + instance (HasFileReference a, HasFileReference b) => HasFileReference (Either a b) where newtype FileReferenceResidual (Either a b) = FileReferenceResidualEither { unFileReferenceResidualEither :: Either (FileReferenceResidual a) (FileReferenceResidual b) } _FileReference = iso doSplit doJoin diff --git a/src/Model/Types/Misc.hs b/src/Model/Types/Misc.hs index 916bd2df9..7eee837b9 100644 --- a/src/Model/Types/Misc.hs +++ b/src/Model/Types/Misc.hs @@ -267,6 +267,7 @@ instance Csv.FromField Sex where data TokenBucketIdent = TokenBucketInjectFiles | TokenBucketPruneFiles + | TokenBucketRechunkFiles deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) deriving anyclass (Universe, Finite, Hashable) diff --git a/src/Settings.hs b/src/Settings.hs index 56bead944..acedff5c4 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -70,6 +70,8 @@ import Text.Show (showParen, showString) import qualified Data.List.PointedList as P import qualified Network.Minio as Minio + +import Data.Conduit.Algorithms.FastCDC -- | Runtime settings to configure this application. 
These settings can be @@ -113,7 +115,7 @@ data AppSettings = AppSettings , appMailSupport :: Address , appJobWorkers :: Natural , appJobFlushInterval :: Maybe NominalDiffTime - , appJobCronInterval :: NominalDiffTime + , appJobCronInterval :: Maybe NominalDiffTime , appJobStaleThreshold :: NominalDiffTime , appNotificationRateLimit :: NominalDiffTime , appNotificationCollateDelay :: NominalDiffTime @@ -140,8 +142,10 @@ data AppSettings = AppSettings , appLdapReTestFailover :: DiffTime , appSessionFilesExpire :: NominalDiffTime - , appPruneUnreferencedFiles :: Maybe NominalDiffTime , appKeepUnreferencedFiles :: NominalDiffTime + + , appPruneUnreferencedFilesWithin :: Maybe NominalDiffTime + , appPruneUnreferencedFilesInterval :: NominalDiffTime , appInitialLogSettings :: LogSettings @@ -172,6 +176,10 @@ data AppSettings = AppSettings , appUploadCacheConf :: Maybe Minio.ConnectInfo , appUploadCacheBucket :: Minio.Bucket , appInjectFiles :: Maybe NominalDiffTime + , appRechunkFiles :: Maybe NominalDiffTime + , appFileUploadDBChunksize :: Int + + , appFileChunkingParams :: FastCDCParameters , appFavouritesQuickActionsBurstsize , appFavouritesQuickActionsAvgInverseRate :: Word64 @@ -444,7 +452,7 @@ instance FromJSON AppSettings where appJobWorkers <- o .: "job-workers" appJobFlushInterval <- o .:? "job-flush-interval" - appJobCronInterval <- o .: "job-cron-interval" + appJobCronInterval <- o .:? "job-cron-interval" appJobStaleThreshold <- o .: "job-stale-threshold" appNotificationRateLimit <- o .: "notification-rate-limit" appNotificationCollateDelay <- o .: "notification-collate-delay" @@ -471,9 +479,17 @@ instance FromJSON AppSettings where appLdapReTestFailover <- o .: "ldap-re-test-failover" appSessionFilesExpire <- o .: "session-files-expire" - appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files" appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0 appInjectFiles <- o .:? "inject-files" + appRechunkFiles <- o .:? 
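
appJobCronInterval is now optional: parsed with .:?, a missing or explicitly null job-cron-interval disables periodic cron scheduling, which is exactly what the test configuration uses. In aeson terms (Double stands in for NominalDiffTime here):

    import Data.Aeson ((.:?), Object)
    import Data.Aeson.Types (Parser)

    -- .:? yields Nothing both for an absent key and for an explicit
    -- null, so either spelling turns the cron loop off.
    parseInterval :: Object -> Parser (Maybe Double)
    parseInterval o = o .:? "job-cron-interval"
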
"rechunk-files" + appFileUploadDBChunksize <- o .: "file-upload-db-chunksize" + + appFileChunkingTargetExponent <- o .: "file-chunking-target-exponent" + appFileChunkingHashWindow <- o .: "file-chunking-hash-window" + appFileChunkingParams <- maybe (fail "Could not recommend FastCDCParameters") return $ recommendFastCDCParameters appFileChunkingTargetExponent appFileChunkingHashWindow + + appPruneUnreferencedFilesWithin <- o .: "prune-unreferenced-files-within" + appPruneUnreferencedFilesInterval <- o .: "prune-unreferenced-files-interval" appMaximumContentLength <- o .: "maximum-content-length" diff --git a/src/Utils.hs b/src/Utils.hs index 3a517d231..5f25e1ac3 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -56,7 +56,8 @@ import Control.Arrow as Utils ((>>>)) import Control.Monad.Trans.Except (ExceptT(..), throwE, runExceptT) import Control.Monad.Except (MonadError(..)) import Control.Monad.Trans.Maybe as Utils (MaybeT(..)) -import Control.Monad.Trans.Writer.Lazy (WriterT, execWriterT, tell) +import Control.Monad.Trans.Writer.Strict (execWriterT) +import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Catch import Control.Monad.Morph (hoist) import Control.Monad.Fail @@ -83,6 +84,9 @@ import qualified Crypto.Saltine.Class as Saltine import qualified Crypto.Data.PKCS7 as PKCS7 import Crypto.MAC.KMAC (KMAC, HashSHAKE) import qualified Crypto.MAC.KMAC as KMAC +import qualified Crypto.Hash as Crypto +import Crypto.Hash (HashAlgorithm, Digest) +import Crypto.Hash.Instances () import Data.ByteArray (ByteArrayAccess) @@ -843,7 +847,7 @@ diffTimeout timeoutLength timeoutRes act = fromMaybe timeoutRes <$> timeout time = let (MkFixed micro :: Micro) = realToFrac timeoutLength in fromInteger micro -tellM :: (Monad m, Monoid x) => m x -> WriterT x m () +tellM :: (MonadTrans t, MonadWriter x (t m), Monad m) => m x -> t m () tellM = tell <=< lift ------------- @@ -856,6 +860,19 @@ peekN n = do mapM_ leftover peeked return peeked +peekWhile :: forall a o m. Monad m => (a -> Bool) -> ConduitT a o m [a] +peekWhile p = do + let go acc = do + next <- await + case next of + Nothing -> return (reverse acc, Nothing) + Just x + | p x -> go $ x : acc + | otherwise -> return (reverse acc, Just x) + (peeked, failed) <- go [] + mapM_ leftover $ peeked ++ hoistMaybe failed + return peeked + anyMC, allMC :: forall a o m. Monad m => (a -> m Bool) -> ConduitT a o m Bool anyMC f = C.mapM f .| orC allMC f = C.mapM f .| andC @@ -1057,6 +1074,12 @@ kmaclazy :: forall a string key ba chunk. -> KMAC a kmaclazy str k = KMAC.finalize . KMAC.updates (KMAC.initialize @a str k) . toChunks +emptyHash :: forall a. 
HashAlgorithm a => Q (TExp (Digest a))
+-- ^ Hash of `mempty`
+--
+-- Computationally preferable to computing the hash at runtime
+emptyHash = TH.liftTyped $ Crypto.hashFinalize Crypto.hashInit
+
 -------------
 -- Caching --
 -------------
diff --git a/src/Utils/Files.hs b/src/Utils/Files.hs
index 0138a93c4..4749f46c5 100644
--- a/src/Utils/Files.hs
+++ b/src/Utils/Files.hs
@@ -3,6 +3,7 @@ module Utils.Files
   , sinkFile', sinkFiles'
   , FileUploads
   , replaceFileReferences, replaceFileReferences'
+  , sinkFileDB
   ) where

 import Import.NoFoundation
@@ -11,31 +12,54 @@ import Handler.Utils.Minio
 import qualified Network.Minio as Minio

 import qualified Crypto.Hash as Crypto (hash)
+import qualified Crypto.Hash.Conduit as Crypto (sinkHash)

 import qualified Data.Conduit.Combinators as C
-
-import qualified Data.ByteString.Base64.URL as Base64
-import qualified Data.ByteArray as ByteArray
+import qualified Data.Conduit.List as C (unfoldM)

 import qualified Data.Map.Lazy as Map
 import qualified Data.Set as Set

 import Control.Monad.State.Class (modify)

+import qualified Data.Sequence as Seq
+
 import Database.Persist.Sql (deleteWhereCount)

 import Control.Monad.Trans.Resource (allocate)

+import qualified Data.UUID.V4 as UUID

-sinkFiles :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT File FileReference (SqlPersistT m) ()
-sinkFiles = C.mapM sinkFile
+import qualified Database.Esqueleto as E
+import qualified Database.Esqueleto.Utils as E
+
+import Data.Conduit.Algorithms.FastCDC (fastCDC)
+
+
+sinkFileDB :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX)
+           => Bool -- ^ Replace? Use only in serializable transaction
+           -> ConduitT () ByteString (SqlPersistT m) ()
+           -> SqlPersistT m FileContentReference
+sinkFileDB doReplace fileContentContent = do
+  chunkingParams <- getsYesod $ view _appFileChunkingParams
+
+  let sinkChunk fileContentChunkContent = do
+        fileChunkLockTime <- liftIO getCurrentTime
+        fileChunkLockInstance <- getsYesod appInstanceID
+
+        tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
+        existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
+        let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
+        if | existsChunk -> lift setContentBased
+           | otherwise -> lift . handleIfSql isUniqueConstraintViolation (const setContentBased) $
+               insert_ FileContentChunk{..}
+        return $ FileContentChunkKey fileContentChunkHash
+        where fileContentChunkHash = _Wrapped # Crypto.hash fileContentChunkContent
+  ((review _Wrapped -> fileContentHash, fileContentChunks), chunkLocks) <- runConduit . runWriterC $
+    transPipe lift fileContentContent
+    .| fastCDC chunkingParams
+    .| C.mapM (\c -> (c, ) <$> sinkChunk c)
+    .| transPipe lift (getZipConduit $ (,) <$> ZipConduit (C.map (view _1) .| Crypto.sinkHash) <*> ZipConduit (C.foldMap $ views _2 Seq.singleton))

-sinkFile :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File -> SqlPersistT m FileReference
-sinkFile File{ fileContent = Nothing, .. 
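
A usage sketch for sinkFileDB above: any byte source can be sunk straight into chunked storage, yielding the whole-file reference. doReplace = False leaves an existing entry list for the same hash untouched; True (used by the rechunk job) requires a serializable transaction, as the signature's comment warns. A hypothetical helper:

    -- Store an upload's bytes, returning the reference to put into a
    -- FileReference record.
    storeUpload :: ConduitT () ByteString (SqlPersistT Handler) ()
                -> SqlPersistT Handler FileContentReference
    storeUpload = sinkFileDB False
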
} = return FileReference - { fileReferenceContent = Nothing - , fileReferenceTitle = fileTitle - , fileReferenceModified = fileModified - } -sinkFile File{ fileContent = Just fileContentContent, .. } = do void . withUnliftIO $ \UnliftIO{..} -> let takeLock = do fileLockTime <- liftIO getCurrentTime @@ -44,35 +68,86 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do releaseLock lId = liftHandler . runDB $ (withReaderT projectBackend $ setSerializable (delete lId :: SqlPersistT (HandlerFor UniWorX) ()) :: YesodDB UniWorX ()) in unliftIO $ allocate (unliftIO takeLock) (unliftIO . releaseLock) - inDB <- exists [ FileContentHash ==. fileContentHash ] + deleteWhere [ FileChunkLockId <-. Set.toList chunkLocks ] - let sinkFileDB = unless inDB $ repsert (FileContentKey fileContentHash) FileContent{ fileContentUnreferencedSince = Nothing, .. } - maybeT sinkFileDB $ do - let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash - uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket - unless inDB . runAppMinio $ do - uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket uploadName Minio.defaultGetObjectOptions - unless uploadExists $ do - let - pooOptions = Minio.defaultPutObjectOptions - { Minio.pooCacheControl = Just "immutable" - } - Minio.putObject uploadBucket uploadName (C.sourceLazy $ fromStrict fileContentContent) (Just . fromIntegral $ olength fileContentContent) pooOptions - -- Note that MinIO does not accept length zero uploads without an explicit length specification (not `Nothing` in the line above for the api we use) + let entryExists = E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileContentHash + insertEntries = handleIfSql isUniqueConstraintViolation (const $ return ()) . void $ insertMany_ + [ FileContentEntry{ fileContentEntryHash = fileContentHash, .. } + | fileContentEntryChunkHash <- otoList fileContentChunks + | fileContentEntryIx <- [0..] + ] + if | not doReplace -> unlessM entryExists insertEntries + | otherwise -> do + deleteWhere [ FileContentEntryHash ==. fileContentHash ] + insertEntries + + + return fileContentHash + where fileContentChunkContentBased = True + + +sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) () +sinkFiles = C.mapM sinkFile + +sinkFile :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => File (SqlPersistT m) -> SqlPersistT m FileReference +sinkFile File{ fileContent = Nothing, .. } = return FileReference + { fileReferenceContent = Nothing + , fileReferenceTitle = fileTitle + , fileReferenceModified = fileModified + } +sinkFile File{ fileContent = Just fileContentContent, .. } = do + (unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE + + fileContentHash <- if + | not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ do + uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket + chunk <- liftIO newEmptyMVar + let putChunks = do + nextChunk <- await + case nextChunk of + Nothing + -> putMVar chunk Nothing + Just nextChunk' + -> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks + sinkAsync <- lift . 
allocateLinkedAsync . runConduit + $ fileContentContent' + .| putChunks + .| Crypto.sinkHash + + runAppMinio $ do + tmpUUID <- liftIO UUID.nextRandom + let uploadName = ".tmp." <> toPathPiece tmpUUID + pooOptions = Minio.defaultPutObjectOptions + { Minio.pooCacheControl = Just "immutable" + } + Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions + fileContentHash <- review _Wrapped <$> waitAsync sinkAsync + let dstName = minioFileReference # fileContentHash + copySrc = Minio.defaultSourceInfo + { Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName + } + copyDst = Minio.defaultDestinationInfo + { Minio.dstBucket = uploadBucket + , Minio.dstObject = dstName + } + uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions + unless uploadExists $ + Minio.copyObject copyDst copySrc + Minio.removeObject uploadBucket uploadName + return fileContentHash + | otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash)) return FileReference { fileReferenceContent = Just fileContentHash , fileReferenceTitle = fileTitle , fileReferenceModified = fileModified } - where - fileContentHash = Crypto.hash fileContentContent -sinkFiles' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File, FileReferenceResidual record) record (SqlPersistT m) () +sinkFiles' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => ConduitT (File (SqlPersistT m), FileReferenceResidual record) record (SqlPersistT m) () sinkFiles' = C.mapM $ uncurry sinkFile' -sinkFile' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File -> FileReferenceResidual record -> SqlPersistT m record +sinkFile' :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX, HasFileReference record) => File (SqlPersistT m) -> FileReferenceResidual record -> SqlPersistT m record sinkFile' file residual = do reference <- sinkFile file return $ _FileReference # (reference, residual) diff --git a/src/Utils/Sql.hs b/src/Utils/Sql.hs index 2a03a76da..ad51820d0 100644 --- a/src/Utils/Sql.hs +++ b/src/Utils/Sql.hs @@ -1,22 +1,32 @@ module Utils.Sql ( setSerializable, setSerializable' + , catchSql, handleSql + , isUniqueConstraintViolation + , catchIfSql, handleIfSql ) where -import ClassyPrelude.Yesod +import ClassyPrelude.Yesod hiding (handle) import Numeric.Natural import Settings.Log -import Database.PostgreSQL.Simple (SqlError) +import Database.PostgreSQL.Simple (SqlError(..)) import Database.PostgreSQL.Simple.Errors (isSerializationError) -import Control.Monad.Catch (MonadMask) +import Control.Monad.Catch import Database.Persist.Sql import Database.Persist.Sql.Raw.QQ +import qualified Data.ByteString as ByteString + import Control.Retry import Control.Lens ((&)) +import qualified Data.UUID as UUID +import Control.Monad.Random.Class (MonadRandom(getRandom)) + +import Text.Shakespeare.Text (st) + setSerializable :: forall m a. 
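
The MVar handoff above is the same pattern used in both jobs: a producer tees each chunk into an MVar (with Nothing marking end-of-stream) while a concurrent consumer turns the MVar back into a source via unfoldM over a unit state. The consuming half in isolation:

    import Conduit (ConduitT)
    import Control.Concurrent.MVar (MVar, takeMVar)
    import Control.Monad.IO.Class (MonadIO, liftIO)
    import Data.ByteString (ByteString)
    import qualified Data.Conduit.List as CL

    -- Drain chunks from an MVar into a byte stream; Nothing ends it.
    mvarSource :: MonadIO m => MVar (Maybe ByteString) -> ConduitT () ByteString m ()
    mvarSource var = CL.unfoldM (\() -> fmap (\bs -> (bs, ())) <$> liftIO (takeMVar var)) ()
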
(MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 @@ -54,5 +64,29 @@ setSerializable' policy act = do transactionSaveWithIsolation ReadCommitted return res +catchSql :: forall m a. (MonadCatch m, MonadIO m) => SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a +catchSql = flip handleSql +handleSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a +handleSql recover act = do + savepointName <- liftIO $ UUID.toString <$> getRandom + let recover' :: SqlError -> SqlPersistT m a + recover' exc = do + rawExecute [st|ROLLBACK TO SAVEPOINT "#{savepointName}"|] [] + recover exc + + handle recover' $ do + rawExecute [st|SAVEPOINT "#{savepointName}"|] [] + res <- act + rawExecute [st|RELEASE SAVEPOINT "#{savepointName}"|] [] + return res + +catchIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> SqlPersistT m a -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a +catchIfSql p = flip $ handleIfSql p + +handleIfSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> Bool) -> (SqlError -> SqlPersistT m a) -> SqlPersistT m a -> SqlPersistT m a +handleIfSql p recover = handleSql (\err -> bool throwM recover (p err) err) + +isUniqueConstraintViolation :: SqlError -> Bool +isUniqueConstraintViolation SqlError{..} = "duplicate key value violates unique constraint" `ByteString.isPrefixOf` sqlErrorMsg diff --git a/stack.yaml b/stack.yaml index bedc224c5..fbbcd4aaa 100644 --- a/stack.yaml +++ b/stack.yaml @@ -47,6 +47,15 @@ extra-deps: - filepath-crypto - uuid-crypto + - git: https://github.com/gkleen/FastCDC.git + commit: 7326e2931454282df9081105dad812845db5c530 + subdirs: + - gearhash + - fastcdc + + - git: git@gitlab2.rz.ifi.lmu.de:uni2work/zip-stream.git + commit: 843683d024f767de236f74d24a3348f69181a720 + - generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524 # manual downgrade; won't compile with >=2.0.0.0 - acid-state-0.16.0.1@sha256:d43f6ee0b23338758156c500290c4405d769abefeb98e9bc112780dae09ece6f,6207 @@ -65,6 +74,7 @@ extra-deps: - tz-0.1.3.4@sha256:bd311e202b8bdd15bcd6a4ca182e69794949d3b3b9f4aa835e9ccff011284979,5086 - unidecode-0.1.0.4@sha256:99581ee1ea334a4596a09ae3642e007808457c66893b587e965b31f15cbf8c4d,1144 - wai-middleware-prometheus-1.0.0@sha256:1625792914fb2139f005685be8ce519111451cfb854816e430fbf54af46238b4,1314 + - primitive-0.7.1.0@sha256:6a237bb338bcc43193077ff8e8c0f0ce2de14c652231496a15672e8b563a07e2,2604 resolver: nightly-2020-08-08 compiler: ghc-8.10.2 diff --git a/stack.yaml.lock b/stack.yaml.lock index 9dd64cf44..053ebc2d9 100644 --- a/stack.yaml.lock +++ b/stack.yaml.lock @@ -5,9 +5,6 @@ packages: - completed: - cabal-file: - size: 4229 - sha256: 0dcfe3c4a67be4e96e1ae2e3c4b8744bc11e094853005a32f6074ab776caa3a9 name: encoding version: 0.8.2 git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git @@ -19,9 +16,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/encoding.git commit: 22fc3bb14841d8d50997aa47f1be3852e666f787 - completed: - cabal-file: - size: 2399 - sha256: 20cdf97602abb8fd7356c1a64c69fa857e34ab4cfe7834460d2ad783f7e4e4e3 name: memcached-binary version: 0.2.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git @@ -33,9 +27,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/memcached-binary.git commit: b7071df50bad3a251a544b984e4bf98fa09b8fae - completed: - 
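
The savepoint-based handleSql added in src/Utils/Sql.hs brackets the action in a uniquely named savepoint, so a caught SqlError no longer poisons the enclosing transaction; that is what lets sinkFileDB race concurrent writers on chunk inserts. Typical use, mirroring the chunk sink above:

    -- Swallow a duplicate-key race: if another transaction inserted the
    -- same chunk first, roll back to the savepoint and carry on.
    insertChunkIdempotent :: (MonadCatch m, MonadIO m)
                          => FileContentChunk -> SqlPersistT m ()
    insertChunkIdempotent chunk =
      handleIfSql isUniqueConstraintViolation (const $ return ()) $ insert_ chunk
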
cabal-file: - size: 1423 - sha256: 49818ee0de2d55cbfbc15ca4de1761c3adac6ba3dfcdda960b413cad4f4fa47f name: conduit-resumablesink version: '0.3' git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git @@ -47,9 +38,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/conduit-resumablesink.git commit: cbea6159c2975d42f948525e03e12fc390da53c5 - completed: - cabal-file: - size: 2069 - sha256: 9192ac19ea5da3cd4b8c86a4266592aff7b9256311aa5f42ae6de94ccacf1366 name: HaskellNet version: 0.5.1 git: git://github.com/jtdaugherty/HaskellNet.git @@ -61,9 +49,6 @@ packages: git: git://github.com/jtdaugherty/HaskellNet.git commit: 5aa1f3b009253b02c4822005ac59ee208a10a347 - completed: - cabal-file: - size: 1934 - sha256: 9fbe7c3681e963eea213ab38be17966bb690788c1c55a67257916b677d7d2ec2 name: HaskellNet-SSL version: 0.3.4.1 git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git @@ -75,9 +60,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/HaskellNet-SSL.git commit: 40393c938111ac78232dc2c7eec5edb4a22d03e8 - completed: - cabal-file: - size: 2208 - sha256: 48f6e03d8f812bd24e2601497ffe9c8a78907fa2266ba05abeefdfe99221617d name: ldap-client version: 0.4.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/ldap-client.git @@ -90,9 +72,6 @@ packages: commit: 01afaf599ba6f8a9d804c269e91d3190b249d3f0 - completed: subdir: serversession - cabal-file: - size: 2081 - sha256: a958ff0007e5084e3e4c2a33acc9860c31186105f02f8ab99ecb847a7a8f9497 name: serversession version: 1.0.1 git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git @@ -106,9 +85,6 @@ packages: commit: 1c95b0100471279413485411032d639881012a5e - completed: subdir: serversession-backend-acid-state - cabal-file: - size: 1875 - sha256: 6cc9d29e788334670bc102213a8aae73bc1b8b0a00c416f06d232376750443b7 name: serversession-backend-acid-state version: 1.0.3 git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git @@ -121,9 +97,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/serversession.git commit: 1c95b0100471279413485411032d639881012a5e - completed: - cabal-file: - size: 1966 - sha256: 92d6d7be95a1ee6cb9783deb35e0d4e4959c7de20d518a45370084b57e20ba51 name: xss-sanitize version: 0.3.6 git: git@gitlab2.rz.ifi.lmu.de:uni2work/xss-sanitize.git @@ -136,9 +109,6 @@ packages: commit: 074ed7c8810aca81f60f2c535f9e7bad67e9d95a - completed: subdir: colonnade - cabal-file: - size: 2020 - sha256: 28f603d097aee65ddf8fe032e7e0f87523a58c516253cba196922027c8fd54d5 name: colonnade version: 1.2.0.2 git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git @@ -151,9 +121,6 @@ packages: git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git commit: f8170266ab25b533576e96715bedffc5aa4f19fa - completed: - cabal-file: - size: 9845 - sha256: 674630347209bc5f7984e8e9d93293510489921f2d2d6092ad1c9b8c61b6560a name: minio-hs version: 1.5.2 git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git @@ -166,9 +133,6 @@ packages: commit: 42103ab247057c04c8ce7a83d9d4c160713a3df1 - completed: subdir: cryptoids-class - cabal-file: - size: 1155 - sha256: 1fa96858ded816798f8e1c77d7945185c0d7ceb2536185d39fc72496da8a0125 name: cryptoids-class version: 0.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -182,9 +146,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: cryptoids-types - cabal-file: - size: 1214 - sha256: ee8966212554a156f2de236d4f005ff3a9d3098778ff6cc3f114ccaa0aff8825 name: cryptoids-types version: 1.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -198,9 +159,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - 
completed: subdir: cryptoids - cabal-file: - size: 1505 - sha256: fcf07cd0dca21db976c25cbdf4dcc5c747cebcb7bf14c05804c8ae14223f6046 name: cryptoids version: 0.5.1.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -214,9 +172,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: filepath-crypto - cabal-file: - size: 1716 - sha256: 218da063bb7b00e3728deebf830904174b2b78bc29b3f203e6824b8caac92788 name: filepath-crypto version: 0.1.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -230,9 +185,6 @@ packages: commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f - completed: subdir: uuid-crypto - cabal-file: - size: 1460 - sha256: 1db54db1b85303e50cec3c99ddb8de6c9bedc388fa9ce5a1fce61520023b9ee5 name: uuid-crypto version: 1.4.0.0 git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git @@ -244,6 +196,43 @@ packages: subdir: uuid-crypto git: git@gitlab2.rz.ifi.lmu.de:uni2work/cryptoids.git commit: 4d91394475b144ea5bf7ba111f93756cc0de8a3f +- completed: + subdir: gearhash + name: gearhash + version: 0.0.0 + git: https://github.com/gkleen/FastCDC.git + pantry-tree: + size: 504 + sha256: 61a08cdd003dc8a418f410dacbcc3a91adea4c23864f8ddbaba3b762e4c2924d + commit: 7326e2931454282df9081105dad812845db5c530 + original: + subdir: gearhash + git: https://github.com/gkleen/FastCDC.git + commit: 7326e2931454282df9081105dad812845db5c530 +- completed: + subdir: fastcdc + name: fastcdc + version: 0.0.0 + git: https://github.com/gkleen/FastCDC.git + pantry-tree: + size: 244 + sha256: 80ae3d79344f7c6a73a8d7adf07ff2b7f0736fd34760b4b8a15e26f32d9773f6 + commit: 7326e2931454282df9081105dad812845db5c530 + original: + subdir: fastcdc + git: https://github.com/gkleen/FastCDC.git + commit: 7326e2931454282df9081105dad812845db5c530 +- completed: + name: zip-stream + version: 0.2.0.1 + git: git@gitlab2.rz.ifi.lmu.de:uni2work/zip-stream.git + pantry-tree: + size: 812 + sha256: 0da8bc38d73034962d2e2d1a7586b6dee848a629319fce9cbbf578348c61118c + commit: 843683d024f767de236f74d24a3348f69181a720 + original: + git: git@gitlab2.rz.ifi.lmu.de:uni2work/zip-stream.git + commit: 843683d024f767de236f74d24a3348f69181a720 - completed: hackage: generic-lens-1.2.0.0@sha256:b19e7970c93743a46bc3702331512a96d163de4356472f2d51a2945887aefe8c,6524 pantry-tree: @@ -363,6 +352,13 @@ packages: sha256: 6d64803c639ed4c7204ea6fab0536b97d3ee16cdecb9b4a883cd8e56d3c61402 original: hackage: wai-middleware-prometheus-1.0.0@sha256:1625792914fb2139f005685be8ce519111451cfb854816e430fbf54af46238b4,1314 +- completed: + hackage: primitive-0.7.1.0@sha256:6a237bb338bcc43193077ff8e8c0f0ce2de14c652231496a15672e8b563a07e2,2604 + pantry-tree: + size: 1376 + sha256: 924e88629b493abb6b2f3c3029cef076554a2b627091e3bb6887ec03487a707d + original: + hackage: primitive-0.7.1.0@sha256:6a237bb338bcc43193077ff8e8c0f0ce2de14c652231496a15672e8b563a07e2,2604 snapshots: - completed: size: 524392 diff --git a/test/Database/Fill.hs b/test/Database/Fill.hs index 4c1d4a3ee..0292308da 100644 --- a/test/Database/Fill.hs +++ b/test/Database/Fill.hs @@ -5,7 +5,6 @@ module Database.Fill import "uniworx" Import hiding (Option(..), currentYear) import Handler.Utils.Form (SheetGrading'(..), SheetType'(..), SheetGroup'(..)) -import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import qualified Data.Text as Text -- import Data.Text.IO (hPutStrLn) @@ -30,6 +29,8 @@ import qualified Data.Csv as Csv import Crypto.Random (getRandomBytes) import Data.List (genericLength) +import qualified Data.Conduit.Combinators as C + testdataDir 
:: FilePath
testdataDir = "testdata"
@@ -37,7 +38,7 @@ testdataDir = "testdata"
 insertFile :: ( HasFileReference fRef, PersistRecordBackend fRef SqlBackend )
            => FileReferenceResidual fRef -> FilePath -> DB (Key fRef)
 insertFile residual fileTitle = do
-  fileContent <- liftIO . fmap Just . BS.readFile $ testdataDir </> fileTitle
+  let fileContent = Just . C.sourceFile $ testdataDir </> fileTitle
   fileModified <- liftIO getCurrentTime
   sinkFile' File{..} residual >>= insert
diff --git a/test/Handler/Sheet/PersonalisedFilesSpec.hs b/test/Handler/Sheet/PersonalisedFilesSpec.hs
index 8dacb3eca..bd41ca2e0 100644
--- a/test/Handler/Sheet/PersonalisedFilesSpec.hs
+++ b/test/Handler/Sheet/PersonalisedFilesSpec.hs
@@ -16,8 +16,6 @@ import qualified Data.Conduit.Combinators as C
 import Control.Lens.Extras
 import Control.Monad.Trans.Maybe

-import qualified Crypto.Hash as Crypto (hash)
-
 import System.FilePath (dropDrive)
 import Data.Time.Clock (diffUTCTime)
@@ -25,6 +23,8 @@ import Data.Char (chr)

 import Database.Persist.Sql (transactionUndo)

+import Data.Bitraversable
+
 instance Arbitrary (FileReferenceResidual PersonalisedSheetFile) where
   arbitrary = PersonalisedSheetFileResidual
@@ -59,7 +59,7 @@ spec = withApp . describe "Personalised sheet file zip encoding" $ do
                    lift (insertUnique user) >>= maybe userLoop return
               in userLoop
         let res = res' { personalisedSheetFileResidualSheet = shid, personalisedSheetFileResidualUser = uid }
-        fRef <- lift (sinkFile f :: DB FileReference)
+        fRef <- lift (sinkFile (transFile generalize f) :: DB FileReference)
         now <- liftIO getCurrentTime
         void . lift . insert $ CourseParticipant cid (personalisedSheetFileResidualUser res) now Nothing CourseParticipantActive
         void . lift . insert $ _FileReference # (fRef, res)
@@ -68,30 +68,38 @@ spec = withApp . describe "Personalised sheet file zip encoding" $ do
       anonMode <- liftIO $ generate arbitrary

       let
-        fpL :: Lens' (Either PersonalisedSheetFile File) FilePath
+        fpL :: forall m. Lens' (Either PersonalisedSheetFile (File m)) FilePath
         fpL = lens (either personalisedSheetFileTitle fileTitle) $ \f' path -> case f' of
           Left pf -> Left pf { personalisedSheetFileTitle = path }
           Right f -> Right f { fileTitle = path }
         isDirectory = either (is _Nothing . personalisedSheetFileContent) (is _Nothing . 
fileContent) + loadFile :: Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile DBFile)) (Either PersonalisedSheetFile DBFile, FileReferenceResidual PersonalisedSheetFile) + -> DB (Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile PureFile)) (Either PersonalisedSheetFile PureFile, FileReferenceResidual PersonalisedSheetFile)) + loadFile = bitraverse loadUnresolved loadResolved + where + loadUnresolved = traverse $ traverse toPureFile + loadResolved (f, fRes) = (, fRes) <$> traverse toPureFile f + recoveredFiles <- runConduit $ sourcePersonalisedSheetFiles cid (Just shid) Nothing anonMode .| resolvePersonalisedSheetFiles fpL isDirectory cid shid + .| C.mapM loadFile .| C.foldMap pure let - checkFile :: Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile File)) (Either PersonalisedSheetFile File, FileReferenceResidual PersonalisedSheetFile) - -> (File, FileReferenceResidual PersonalisedSheetFile) + checkFile :: Either (PersonalisedSheetFileUnresolved (Either PersonalisedSheetFile PureFile)) (Either PersonalisedSheetFile PureFile, FileReferenceResidual PersonalisedSheetFile) + -> (PureFile, FileReferenceResidual PersonalisedSheetFile) -> Bool checkFile (Left _) _ = False checkFile (Right (recFile, recResidual)) (file, residual) = recResidual == residual && case recFile of - Right f -> file == f - Left pf -> dropDrive (fileTitle file) == dropDrive (personalisedSheetFileTitle pf) - && abs (fileModified file `diffUTCTime` personalisedSheetFileModified pf) < 1e-6 -- Precision is a PostgreSQL limitation - && fmap Crypto.hash (fileContent file) == personalisedSheetFileContent pf + Right f -> f == file + Left pf -> dropDrive (fileTitle file) == dropDrive (personalisedSheetFileTitle pf) + && abs (fileModified file `diffUTCTime` personalisedSheetFileModified pf) < 1e-6 -- Precision is a PostgreSQL limitation + && fileReferenceContent (pureFileToFileReference file) == personalisedSheetFileContent pf errors = go [] sheetFiles recoveredFiles where go acc xs [] = reverse acc ++ map Left xs diff --git a/test/Handler/Utils/RatingSpec.hs b/test/Handler/Utils/RatingSpec.hs index 23b674f57..64b3c6f8f 100644 --- a/test/Handler/Utils/RatingSpec.hs +++ b/test/Handler/Utils/RatingSpec.hs @@ -15,6 +15,8 @@ import Text.Shakespeare.I18N (renderMessage) import Utils.Lens (_ratingValues, _ratingPoints) +import qualified Data.Conduit.Combinators as C + spec :: Spec spec = describe "Rating file parsing/pretty-printing" $ do @@ -33,7 +35,7 @@ spec = describe "Rating file parsing/pretty-printing" $ do mr' = MsgRenderer $ renderMessage (error "foundation inspected" :: site) [] parseRating' :: LBS.ByteString -> Maybe Rating' - parseRating' = either (\(_ :: SomeException) -> Nothing) (Just . fst) . parseRating . flip (File "bewertung.txt") time . Just . LBS.toStrict + parseRating' = either (\(_ :: SomeException) -> Nothing) (Just . fst) . parseRating . flip (File "bewertung.txt") time . Just . 
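
loadFile above uses bitraverse to buffer streamed contents on both sides of the Either in a single traversal, turning every File into a PureFile before comparison. The same shape in miniature:

    import Data.Bitraversable (bitraverse)

    -- Run the effects on whichever side of the Either is populated.
    bufferEither :: Either (IO Int) (IO String) -> IO (Either Int String)
    bufferEither = bitraverse id id
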
C.sourceLazy
    time = UTCTime systemEpochDay 0
    mRating rating = rating { ratingValues = mRating' rating $ ratingValues rating }
diff --git a/test/Handler/Utils/ZipSpec.hs b/test/Handler/Utils/ZipSpec.hs
index 667edad18..7c33dea28 100644
--- a/test/Handler/Utils/ZipSpec.hs
+++ b/test/Handler/Utils/ZipSpec.hs
@@ -2,10 +2,11 @@ module Handler.Utils.ZipSpec where

 import TestImport
+import Utils ((</>))
 import Handler.Utils.Zip

 import Data.Conduit
-import qualified Data.Conduit.List as Conduit
+import qualified Data.Conduit.Combinators as C

 import Data.List (dropWhileEnd)
@@ -14,16 +15,59 @@ import ModelSpec ()

 import System.FilePath
 import Data.Time

+import Data.Universe
+
+
+data ZipConsumptionStrategy
+  = ZipConsumeInterleaved
+  | ZipConsumeBuffered
+  deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable)
+  deriving anyclass (Universe, Finite)
+
+data ZipProductionStrategy
+  = ZipProduceInterleaved
+  | ZipProduceBuffered
+  deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable)
+  deriving anyclass (Universe, Finite)
+
+
 spec :: Spec
 spec = describe "Zip file handling" $ do
-  it "has compatible encoding/decoding to/from zip files" . property $ do
-    zipFiles <- listOf $ scale (`div` 2) arbitrary
-    return . property $ do
-      zipFiles' <- runConduit $ Conduit.sourceList zipFiles .| produceZip def .| void consumeZip .| Conduit.consume
-      forM_ (zipFiles `zip` zipFiles') $ \(file, file') -> do
-        let acceptableFilenameChanges
-              = makeValid . dropWhile isPathSeparator . bool (dropWhileEnd isPathSeparator) addTrailingPathSeparator (isNothing $ fileContent file) . normalise . makeValid
+  describe "has compatible encoding to, and decoding from zip files" . forM_ universeF $ \strategy ->
+    modifyMaxSuccess (bool id (37 *) $ strategy == (ZipProduceInterleaved, ZipConsumeInterleaved)) . it (show strategy) . property $ do
+      zipFiles <- listOf arbitrary :: Gen [PureFile]
+      return . property $ do
+
+        let zipProduceBuffered
+              = evaluate . force <=< runConduitRes $ zipProduceInterleaved .| C.sinkLazy
+            zipProduceInterleaved
+              = C.yieldMany zipFiles .| C.map fromPureFile .| produceZip def
+            zipConsumeBuffered zipProd
+              = mapM toPureFile <=< runConduitRes $ void (consumeZip zipProd) .| C.foldMap pure
+            zipConsumeInterleaved zipProd
+              = void (consumeZip zipProd) .| C.mapM toPureFile .| C.foldMap pure
+        zipFiles' <- case strategy of
+          (ZipProduceBuffered, ZipConsumeInterleaved) ->
+            runConduitRes . zipConsumeInterleaved . C.sourceLazy =<< zipProduceBuffered
+          (ZipProduceBuffered, ZipConsumeBuffered) ->
+            zipConsumeBuffered . C.sourceLazy =<< zipProduceBuffered
+          (ZipProduceInterleaved, ZipConsumeInterleaved) ->
+            runConduitRes $ zipConsumeInterleaved zipProduceInterleaved
+          (ZipProduceInterleaved, ZipConsumeBuffered) ->
+            zipConsumeBuffered zipProduceInterleaved
+
+        let acceptableFilenameChanges file
+              = "." </> 
fileTitle file + & normalise + & makeValid + & dropWhile isPathSeparator + & dropWhileEnd isPathSeparator + & bool id addTrailingPathSeparator (isNothing $ fileContent file) + & normalise + & makeValid acceptableTimeDifference t1 t2 = abs (diffUTCTime t1 t2) <= 2 - (shouldBe `on` acceptableFilenameChanges) (fileTitle file') (fileTitle file) - (fileModified file', fileModified file) `shouldSatisfy` uncurry acceptableTimeDifference - (fileContent file') `shouldBe` (fileContent file) + + forM_ (zipFiles `zip` zipFiles') $ \(file, file') -> do + (shouldBe `on` acceptableFilenameChanges) file' file + (fileModified file', fileModified file) `shouldSatisfy` uncurry acceptableTimeDifference + (view _pureFileContent file' :: Maybe ByteString) `shouldBe` (view _pureFileContent file) diff --git a/test/ModelSpec.hs b/test/ModelSpec.hs index 588c4a532..1e7dd6a8a 100644 --- a/test/ModelSpec.hs +++ b/test/ModelSpec.hs @@ -34,6 +34,10 @@ import Control.Monad.Catch.Pure (Catch, runCatch) import System.IO.Unsafe (unsafePerformIO) +import qualified Data.Conduit.Combinators as C + +import Data.Ratio ((%)) + instance Arbitrary EmailAddress where arbitrary = do @@ -137,12 +141,18 @@ instance Arbitrary User where return User{..} shrink = genericShrink -instance Arbitrary File where +scaleRatio :: Rational -> Int -> Int +scaleRatio r = ceiling . (* r) . fromIntegral + +instance Monad m => Arbitrary (File m) where arbitrary = do - fileTitle <- scale (`div` 2) $ (joinPath <$> arbitrary) `suchThat` (any $ not . isPathSeparator) + fileTitle <- scale (scaleRatio $ 1 % 8) $ (joinPath <$> arbitrary) `suchThat` (any $ not . isPathSeparator) date <- addDays <$> arbitrary <*> pure (fromGregorian 2043 7 2) fileModified <- (addUTCTime <$> arbitrary <*> pure (UTCTime date 0)) `suchThat` inZipRange - fileContent <- arbitrary + fileContent <- oneof + [ pure Nothing + , Just . C.sourceLazy <$> scale (scaleRatio $ 7 % 8) arbitrary + ] return File{..} where inZipRange :: UTCTime -> Bool @@ -152,7 +162,6 @@ instance Arbitrary File where = True | otherwise = False - shrink = genericShrink instance Arbitrary School where arbitrary = do @@ -177,8 +186,8 @@ spec = do parallel $ do lawsCheckHspec (Proxy @User) [ eqLaws, jsonLaws ] - lawsCheckHspec (Proxy @File) - [ eqLaws ] + lawsCheckHspec (Proxy @PureFile) + [ eqLaws, ordLaws ] lawsCheckHspec (Proxy @School) [ eqLaws ] lawsCheckHspec (Proxy @Term) diff --git a/test/TestImport.hs b/test/TestImport.hs index d4cc61787..510b1e9c0 100644 --- a/test/TestImport.hs +++ b/test/TestImport.hs @@ -22,6 +22,7 @@ import Yesod.Auth as X import Yesod.Test as X import Yesod.Core.Unsafe (fakeHandlerGetLogger) import Test.QuickCheck as X +import Test.Hspec.QuickCheck as X hiding (prop) import Test.QuickCheck.Gen as X import Data.Default as X import Test.QuickCheck.Instances as X () @@ -65,6 +66,8 @@ import Handler.Utils (runAppLoggingT) import Web.PathPieces (toPathPiece) import Utils.Parameters (GlobalPostParam(PostLoginDummy)) +import Control.Monad.Morph as X (generalize) + runDB :: SqlPersistM a -> YesodExample UniWorX a runDB query = do