From 8d1f216b5b6ee2a59c3fb80f5dd4a701d9dad5ef Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 17 Feb 2021 14:31:30 +0100 Subject: [PATCH] feat(caching): introduce cache prewarming --- config/settings.yml | 7 ++ package.yaml | 1 + src/Application.hs | 9 ++ src/Database/Esqueleto/Utils.hs | 9 ++ src/Foundation/Type.hs | 48 +++---- src/Handler/Utils/Files.hs | 78 +++++++++--- src/Jobs.hs | 74 ++++++++++- src/Jobs/Crontab.hs | 192 ++++++++++++++++++---------- src/Jobs/Handler/Files.hs | 17 +++ src/Jobs/Types.hs | 37 +++++- src/Model/Types/Sheet.hs | 6 +- src/Settings.hs | 28 ++++- src/Utils.hs | 1 + src/Utils/ARC.hs | 13 ++ src/Utils/LRU.hs | 213 ++++++++++++++++++++++++++++++++ src/Utils/Metrics.hs | 68 +++++++++- 16 files changed, 682 insertions(+), 119 deletions(-) create mode 100644 src/Utils/LRU.hs diff --git a/config/settings.yml b/config/settings.yml index 54116ed5f..7c6181cc9 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -274,3 +274,10 @@ memcache-auth: true file-source-arc: maximum-ghost: 512 maximum-weight: 1073741824 # 1GiB +file-source-prewarm: + maximum-weight: 1073741824 # 1GiB + start: 1800 # 30m + end: 600 # 10m + inhibit: 3600 # 60m + steps: 20 + max-speedup: 3 diff --git a/package.yaml b/package.yaml index 6e31c8eca..a0409a43b 100644 --- a/package.yaml +++ b/package.yaml @@ -160,6 +160,7 @@ dependencies: - network-uri - psqueues - nonce + - IntervalMap other-extensions: - GeneralizedNewtypeDeriving - IncoherentInstances diff --git a/src/Application.hs b/src/Application.hs index e0f1da397..feb2980ec 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -106,6 +106,8 @@ import GHC.RTS.Flags (getRTSFlags) import qualified Prometheus +import qualified Data.IntervalMap.Strict as IntervalMap + -- Import all relevant handler modules here. -- (HPack takes care to add new modules to our cabal file nowadays.) import Handler.News @@ -187,6 +189,13 @@ makeFoundation appSettings''@AppSettings{..} = do ah <- initARCHandle arccMaximumGhost arccMaximumWeight void . Prometheus.register $ arcMetrics ARCFileSource ah return ah + appFileSourcePrewarm <- for appFileSourcePrewarmConf $ \PrewarmCacheConf{..} -> do + lh <- initLRUHandle precMaximumWeight + void . Prometheus.register $ lruMetrics LRUFileSourcePrewarm lh + return lh + appFileInjectInhibit <- liftIO $ newTVarIO IntervalMap.empty + for_ (guardOnM (isn't _JobsOffload appJobMode) appInjectFiles) $ \_ -> + void . Prometheus.register $ injectInhibitMetrics appFileInjectInhibit -- We need a log function to create a connection pool. We need a connection -- pool to create our foundation. And we need our foundation to get a diff --git a/src/Database/Esqueleto/Utils.hs b/src/Database/Esqueleto/Utils.hs index 9a5412cfa..ca7bb0c46 100644 --- a/src/Database/Esqueleto/Utils.hs +++ b/src/Database/Esqueleto/Utils.hs @@ -27,6 +27,7 @@ module Database.Esqueleto.Utils , SqlProject(..) , (->.), (#>>.) , fromSqlKey + , unKey , selectCountRows , selectMaybe , day, diffDays @@ -50,6 +51,8 @@ import qualified Data.ByteString.Lazy as Lazy (ByteString) import Crypto.Hash (Digest, SHA256) +import Data.Coerce (Coercible) + {-# ANN any ("HLint: ignore Use any" :: String) #-} {-# ANN all ("HLint: ignore Use all" :: String) #-} @@ -392,6 +395,12 @@ infixl 8 #>>. 
fromSqlKey :: (ToBackendKey SqlBackend entity, PersistField (Key entity)) => E.SqlExpr (E.Value (Key entity)) -> E.SqlExpr (E.Value Int64) fromSqlKey = E.veryUnsafeCoerceSqlExprValue + +unKey :: ( Coercible (Key entity) a + , PersistField (Key entity), PersistField a + ) + => E.SqlExpr (E.Value (Key entity)) -> E.SqlExpr (E.Value a) +unKey = E.veryUnsafeCoerceSqlExprValue selectCountRows :: (Num a, PersistField a, MonadIO m) => E.SqlQuery ignored -> E.SqlReadT m a diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs index 19a1f5d65..9c525dfd6 100644 --- a/src/Foundation/Type.hs +++ b/src/Foundation/Type.hs @@ -25,6 +25,8 @@ import qualified Jose.Jwk as Jose import qualified Database.Memcached.Binary.IO as Memcached import Network.Minio (MinioConn) +import Data.IntervalMap.Strict (IntervalMap) + type SMTPPool = Pool SMTPConnection @@ -39,28 +41,30 @@ makePrisms ''SomeSessionStorage -- starts running, such as database connections. Every handler will have -- access to the data present here. data UniWorX = UniWorX - { appSettings' :: AppSettings - , appStatic :: EmbeddedStatic -- ^ Settings for static file serving. - , appConnPool :: ConnectionPool -- ^ Database connection pool. - , appSmtpPool :: Maybe SMTPPool - , appLdapPool :: Maybe (Failover (LdapConf, LdapPool)) - , appWidgetMemcached :: Maybe Memcached.Connection -- ^ Actually a proper pool - , appHttpManager :: Manager - , appLogger :: (ReleaseKey, TVar Logger) - , appLogSettings :: TVar LogSettings - , appCryptoIDKey :: CryptoIDKey - , appClusterID :: ClusterId - , appInstanceID :: InstanceId - , appJobState :: TMVar JobState - , appSessionStore :: SomeSessionStorage - , appSecretBoxKey :: SecretBox.Key - , appJSONWebKeySet :: Jose.JwkSet - , appHealthReport :: TVar (Set (UTCTime, HealthReport)) - , appMemcached :: Maybe (AEAD.Key, Memcached.Connection) - , appUploadCache :: Maybe MinioConn - , appVerpSecret :: VerpSecret - , appAuthKey :: Auth.Key - , appFileSourceARC :: Maybe (ARCHandle (FileContentChunkReference, (Int, Int)) Int ByteString) + { appSettings' :: AppSettings + , appStatic :: EmbeddedStatic -- ^ Settings for static file serving. + , appConnPool :: ConnectionPool -- ^ Database connection pool. 
+ , appSmtpPool :: Maybe SMTPPool + , appLdapPool :: Maybe (Failover (LdapConf, LdapPool)) + , appWidgetMemcached :: Maybe Memcached.Connection -- ^ Actually a proper pool + , appHttpManager :: Manager + , appLogger :: (ReleaseKey, TVar Logger) + , appLogSettings :: TVar LogSettings + , appCryptoIDKey :: CryptoIDKey + , appClusterID :: ClusterId + , appInstanceID :: InstanceId + , appJobState :: TMVar JobState + , appSessionStore :: SomeSessionStorage + , appSecretBoxKey :: SecretBox.Key + , appJSONWebKeySet :: Jose.JwkSet + , appHealthReport :: TVar (Set (UTCTime, HealthReport)) + , appMemcached :: Maybe (AEAD.Key, Memcached.Connection) + , appUploadCache :: Maybe MinioConn + , appVerpSecret :: VerpSecret + , appAuthKey :: Auth.Key + , appFileSourceARC :: Maybe (ARCHandle (FileContentChunkReference, (Int, Int)) Int ByteString) + , appFileSourcePrewarm :: Maybe (LRUHandle (FileContentChunkReference, (Int, Int)) UTCTime Int ByteString) + , appFileInjectInhibit :: TVar (IntervalMap UTCTime (Set FileContentReference)) } makeLenses_ ''UniWorX diff --git a/src/Handler/Utils/Files.hs b/src/Handler/Utils/Files.hs index 8d4e60bf3..1d1425204 100644 --- a/src/Handler/Utils/Files.hs +++ b/src/Handler/Utils/Files.hs @@ -1,16 +1,18 @@ module Handler.Utils.Files ( sourceFile, sourceFile' , sourceFiles, sourceFiles' - , SourceFilesException(..) - , sourceFileDB, sourceFileMinio + , SourceFilesException(..), _SourceFilesMismatchedHashes, _SourceFilesContentUnavailable + , sourceFileDB, sourceFileDBChunks, sourceFileMinio , acceptFile , respondFileConditional ) where -import Import.NoFoundation +import Import.NoFoundation hiding (First(..)) import Foundation.Type import Utils.Metrics +import Data.Monoid (First(..)) + import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.List as C (unfoldM) @@ -23,6 +25,8 @@ import qualified Database.Esqueleto.Utils as E import System.FilePath (normalise, makeValid) import Data.List (dropWhileEnd) +import qualified Data.ByteString as ByteString + data SourceFilesException = SourceFilesMismatchedHashes @@ -30,53 +34,87 @@ data SourceFilesException deriving (Eq, Ord, Read, Show, Generic, Typeable) deriving anyclass (Exception) +makePrisms ''SourceFilesException + fileChunkARC :: ( MonadHandler m , HandlerSite m ~ UniWorX ) - => (FileContentChunkReference, (Int, Int)) + => Maybe Int + -> (FileContentChunkReference, (Int, Int)) -> m (Maybe ByteString) -> m (Maybe ByteString) -fileChunkARC k getChunkDB = do +fileChunkARC altSize k@(ref, (s, l)) getChunkDB' = do + prewarm <- getsYesod appFileSourcePrewarm + let getChunkDB = case prewarm of + Nothing -> getChunkDB' + Just lh -> do + chunkRes <- lookupLRUHandle lh k + case chunkRes of + Just (chunk, w) -> Just chunk <$ do + $logDebugS "fileChunkARC" "Prewarm hit" + liftIO $ observeSourcedChunk StoragePrewarm w + Nothing -> do + chunk' <- getChunkDB' + for chunk' $ \chunk -> chunk <$ do + let w = length chunk + $logDebugS "fileChunkARC" "Prewarm miss" + liftIO $ observeSourcedChunk StorageDB w + arc <- getsYesod appFileSourceARC case arc of Nothing -> getChunkDB Just ah -> do cachedARC' ah k $ \case Nothing -> do - chunk' <- getChunkDB + chunk' <- case assertM (> l) altSize of + -- This optimization works for the somewhat common case that cdc chunks are smaller than db chunks and start of the requested range is aligned with a db chunk boundary + Just altSize' + -> fmap getFirst . execWriterT . 
cachedARC' ah (ref, (s, altSize')) $ \x -> x <$ case x of + Nothing -> tellM $ First <$> getChunkDB + Just (v, _) -> tell . First . Just $ ByteString.take l v + Nothing -> getChunkDB for chunk' $ \chunk -> do let w = length chunk $logDebugS "fileChunkARC" "ARC miss" - liftIO $ observeSourcedChunk StorageDB w return (chunk, w) Just x@(_, w) -> do $logDebugS "fileChunkARC" "ARC hit" liftIO $ Just x <$ observeSourcedChunk StorageARC w -sourceFileDB :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) + +sourceFileDB :: forall m. + (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => FileContentReference -> ConduitT () ByteString (SqlPersistT m) () -sourceFileDB fileReference = do +sourceFileDB fileReference = chunkHashes + .| awaitForever (sourceFileDBChunks (const id) . unFileContentChunkKey . E.unValue) + .| C.map (view _1) + where + chunkHashes :: ConduitT () (E.Value FileContentChunkId) (SqlPersistT m) () + chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference + E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ] + return $ fileContentEntry E.^. FileContentEntryChunkHash + +sourceFileDBChunks :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, BackendCompatible SqlReadBackend backend) + => ((Int, Int) -> ReaderT SqlReadBackend m (Maybe ByteString) -> ReaderT SqlReadBackend m (Maybe ByteString)) -> FileContentChunkReference -> ConduitT i (ByteString, (Int, Int)) (ReaderT backend m) () +sourceFileDBChunks cont chunkHash = transPipe (withReaderT $ projectBackend @SqlReadBackend) $ do dbChunksize <- getsYesod $ view _appFileUploadDBChunksize - let retrieveChunk chunkHash = \case + let retrieveChunk = \case Nothing -> return Nothing Just start -> do - let getChunkDB = fmap (fmap E.unValue) . E.selectMaybe . E.from $ \fileContentChunk -> do - E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash + let getChunkDB = cont (start, dbChunksize) . fmap (fmap E.unValue) . E.selectMaybe . E.from $ \fileContentChunk -> do + E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. E.val chunkHash return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize) - chunk <- fileChunkARC (unFileContentChunkKey chunkHash, (start, dbChunksize)) getChunkDB + chunk <- fileChunkARC Nothing (chunkHash, (start, dbChunksize)) getChunkDB case chunk of Nothing -> throwM SourceFilesContentUnavailable Just c -> do - return . Just . (c, ) $ if + return . Just . ((c, (start, dbChunksize)), ) $ if | olength c >= dbChunksize -> Just $ start + dbChunksize | otherwise -> Nothing - chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do - E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference - E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ] - return $ fileContentEntry E.^. FileContentEntryChunkHash - chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int)) + C.unfoldM retrieveChunk $ Just (1 :: Int) sourceFileMinio :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m) @@ -212,7 +250,7 @@ respondFileConditional representationLastModified cType FileReference{..} = do let getChunkDB = fmap (fmap E.unValue) . E.selectMaybe . E.from $ \fileContentChunk -> do E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. E.val chunkHash return $ E.substring (fileContentChunk E.^. 
FileContentChunkContent) (E.val start) (E.val $ min cLength' dbChunksize) - chunk <- fileChunkARC (chunkHash, (fromIntegral start, fromIntegral $ min cLength' dbChunksize)) getChunkDB + chunk <- fileChunkARC (Just $ fromIntegral dbChunksize) (chunkHash, (fromIntegral start, fromIntegral $ min cLength' dbChunksize)) getChunkDB case chunk of Nothing -> throwM SourceFilesContentUnavailable Just c -> do diff --git a/src/Jobs.hs b/src/Jobs.hs index 00fe2b1ef..b6de74e24 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -13,7 +13,8 @@ import Jobs.Queue import Jobs.Offload import Jobs.Crontab -import qualified Data.Conduit.List as C +import qualified Data.Conduit.Combinators as C +import qualified Data.Conduit.List as C (mapMaybe) import qualified Data.Text.Lazy as LT @@ -47,6 +48,14 @@ import Control.Concurrent.STM.Delay import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay) +import qualified Database.Esqueleto as E +import qualified Database.Esqueleto.Utils as E + +import qualified Data.ByteString as ByteString + +import Handler.Utils.Files (sourceFileDBChunks, _SourceFilesContentUnavailable) + +import qualified Data.IntervalMap.Strict as IntervalMap import Jobs.Handler.SendNotification import Jobs.Handler.SendTestEmail @@ -577,6 +586,69 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker $logInfoS logIdent [st|Sleeping #{tshow secs}s...|] threadDelay msecs $logInfoS logIdent [st|Slept #{tshow secs}s.|] + handleCmd JobCtlPrewarmCache{..} = do + prewarm <- getsYesod appFileSourcePrewarm + for_ prewarm $ \lh -> lift . runDBRead $ + runConduit $ sourceFileChunkIds .| C.map E.unValue + .| awaitForever (\cRef -> handleC handleUnavailable $ sourceFileDBChunks (withLRU lh cRef) cRef .| C.map (cRef, )) + .| C.mapM_ (sinkChunkCache lh) + where + handleUnavailable e + | is _SourceFilesContentUnavailable e = return () + | otherwise = throwM e + withLRU lh cRef range getChunk = do + touched <- touchLRUHandle lh (cRef, range) jcTargetTime + case touched of + Just (bs, _) -> return $ Just bs + Nothing -> getChunk + (minBoundDgst, maxBoundDgst) = jcChunkInterval + sourceFileChunkIds = E.selectSource . E.from $ \fileContentEntry -> do + let cRef = E.unKey $ fileContentEntry E.^. FileContentEntryChunkHash + eRef = fileContentEntry E.^. FileContentEntryHash + E.where_ . E.and $ catMaybes + [ minBoundDgst <&> \b -> cRef E.>=. E.val b + , maxBoundDgst <&> \b -> cRef E.<. E.val b + ] + E.where_ $ matchesPrewarmSource eRef jcPrewarmSource + return cRef + sinkChunkCache lh (cRef, (c, range)) = insertLRUHandle lh (cRef, range) jcTargetTime (c, ByteString.length c) + handleCmd JobCtlInhibitInject{..} = maybeT (return ()) $ do + PrewarmCacheConf{..} <- MaybeT . getsYesod $ view _appFileSourcePrewarmConf + let inhibitInterval = IntervalMap.ClosedInterval + (addUTCTime (-precStart) jcTargetTime) + (addUTCTime (precInhibit - precStart) jcTargetTime) + sourceFileReferences = prewarmSourceReferences jcPrewarmSource + refs <- lift . lift . runDBRead . runConduit $ sourceFileReferences .| C.foldl (flip Set.insert) Set.empty + guard . not $ null refs + inhibitTVar <- getsYesod appFileInjectInhibit + atomically . modifyTVar' inhibitTVar $ force . IntervalMap.insertWith Set.union inhibitInterval refs + +matchesPrewarmSource :: E.SqlExpr (E.Value FileContentReference) -> JobCtlPrewarmSource -> E.SqlExpr (E.Value Bool) +matchesPrewarmSource eRef = \case + JobCtlPrewarmSheetFile{..} -> E.or + [ E.exists . E.from $ \sheetFile -> + E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet + E.&&. 
sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType + E.&&. sheetFile E.^. SheetFileContent E.==. E.just eRef + , E.exists . E.from $ \personalisedSheetFile -> + E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet + E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType + E.&&. personalisedSheetFile E.^. PersonalisedSheetFileContent E.==. E.just eRef + ] + +prewarmSourceReferences :: JobCtlPrewarmSource -> ConduitT () FileContentReference (ReaderT SqlReadBackend (HandlerFor UniWorX)) () +prewarmSourceReferences = \case + JobCtlPrewarmSheetFile{..} -> (.| C.mapMaybe E.unValue) $ do + E.selectSource . E.from $ \sheetFile -> do + E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet + E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType + E.where_ . E.isJust $ sheetFile E.^. SheetFileContent + return $ sheetFile E.^. SheetFileContent + E.selectSource . E.from $ \personalisedSheetFile -> do + E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet + E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType + E.where_ . E.isJust $ personalisedSheetFile E.^. PersonalisedSheetFileContent + return $ personalisedSheetFile E.^. PersonalisedSheetFileContent jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a jLocked jId act = flip evalStateT False $ do diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index 79adf7a33..f29d5e03d 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -21,6 +21,17 @@ import qualified Data.Conduit.List as C import qualified Database.Esqueleto as E +import Jobs.Handler.Intervals.Utils + +import System.IO.Unsafe + +import Crypto.Hash (hashDigestSize, digestFromByteString) + +import Data.List (iterate) + +{-# NOINLINE prewarmCacheIntervalsCache #-} +prewarmCacheIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)]) +prewarmCacheIntervalsCache = unsafePerformIO $ newTVarIO Map.empty determineCrontab :: DB (Crontab JobCtl) -- ^ Extract all future jobs from the database (sheet deadlines, ...) @@ -49,6 +60,122 @@ determineCrontab = execWriterT $ do } Nothing -> mempty + let + tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (YesodDB UniWorX) () + tellPrewarmJobs jcPrewarmSource jcTargetTime = maybeT (return ()) $ do + PrewarmCacheConf{..} <- hoistMaybe appFileSourcePrewarmConf + + let + chunkHashBytes :: forall h. + ( Unwrapped FileContentChunkReference ~ Digest h ) + => Integer + chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h)) + intervals <- mkIntervalsCached prewarmCacheIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) precSteps + + let step = realToFrac $ toRational (precStart - precEnd) / toRational precSteps + step' = realToFrac $ toRational step / precMaxSpeedup + + mapM_ tell + [ HashMap.singleton + JobCtlPrewarmCache{..} + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ ts + , cronRepeat = CronRepeatOnChange + , cronRateLimit = step' + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ ts' + } + | jcChunkInterval <- intervals + | ts <- iterate (addUTCTime step) $ addUTCTime (-precStart) jcTargetTime + | ts' <- iterate (addUTCTime step') $ addUTCTime (subtract precStart . 
realToFrac $ toRational (precStart - precEnd) * (1 - recip precMaxSpeedup)) jcTargetTime + ] + + lift . maybeT (return ()) $ do + injectInterval <- fmap abs . MaybeT . getsYesod $ view _appInjectFiles + tell $ HashMap.singleton + JobCtlInhibitInject{..} + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (negate $ precStart + injectInterval + 10) jcTargetTime + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = injectInterval / 2 + , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (precInhibit - precStart) jcTargetTime + } + + let + sheetJobs (Entity nSheet Sheet{..}) = do + for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> do + tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetExercise) aFrom + + when (isn't _JobsOffload appJobMode) $ do + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo + } + for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> do + tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetHint) hFrom + + when (isn't _JobsOffload appJobMode) . maybeT (return ()) $ do + guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom + guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) + (fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]) + guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet] + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetHint{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo + } + for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> do + tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetSolution) sFrom + + when (isn't _JobsOffload appJobMode) . maybeT (return ()) $ do + guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom + guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left nominalDay + } + when (isn't _JobsOffload appJobMode) $ do + for_ sheetActiveTo $ \aTo -> do + whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo' + , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . 
CronTimestamp $ utcToLocalTimeTZ appTZ aTo + } + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + when sheetAutoDistribute $ + tell $ HashMap.singleton + (JobCtlQueue $ JobDistributeCorrections nSheet) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo + , cronRepeat = CronRepeatNever + , cronRateLimit = 3600 -- Irrelevant due to `cronRepeat` + , cronNotAfter = Left nominalDay + } + + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs + + when (isn't _JobsOffload appJobMode) $ do case appJobFlushInterval of Just interval -> tell $ HashMap.singleton @@ -233,71 +360,6 @@ determineCrontab = execWriterT $ do , cronNotAfter = Left within } - let - sheetJobs (Entity nSheet Sheet{..}) = do - for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo - } - for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do - guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom - guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) - (fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]) - guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet] - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetHint{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo - } - for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do - guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom - guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left nominalDay - } - for_ sheetActiveTo $ \aTo -> do - whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo' - , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . 
CronTimestamp $ utcToLocalTimeTZ appTZ aTo - } - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - when sheetAutoDistribute $ - tell $ HashMap.singleton - (JobCtlQueue $ JobDistributeCorrections nSheet) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo - , cronRepeat = CronRepeatNever - , cronRateLimit = 3600 -- Irrelevant due to `cronRepeat` - , cronNotAfter = Left nominalDay - } - - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs - let correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB () correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 0415a1083..10330e158 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -40,6 +40,13 @@ import Jobs.Queue (YesodJobDB) import Jobs.Handler.Intervals.Utils +import Data.IntervalMap.Strict (IntervalMap) +import qualified Data.IntervalMap.Strict as IntervalMap + +import Control.Concurrent.STM.TVar (stateTVar) + +import qualified Data.Foldable as F + dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin @@ -256,6 +263,15 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do uploadBucket <- getsYesod $ view _appUploadCacheBucket interval <- getsYesod $ view _appInjectFiles + now <- liftIO getCurrentTime + let + extractInhibited :: IntervalMap UTCTime (Set FileContentReference) + -> (Set FileContentReference, IntervalMap UTCTime (Set FileContentReference)) + extractInhibited cState = (F.fold current, IntervalMap.union current upcoming) + where + (_, current, upcoming) = IntervalMap.splitIntersecting cState $ IntervalMap.OpenInterval (addUTCTime (-2) now) (addUTCTime 2 now) + inhibited <- atomically . flip stateTVar extractInhibited =<< getsYesod appFileInjectInhibit + let extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference extractReference _ = Nothing @@ -321,6 +337,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do (Sum injectedFiles, Sum injectedSize) <- runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True) .| C.mapMaybe extractReference + .| C.filter (views _2 (`Set.notMember` inhibited)) .| maybe (C.map id) (takeWhileTime . (/ 2)) interval .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize) .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFilesCount $ const 1) diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index d8ce7f470..f83311d34 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -5,7 +5,8 @@ module Jobs.Types ( Job(..), Notification(..) , JobChildren , classifyJob - , JobCtl(..) 
+ , JobCtlPrewarmSource(..), _jcpsSheet, _jcpsSheetFileType + , JobCtl(..), _jcPrewarmSource, _jcChunkInterval , classifyJobCtl , YesodJobDB , JobHandler(..), _JobHandlerAtomic, _JobHandlerException @@ -45,6 +46,8 @@ import GHC.Conc (unsafeIOToSTM) import Data.Generics.Product.Types (Children, ChGeneric) +{-# ANN module ("HLint: ignore Use newtype instead of data" :: String) #-} + data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } @@ -178,22 +181,48 @@ classifyJob job = unpack tag Aeson.String tag = obj HashMap.! "job" +data JobCtlPrewarmSource + = JobCtlPrewarmSheetFile + { jcpsSheet :: SheetId + , jcpsSheetFileType :: SheetFileType + } + deriving (Eq, Ord, Read, Show, Generic, Typeable) + deriving anyclass (Hashable, NFData) + +makeLenses_ ''JobCtlPrewarmSource + +deriveJSON defaultOptions + { constructorTagModifier = camelToPathPiece' 3 + , fieldLabelModifier = camelToPathPiece' 1 + , tagSingleConstructors = True + , sumEncoding = TaggedObject "source" "data" + } ''JobCtlPrewarmSource + data JobCtl = JobCtlFlush | JobCtlPerform QueuedJobId + | JobCtlPrewarmCache + { jcPrewarmSource :: JobCtlPrewarmSource + , jcTargetTime :: UTCTime + , jcChunkInterval :: (Maybe FileContentChunkReference, Maybe FileContentChunkReference) + } + | JobCtlInhibitInject + { jcPrewarmSource :: JobCtlPrewarmSource + , jcTargetTime :: UTCTime + } | JobCtlDetermineCrontab | JobCtlQueue Job | JobCtlGenerateHealthReport HealthCheck | JobCtlTest | JobCtlSleep Micro -- | For debugging deriving (Eq, Ord, Read, Show, Generic, Typeable) + deriving anyclass (Hashable, NFData) makePrisms ''JobCtl - -instance Hashable JobCtl -instance NFData JobCtl +makeLenses_ ''JobCtl deriveJSON defaultOptions { constructorTagModifier = camelToPathPiece' 2 + , fieldLabelModifier = camelToPathPiece' 1 , tagSingleConstructors = True , sumEncoding = TaggedObject "instruction" "data" } ''JobCtl diff --git a/src/Model/Types/Sheet.hs b/src/Model/Types/Sheet.hs index 4e115f056..0528cb454 100644 --- a/src/Model/Types/Sheet.hs +++ b/src/Model/Types/Sheet.hs @@ -176,12 +176,10 @@ makeLenses_ ''SheetGroup data SheetFileType = SheetExercise | SheetHint | SheetSolution | SheetMarking deriving (Show, Read, Eq, Ord, Enum, Bounded, Generic) + deriving anyclass (Hashable, NFData, Universe, Finite) derivePersistField "SheetFileType" - -instance Universe SheetFileType -instance Finite SheetFileType - finitePathPiece ''SheetFileType ["file", "hint", "solution", "marking"] +pathPieceJSON ''SheetFileType sheetFile2markup :: SheetFileType -> Markup sheetFile2markup SheetExercise = iconSFTQuestion diff --git a/src/Settings.hs b/src/Settings.hs index e87cedfd4..83846b1aa 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -75,6 +75,8 @@ import qualified Network.Minio as Minio import Data.Conduit.Algorithms.FastCDC +import Utils.Lens.TH + -- | Runtime settings to configure this application. 
These settings can be -- loaded from various sources: defaults, environment variables, config files, @@ -214,6 +216,7 @@ data AppSettings = AppSettings , appMemcacheAuth :: Bool , appFileSourceARCConf :: Maybe (ARCConf Int) + , appFileSourcePrewarmConf :: Maybe PrewarmCacheConf } deriving Show data JobMode = JobsLocal { jobsAcceptOffload :: Bool } @@ -342,6 +345,13 @@ data ARCConf w = ARCConf , arccMaximumWeight :: w } deriving (Eq, Ord, Read, Show, Generic, Typeable) +data PrewarmCacheConf = PrewarmCacheConf + { precMaximumWeight :: Int + , precStart, precEnd, precInhibit :: NominalDiffTime -- ^ Prewarming cache starts at @t - precStart@ and should be finished by @t - precEnd@; injecting from minio to database is inhibited from @t - precStart@ until @t - precStart + precInhibit@ + , precSteps :: Natural + , precMaxSpeedup :: Rational + } deriving (Eq, Ord, Read, Show, Generic, Typeable) + nullaryPathPiece ''ApprootScope $ camelToPathPiece' 1 pathPieceJSON ''ApprootScope pathPieceJSONKey ''ApprootScope @@ -371,6 +381,12 @@ deriveJSON defaultOptions deriveJSON defaultOptions { fieldLabelModifier = camelToPathPiece' 1 } ''ARCConf + +deriveJSON defaultOptions + { fieldLabelModifier = camelToPathPiece' 1 + } ''PrewarmCacheConf + +makeLenses_ ''PrewarmCacheConf instance FromJSON LdapConf where parseJSON = withObject "LdapConf" $ \o -> do @@ -632,7 +648,17 @@ instance FromJSON AppSettings where appStudyFeaturesRecacheRelevanceWithin <- o .:? "study-features-recache-relevance-within" appStudyFeaturesRecacheRelevanceInterval <- o .: "study-features-recache-relevance-interval" - appFileSourceARCConf <- assertM ((||) <$> ((> 0) . arccMaximumGhost) <*> ((> 0) . arccMaximumWeight)) <$> o .:? "file-source-arc" + let isValidARCConf ARCConf{..} = arccMaximumWeight > 0 + appFileSourceARCConf <- assertM isValidARCConf <$> o .:? "file-source-arc" + + let isValidPrewarmConf PrewarmCacheConf{..} = and + [ precMaximumWeight > 0 + , precStart >= 0 + , precEnd >= 0, precEnd <= precStart + , precSteps > 0 + , precMaxSpeedup >= 1 + ] + appFileSourcePrewarmConf <- over (_Just . _precInhibit) (max 0) . assertM isValidPrewarmConf <$> o .:? "file-source-prewarm" return AppSettings{..} diff --git a/src/Utils.hs b/src/Utils.hs index 274acfe5f..b76a9b669 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -39,6 +39,7 @@ import Utils.NTop as Utils import Utils.HttpConditional as Utils import Utils.Persist as Utils import Utils.ARC as Utils +import Utils.LRU as Utils import Text.Blaze (Markup, ToMarkup(..)) diff --git a/src/Utils/ARC.hs b/src/Utils/ARC.hs index eb86b6134..0f1b94ee5 100644 --- a/src/Utils/ARC.hs +++ b/src/Utils/ARC.hs @@ -3,6 +3,7 @@ module Utils.ARC , ARC, initARC , arcAlterF, lookupARC, insertARC , ARCHandle, initARCHandle, cachedARC, cachedARC' + , lookupARCHandle , readARCHandle , arcRecentSize, arcFrequentSize, arcGhostRecentSize, arcGhostFrequentSize , getARCRecentWeight, getARCFrequentWeight @@ -17,6 +18,7 @@ import qualified Data.OrdPSQ as OrdPSQ import Control.Lens -- https://web.archive.org/web/20210115184012/https://dbs.uni-leipzig.de/file/ARC.pdf +-- https://jaspervdj.be/posts/2015-02-24-lru-cache.html newtype ARCTick = ARCTick { _getARCTick :: Word64 } @@ -268,6 +270,17 @@ cachedARC :: forall k w v m. -> (Maybe (v, w) -> m (v, w)) -> m v cachedARC h k f = fromMaybe (error "cachedARC: cachedARC' returned Nothing") <$> cachedARC' h k (fmap Just . f) + +lookupARCHandle :: forall k w v m. 
+ ( MonadIO m + , Ord k + , Integral w + ) + => ARCHandle k w v + -> k + -> m (Maybe (v, w)) +lookupARCHandle (ARCHandle arcVar) k = lookupARC k <$> readIORef arcVar + readARCHandle :: MonadIO m => ARCHandle k w v diff --git a/src/Utils/LRU.hs b/src/Utils/LRU.hs new file mode 100644 index 000000000..19ad3e0c5 --- /dev/null +++ b/src/Utils/LRU.hs @@ -0,0 +1,213 @@ +module Utils.LRU + ( LRUTick + , LRU, initLRU + , insertLRU, lookupLRU, touchLRU, timeoutLRU + , LRUHandle, initLRUHandle + , insertLRUHandle, lookupLRUHandle, touchLRUHandle, timeoutLRUHandle + , readLRUHandle + , lruStoreSize + , getLRUWeight + , describeLRU + ) where + +import ClassyPrelude + +import Data.OrdPSQ (OrdPSQ) +import qualified Data.OrdPSQ as OrdPSQ + +import Control.Lens + +-- https://jaspervdj.be/posts/2015-02-24-lru-cache.html + + +newtype LRUTick = LRUTick { _getLRUTick :: Word64 } + deriving (Eq, Ord, Show, Typeable) + deriving newtype (NFData) + +makeLenses ''LRUTick + +data LRU k t w v = LRU + { lruStore :: !(OrdPSQ k (t, LRUTick) (v, w)) + , lruWeight :: !w + , lruMaximumWeight :: !w + } + +instance (NFData k, NFData t, NFData w, NFData v) => NFData (LRU k t w v) where + rnf LRU{..} = rnf lruStore + `seq` rnf lruWeight + `seq` rnf lruMaximumWeight + +describeLRU :: Show w + => LRU k t w v + -> String +describeLRU LRU{..} = intercalate ", " + [ "lruStore: " <> show (OrdPSQ.size lruStore) + , "lruWeight: " <> show lruWeight + , "lruMaximumWeight: " <> show lruMaximumWeight + ] + +lruStoreSize :: LRU k t w v -> Int +lruStoreSize = OrdPSQ.size . lruStore + +getLRUWeight :: LRU k t w v -> w +getLRUWeight = lruWeight + +initialLRUTick, maximumLRUTick :: LRUTick +initialLRUTick = LRUTick 0 +maximumLRUTick = LRUTick maxBound + +initLRU :: forall k t w v. + Integral w + => w -- ^ @lruMaximumWeight@ + -> (LRU k t w v, LRUTick) +initLRU lruMaximumWeight + | lruMaximumWeight < 0 = error "initLRU given negative maximum weight" + | otherwise = (, initialLRUTick) LRU + { lruStore = OrdPSQ.empty + , lruWeight = 0 + , lruMaximumWeight + } + +insertLRU :: forall k t w v. + ( Ord k, Ord t + , Integral w + ) + => k + -> t + -> Maybe (v, w) + -> LRU k t w v + -> LRUTick -> (LRU k t w v, LRUTick) +insertLRU k t newVal oldLRU@LRU{..} now + | later <= initialLRUTick = uncurry (insertLRU k t newVal) $ initLRU lruMaximumWeight + | Just (_, w) <- newVal, w > lruMaximumWeight = (oldLRU, now) + | Just (_, w) <- newVal = (, later) $ + let (lruStore', lruWeight') = evictToSize (lruMaximumWeight - w) lruStore lruWeight + ((fromMaybe 0 . preview (_Just . _2 . _2) -> oldWeight), lruStore'') + = OrdPSQ.alter (, ((t, now), ) <$> newVal) k lruStore' + in oldLRU + { lruStore = lruStore'' + , lruWeight = lruWeight' - oldWeight + w + } + | Just (_, (_, w), lruStore') <- OrdPSQ.deleteView k lruStore = (, now) oldLRU + { lruStore = lruStore' + , lruWeight = lruWeight - w + } + | otherwise = (oldLRU, now) + where + later :: LRUTick + later = over getLRUTick succ now + + evictToSize :: w -> OrdPSQ k (t, LRUTick) (v, w) -> w -> (OrdPSQ k (t, LRUTick) (v, w), w) + evictToSize tSize c cSize + | cSize <= tSize = (c, cSize) + | Just (_, _, (_, w'), c') <- OrdPSQ.minView c = evictToSize tSize c' (cSize - w') + | otherwise = error "evictToSize: cannot reach required size through eviction" + +lookupLRU :: forall k t w v. + Ord k + => k + -> LRU k t w v + -> Maybe (v, w) +lookupLRU k LRU{..} = view _2 <$> OrdPSQ.lookup k lruStore + +touchLRU :: forall k t w v. 
+ ( Ord k, Ord t + , Integral w + ) + => k + -> t + -> LRU k t w v + -> LRUTick -> ((LRU k t w v, LRUTick), Maybe (v, w)) +touchLRU k t oldLRU@LRU{..} now + | (Just (_, v), _) <- altered + , later <= initialLRUTick = (, Just v) . uncurry (insertLRU k t $ Just v) $ initLRU lruMaximumWeight + | (Just (_, v), lruStore') <- altered = ((oldLRU{ lruStore = lruStore' }, later), Just v) + | otherwise = ((oldLRU, now), Nothing) + where + altered = OrdPSQ.alter (\oldVal -> (oldVal, over _1 (max (t, later)) <$> oldVal)) k lruStore + + later :: LRUTick + later = over getLRUTick succ now + +timeoutLRU :: forall k t w v. + ( Ord k, Ord t + , Integral w + ) + => t + -> LRU k t w v + -> LRU k t w v +timeoutLRU t oldLRU@LRU{..} = oldLRU + { lruStore = lruStore' + , lruWeight = lruWeight - evictedWeight + } + where + (evicted, lruStore') = OrdPSQ.atMostView (t, maximumLRUTick) lruStore + evictedWeight = sumOf (folded . _3 . _2) evicted + +newtype LRUHandle k t w v = LRUHandle { _getLRUHandle :: IORef (LRU k t w v, LRUTick) } + deriving (Eq, Typeable) + +initLRUHandle :: forall k t w v m. + ( MonadIO m + , Integral w + ) + => w -- ^ @lruMaximumWeight@ + -> m (LRUHandle k t w v) +initLRUHandle maxWeight = fmap LRUHandle . newIORef $ initLRU maxWeight + +insertLRUHandle :: forall k t w v m. + ( MonadIO m + , Ord k, Ord t + , Integral w + , NFData k, NFData t, NFData w, NFData v + ) + => LRUHandle k t w v + -> k + -> t + -> (v, w) + -> m () +insertLRUHandle (LRUHandle lruVar) k t newVal + = modifyIORef' lruVar $ force . uncurry (insertLRU k t $ Just newVal) + +lookupLRUHandle :: forall k t w v m. + ( MonadIO m + , Ord k + ) + => LRUHandle k t w v + -> k + -> m (Maybe (v, w)) +lookupLRUHandle (LRUHandle lruVar) k + = views _1 (lookupLRU k) <$> readIORef lruVar + +touchLRUHandle :: forall k t w v m. + ( MonadIO m + , Ord k, Ord t + , Integral w + , NFData k, NFData t, NFData w, NFData v + ) + => LRUHandle k t w v + -> k + -> t + -> m (Maybe (v, w)) +touchLRUHandle (LRUHandle lruVar) k t = do + oldLRU <- readIORef lruVar + let (newLRU, touched) = uncurry (touchLRU k t) oldLRU + force newLRU `seq` writeIORef lruVar newLRU + return touched + +timeoutLRUHandle :: forall k t w v m. + ( MonadIO m + , Ord k, Ord t + , Integral w + , NFData k, NFData t, NFData w, NFData v + ) + => LRUHandle k t w v + -> t + -> m () +timeoutLRUHandle (LRUHandle lruVar) t + = modifyIORef' lruVar $ force . over _1 (timeoutLRU t) + +readLRUHandle :: MonadIO m + => LRUHandle k t w v + -> m (LRU k t w v, LRUTick) +readLRUHandle (LRUHandle lruVar) = readIORef lruVar diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 27503acf9..19f51277b 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -17,6 +17,9 @@ module Utils.Metrics , observeMissingFiles , ARCMetrics, ARCLabel(..) , arcMetrics + , LRUMetrics, LRULabel(..) + , lruMetrics + , InjectInhibitMetrics, injectInhibitMetrics ) where import Import.NoModel hiding (Vector, Info) @@ -42,6 +45,11 @@ import Jobs.Types import qualified Data.Aeson as Aeson import qualified Data.HashMap.Strict as HashMap +import Data.IntervalMap.Strict (IntervalMap) +import qualified Data.IntervalMap.Strict as IntervalMap + +import qualified Data.Foldable as F + {-# ANN module ("HLint: ignore Use even" :: String) #-} @@ -230,6 +238,10 @@ missingFiles = unsafeRegister . 
vector "ref" $ gauge info where info = Info "uni2work_missing_files_count" "Number of files referenced from within database that are missing" +relabel :: Text -> Text + -> SampleGroup -> SampleGroup +relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lbls v) -> Sample k ((l, s) : filter (views _1 $ (/=) l) lbls) v + data ARCMetrics = ARCMetrics data ARCLabel = ARCFileSource @@ -244,7 +256,6 @@ arcMetrics :: Integral w -> Metric ARCMetrics arcMetrics lbl ah = Metric $ return (ARCMetrics, collectARCMetrics) where - relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lbls v) -> Sample k ((l, s) : filter (views _1 $ (/=) l) lbls) v labelArc = relabel "arc" collectARCMetrics = map (labelArc $ toPathPiece lbl) <$> do @@ -266,6 +277,59 @@ arcMetrics lbl ah = Metric $ return (ARCMetrics, collectARCMetrics) weightInfo = Info "arc_weight" "Sum of weights of entries in the ARC LRUs" +data LRUMetrics = LRUMetrics + +data LRULabel = LRUFileSourcePrewarm + deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) + deriving anyclass (Universe, Finite) + +nullaryPathPiece ''LRULabel $ camelToPathPiece' 1 + +lruMetrics :: Integral w + => LRULabel + -> LRUHandle k t w v + -> Metric LRUMetrics +lruMetrics lbl lh = Metric $ return (LRUMetrics, collectLRUMetrics) + where + labelLru = relabel "lru" + + collectLRUMetrics = map (labelLru $ toPathPiece lbl) <$> do + (lru, _) <- readLRUHandle lh + return + [ SampleGroup sizeInfo GaugeType + [ Sample "lru_size" [] . encodeUtf8 . tshow $ lruStoreSize lru + ] + , SampleGroup weightInfo GaugeType + [ Sample "lru_weight" [] . encodeUtf8 . tshow . toInteger $ getLRUWeight lru + ] + ] + sizeInfo = Info "lru_size" + "Number of entries in the LRU" + weightInfo = Info "lru_weight" + "Sum of weights of entries in the LRU" + +data InjectInhibitMetrics = InjectInhibitMetrics + +injectInhibitMetrics :: TVar (IntervalMap UTCTime (Set FileContentReference)) + -> Metric InjectInhibitMetrics +injectInhibitMetrics tvar = Metric $ return (InjectInhibitMetrics, collectInjectInhibitMetrics) + where + collectInjectInhibitMetrics = do + inhibits <- readTVarIO tvar + return + [ SampleGroup intervalsInfo GaugeType + [ Sample "uni2work_inject_inhibited_intervals_count" [] . encodeUtf8 . tshow $ IntervalMap.size inhibits + ] + , SampleGroup hashesInfo GaugeType + [ Sample "uni2work_inject_inhibited_hashes_count" [] . encodeUtf8 . tshow . Set.size $ F.fold inhibits + ] + ] + intervalsInfo = Info "uni2work_inject_inhibited_intervals_count" + "Number of distinct time intervals in which we don't transfer some files from upload cache to db" + hashesInfo = Info "uni2work_inject_inhibited_hashes_count" + "Number of files which we don't transfer from upload cache to db during some interval" + + withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport withHealthReportMetrics act = do before <- liftIO getPOSIXTime @@ -373,7 +437,7 @@ observeLoginOutcome plugin outcome registerJobHeldLocksCount :: MonadIO m => TVar (Set QueuedJobId) -> m () registerJobHeldLocksCount = liftIO . void . register . jobHeldLocksCount -data FileChunkStorage = StorageMinio | StorageDB | StorageARC +data FileChunkStorage = StorageMinio | StorageDB | StorageARC | StoragePrewarm deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) deriving anyclass (Universe, Finite) nullaryPathPiece ''FileChunkStorage $ camelToPathPiece' 1