refactor(memcached): remove more leftover ARC cache and LRU logic

More leftover dead code is removed, in particular the cache-prewarm options, which no longer had any effect now that the ARC cache is gone.
Steffen Jost 2024-10-04 12:19:27 +02:00
parent f17d89c21e
commit e625dca6ea
12 changed files with 37 additions and 569 deletions

View File

@ -315,17 +315,6 @@ fallback-personalised-sheet-files-keys-expire: 2419200
download-token-expire: 604801
file-source-arc:
maximum-ghost: 512
maximum-weight: 1073741824 # 1GiB
file-source-prewarm:
maximum-weight: 1073741824 # 1GiB
start: 1800 # 30m
end: 600 # 10m
inhibit: 3600 # 60m
steps: 20
max-speedup: 3
bot-mitigations:
- only-logged-in-table-sorting
- unauthorized-form-honeypots

View File

@ -115,8 +115,6 @@ import GHC.RTS.Flags (getRTSFlags)
import qualified Prometheus
import qualified Data.IntervalMap.Strict as IntervalMap
import qualified Utils.Pool as Custom
import qualified System.Clock as Clock
@ -216,14 +214,6 @@ makeFoundation appSettings''@AppSettings{..} = do
appJobState <- liftIO newEmptyTMVarIO
appHealthReport <- liftIO $ newTVarIO Set.empty
appFileSourcePrewarm <- for appFileSourcePrewarmConf $ \PrewarmCacheConf{..} -> do
lh <- initLRUHandle precMaximumWeight
void . Prometheus.register $ lruMetrics LRUFileSourcePrewarm lh
return lh
appFileInjectInhibit <- liftIO $ newTVarIO IntervalMap.empty
for_ (guardOnM (isn't _JobsOffload appJobMode) appInjectFiles) $ \_ ->
void . Prometheus.register $ injectInhibitMetrics appFileInjectInhibit
appStartTime <- liftIO getCurrentTime
-- We need a log function to create a connection pool. We need a connection
-- pool to create our foundation. And we need our foundation to get a

View File

@ -30,8 +30,6 @@ import qualified Jose.Jwk as Jose
import qualified Database.Memcached.Binary.IO as Memcached
import Network.Minio (MinioConn)
import Data.IntervalMap.Strict (IntervalMap)
import qualified Utils.Pool as Custom
import Utils.Metrics (DBConnUseState)
@ -84,8 +82,6 @@ data UniWorX = UniWorX
, appUploadCache :: Maybe MinioConn
, appVerpSecret :: VerpSecret
, appAuthKey :: Auth.Key
, appFileSourcePrewarm :: Maybe (LRUHandle (FileContentChunkReference, (Int, Int)) UTCTime Int ByteString)
, appFileInjectInhibit :: TVar (IntervalMap UTCTime (Set FileContentReference))
, appPersonalisedSheetFilesSeedKey :: PersonalisedSheetFilesSeedKey
, appVolatileClusterSettingsCache :: TVar VolatileClusterSettingsCache
, appStartTime :: UTCTime -- for Status Page

View File

@ -42,36 +42,17 @@ data SourceFilesException
makePrisms ''SourceFilesException

-fileChunk :: ( MonadHandler m
-             , HandlerSite m ~ UniWorX
-             )
-          => (FileContentChunkReference, (Int, Int))
-          -> m (Maybe (ByteString, Maybe FileChunkStorage))
-          -> m (Maybe ByteString)
-fileChunk k getChunkDB' = do
-  prewarm <- getsYesod appFileSourcePrewarm
+fileChunk :: ( MonadHandler m )
+          => m (Maybe (ByteString, Maybe FileChunkStorage))
+          -> m (Maybe ByteString)
+fileChunk getChunkDB' = do
  -- NOTE: crude surgery happened here to remove ARC caching; useless artifacts may have remained
-  case prewarm of
-    Nothing -> do
-      chunk' <- getChunkDB'
-      for chunk' $ \(chunk, mStorage) -> chunk <$ do
-        $logDebugS "fileChunkARC" "No prewarm"
-        for_ mStorage $ \storage ->
-          let w = length chunk
-          in liftIO $ observeSourcedChunk storage w
-    Just lh -> do
-      chunkRes <- lookupLRUHandle lh k
-      case chunkRes of
-        Just (chunk, w) -> Just chunk <$ do
-          $logDebugS "fileChunkARC" "Prewarm hit"
-          liftIO $ observeSourcedChunk StoragePrewarm w
-        Nothing -> do
-          chunk' <- getChunkDB'
-          for chunk' $ \(chunk, mStorage) -> chunk <$ do
-            $logDebugS "fileChunkARC" "Prewarm miss"
-            for_ mStorage $ \storage ->
-              let w = length chunk
-              in liftIO $ observeSourcedChunk storage w
+  chunk' <- getChunkDB'
+  for chunk' $ \(chunk, mStorage) -> chunk <$ do
+    $logDebugS "fileChunkARC" "No prewarm"
+    for_ mStorage $ \storage ->
+      let w = length chunk
+      in liftIO $ observeSourcedChunk storage w

sourceFileDB :: forall m.
@ -100,7 +81,7 @@ sourceFileChunks cont chunkHash = transPipe (withReaderT $ projectBackend @SqlRe
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
getChunkMinio = fmap (, StorageMinio) . catchIfMaybeT (is _SourceFilesContentUnavailable) . runConduit $ sourceMinio (Left chunkHash) (Just $ ByteRangeFromTo (fromIntegral $ pred start) (fromIntegral . pred $ pred start + dbChunksize)) .| C.fold
in getChunkDB' <|> getChunkMinio
-chunk <- fileChunk (chunkHash, (start, dbChunksize)) getChunkDB
+chunk <- fileChunk getChunkDB
case chunk of
Just c | olength c <= 0 -> return Nothing
Just c -> do
@ -232,7 +213,7 @@ respondFileConditional representationLastModified cType FileReference{..} = do
let getChunkDB = fmap (fmap $ (, Just StorageDB) . E.unValue) . E.selectOne . E.from $ \fileContentChunk -> do
E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. E.val chunkHash
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val $ min cLength' dbChunksize)
-chunk <- fileChunk (chunkHash, (fromIntegral start, fromIntegral $ min cLength' dbChunksize)) getChunkDB
+chunk <- fileChunk getChunkDB
case chunk of
Nothing -> throwM SourceFilesContentUnavailable
Just c -> do

View File

@ -18,7 +18,6 @@ import Jobs.Offload
import Jobs.Crontab
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe)
import qualified Data.Text.Lazy as LT
@ -52,15 +51,6 @@ import Control.Concurrent.STM.Delay
import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay)
import qualified Database.Esqueleto.Legacy as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.ByteString as ByteString
import Handler.Utils.Files (sourceFileChunks, _SourceFilesContentUnavailable)
import qualified Data.IntervalMap.Strict as IntervalMap
import Jobs.Handler.SendNotification
import Jobs.Handler.SendTestEmail
import Jobs.Handler.QueueNotification
@ -91,7 +81,7 @@ import Type.Reflection (typeOf)
import System.Clock
data JobQueueException = JInvalid QueuedJobId QueuedJob
| JLocked QueuedJobId InstanceId UTCTime
| JNonexistant QueuedJobId
@ -188,7 +178,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
let
routeExc :: forall m'. Monad m' => (forall b. m b -> m b) -> m (m' ()) -> m (m' ())
routeExc unmask' = handleAll (\exc -> return () <$ throwTo me exc) . unmask'
actAsync <- allocateLinkedAsyncWithUnmask $ \unmask' -> act (routeExc unmask')
let handleExc e = do
@ -196,12 +186,12 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
atomically $ do
jState <- tryReadTMVar appJobState
for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown ()
void $ wait actAsync
throwM e
unmask (wait actAsync) `catchAll` handleExc
num :: Int
num = fromIntegral $ foundation ^. _appJobWorkers
@ -209,7 +199,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
spawnMissingWorkers = do
shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard $ not shouldTerminate'
oldState <- takeTMVar appJobState
let missing = num - Map.size (jobWorkers oldState)
guard $ missing > 0
@ -266,7 +256,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!)
go
in go
terminateGracefully :: (() -> ContT () m ()) -> STM (ContT () m ())
terminateGracefully terminate = do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
@ -329,7 +319,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
respawn <$ case cOffload of
Nothing -> return ()
Just JobOffloadHandler{..} -> waitSTM jobOffloadHandler
stopJobCtl :: MonadUnliftIO m => UniWorX -> m ()
-- ^ Stop all worker threads currently running
@ -388,7 +378,7 @@ execCrontab = do
let doJob = mapRWST (liftHandler . runDBJobs) $ do
-- newCrontab <- lift $ hoist lift determineCrontab'
-- when (newCrontab /= currentCrontab) $
-- mapRWST (liftIO . atomically) $
-- liftBase . flip writeTVar newCrontab =<< asks (jobCrontab . jobContext)
newCrontab <- liftIO . readTVarIO =<< asks (jobCrontab . jobContext)
@ -407,7 +397,7 @@ execCrontab = do
case jobCtl of
JobCtlQueue job -> lift $ queueDBJobCron job
other -> runReaderT ?? foundation $ writeJobCtl other
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
@ -497,7 +487,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
#endif
, Exc.Handler $ \(e :: SomeException) -> return $ Left e
] . fmap Right
handleQueueException :: MonadLogger m => JobQueueException -> m ()
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
@ -586,7 +576,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
liftHandler . runDB $ pruneLastExecs newCTab
$logInfoS logIdent "PruneLastExecs"
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . flip writeTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
@ -596,7 +586,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
$logInfoS logIdent [st|#{tshow kind}: #{toPathPiece newStatus}|]
unless (newStatus > HealthFailure) $ do
$logErrorS logIdent [st|#{tshow kind}: #{tshow newReport}|]
liftIO $ do
now <- getCurrentTime
let updateReports = Set.insert (now, newReport)
@ -606,69 +596,6 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
$logInfoS logIdent [st|Sleeping #{tshow secs}s...|]
threadDelay msecs
$logInfoS logIdent [st|Slept #{tshow secs}s.|]
handleCmd JobCtlPrewarmCache{..} = do
prewarm <- getsYesod appFileSourcePrewarm
for_ prewarm $ \lh -> lift . runDBRead $
runConduit $ sourceFileChunkIds .| C.map E.unValue
.| awaitForever (\cRef -> handleC handleUnavailable $ sourceFileChunks (withLRU lh cRef) cRef .| C.map (cRef, ))
.| C.mapM_ (sinkChunkCache lh)
where
handleUnavailable e
| is _SourceFilesContentUnavailable e = return ()
| otherwise = throwM e
withLRU lh cRef range getChunk = do
touched <- touchLRUHandle lh (cRef, range) jcTargetTime
case touched of
Just (bs, _) -> return $ Just (bs, Nothing)
Nothing -> over (mapped . _2) Just <$> getChunk
(minBoundDgst, maxBoundDgst) = jcChunkInterval
sourceFileChunkIds = E.selectSource . E.from $ \fileContentEntry -> do
let cRef = fileContentEntry E.^. FileContentEntryChunkHash
eRef = fileContentEntry E.^. FileContentEntryHash
E.where_ . E.and $ catMaybes
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
, maxBoundDgst <&> \b -> cRef E.<. E.val b
]
E.where_ $ matchesPrewarmSource eRef jcPrewarmSource
return cRef
sinkChunkCache lh (cRef, (c, range)) = insertLRUHandle lh (cRef, range) jcTargetTime (c, ByteString.length c)
handleCmd JobCtlInhibitInject{..} = maybeT_ $ do
PrewarmCacheConf{..} <- MaybeT . getsYesod $ view _appFileSourcePrewarmConf
let inhibitInterval = IntervalMap.ClosedInterval
(addUTCTime (-precStart) jcTargetTime)
(addUTCTime (precInhibit - precStart) jcTargetTime)
sourceFileReferences = prewarmSourceReferences jcPrewarmSource
refs <- lift . lift . runDBRead . runConduit $ sourceFileReferences .| C.foldl (flip Set.insert) Set.empty
guard . not $ null refs
inhibitTVar <- getsYesod appFileInjectInhibit
atomically . modifyTVar' inhibitTVar $ force . IntervalMap.insertWith Set.union inhibitInterval refs
matchesPrewarmSource :: E.SqlExpr (E.Value FileContentReference) -> JobCtlPrewarmSource -> E.SqlExpr (E.Value Bool)
matchesPrewarmSource eRef = \case
JobCtlPrewarmSheetFile{..} -> E.or
[ E.exists . E.from $ \sheetFile ->
E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet
E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType
E.&&. sheetFile E.^. SheetFileContent E.==. E.just eRef
, E.exists . E.from $ \personalisedSheetFile ->
E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileContent E.==. E.just eRef
]
prewarmSourceReferences :: JobCtlPrewarmSource -> ConduitT () FileContentReference (ReaderT SqlReadBackend (HandlerFor UniWorX)) ()
prewarmSourceReferences = \case
JobCtlPrewarmSheetFile{..} -> (.| C.mapMaybe E.unValue) $ do
E.selectSource . E.from $ \sheetFile -> do
E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet
E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType
E.where_ . E.isJust $ sheetFile E.^. SheetFileContent
return $ sheetFile E.^. SheetFileContent
E.selectSource . E.from $ \personalisedSheetFile -> do
E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType
E.where_ . E.isJust $ personalisedSheetFile E.^. PersonalisedSheetFileContent
return $ personalisedSheetFile E.^. PersonalisedSheetFileContent
jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a
jLocked jId act = flip evalStateT False $ do
@ -707,7 +634,7 @@ jLocked jId act = flip evalStateT False $ do
update jId' [ QueuedJobLockInstance =. Nothing
, QueuedJobLockTime =. Nothing
]
bracket lock unlock $ lift . act
@ -723,7 +650,7 @@ pruneLastExecs crontab = do
ensureCrontab (Entity leId CronLastExec{..}) = maybeT (return mempty) $ do
now <- liftIO getCurrentTime
flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval
if
| abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2
-> return mempty

View File

@ -27,17 +27,6 @@ import qualified Data.Conduit.List as C
import qualified Database.Esqueleto.Legacy as E
import qualified Database.Esqueleto.Utils as E
import Jobs.Handler.Intervals.Utils
import System.IO.Unsafe
import Crypto.Hash (hashDigestSize, digestFromByteString)
import Data.List (iterate)
{-# NOINLINE prewarmCacheIntervalsCache #-}
prewarmCacheIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
prewarmCacheIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
determineCrontab :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Crontab JobCtl)
-- ^ Extract all future jobs from the database (sheet deadlines, ...)
@ -66,51 +55,9 @@ determineCrontab = execWriterT $ do
}
Nothing -> mempty
let
tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (ReaderT SqlReadBackend (HandlerFor UniWorX)) ()
tellPrewarmJobs jcPrewarmSource jcTargetTime = maybeT_ $ do
PrewarmCacheConf{..} <- hoistMaybe appFileSourcePrewarmConf
let
chunkHashBytes :: forall h.
( Unwrapped FileContentChunkReference ~ Digest h )
=> Integer
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
intervals <- mkIntervalsCached prewarmCacheIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) precSteps
let step = realToFrac $ toRational (precStart - precEnd) / toRational precSteps
step' = realToFrac $ toRational step / precMaxSpeedup
mapM_ tell
[ HashMap.singleton
JobCtlPrewarmCache{..}
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ ts
, cronRepeat = CronRepeatOnChange
, cronRateLimit = step'
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ ts'
}
| jcChunkInterval <- intervals
| ts <- iterate (addUTCTime step) $ addUTCTime (-precStart) jcTargetTime
| ts' <- iterate (addUTCTime step') $ addUTCTime (subtract precStart . realToFrac $ toRational (precStart - precEnd) * (1 - recip precMaxSpeedup)) jcTargetTime
]
lift . maybeT_ $ do
injectInterval <- fmap abs . MaybeT . getsYesod $ view _appInjectFiles
tell $ HashMap.singleton
JobCtlInhibitInject{..}
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (negate $ precStart + injectInterval + 10) jcTargetTime
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = injectInterval / 2
, cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (precInhibit - precStart) jcTargetTime
}
let
sheetJobs (Entity nSheet Sheet{..}) = do
-for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> do
-  tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetExercise) aFrom
+for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom ->
when (isn't _JobsOffload appJobMode) $ do
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetActive{..})
@ -120,9 +67,7 @@ determineCrontab = execWriterT $ do
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
}
-for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> do
-  tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetHint) hFrom
+for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom ->
when (isn't _JobsOffload appJobMode) . maybeT_ $ do
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom
guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom)
@ -136,9 +81,7 @@ determineCrontab = execWriterT $ do
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
}
-for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> do
-  tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetSolution) sFrom
+for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom ->
when (isn't _JobsOffload appJobMode) . maybeT_ $ do
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom
guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]
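For reference, the removed `tellPrewarmJobs` spread prewarming over `precSteps` cron entries, nominally one every `(precStart - precEnd) / precSteps` seconds, with a tighter rate limit of `step / precMaxSpeedup` for catching up. A sketch of that arithmetic with the defaults from the removed config (start 1800 s, end 600 s, steps 20, max-speedup 3); the helper names below are illustrative and not part of the code base:

```haskell
import Data.Time (UTCTime, addUTCTime)

-- Nominal spacing of the 20 prewarm chunks: (1800 - 600) / 20 = 60 s.
nominalStep :: Double
nominalStep = (1800 - 600) / 20

-- Under maximum speedup each chunk may instead be rate-limited to 60 / 3 = 20 s.
speedupStep :: Double
speedupStep = nominalStep / 3

-- Nominal start of the i-th chunk, counting back from the target time:
-- chunk 0 starts at t - 1800 s, chunk 19 at t - 1800 s + 19 * 60 s = t - 660 s,
-- so the last chunk finishes by t - 600 s = t - precEnd.
chunkStart :: UTCTime -> Int -> UTCTime
chunkStart target i = addUTCTime (realToFrac (fromIntegral i * nominalStep) - 1800) target
```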

View File

@ -44,13 +44,6 @@ import qualified Data.Sequence as Seq
import Jobs.Handler.Intervals.Utils
import Data.IntervalMap.Strict (IntervalMap)
import qualified Data.IntervalMap.Strict as IntervalMap
import Control.Concurrent.STM.TVar (stateTVar)
import qualified Data.Foldable as F
import qualified Control.Monad.State.Class as State
import Jobs.Types
@ -96,7 +89,7 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin
missingDb <- runConduit . execStateC Map.empty $ do
let insertRef refKind ref = State.modify' $ Map.alter (Just . Set.insert ref . fromMaybe Set.empty) refKind
iforM_ trackedReferences $ \refKind refQuery -> do
let fileReferencesQuery = do
ref <- refQuery
@ -152,7 +145,7 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin
, (''SubmissionFile, E.from $ \subFile -> return $ subFile E.^. SubmissionFileContent )
, (''SessionFile, E.from $ \sessFile -> return $ sessFile E.^. SessionFileContent )
]
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
@ -208,12 +201,12 @@ dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtom
let unreferencedChunkHash = E.unKey $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. unreferencedChunkHash
return $ fileContentEntry E.^. FileContentEntryHash `E.in_` E.valList fRefs
E.where_ $ chunkIdFilter unreferencedChunkHash
unmarkRefSource refSource = runConduit $ refSource .| C.map Seq.singleton .| C.chunksOfE chunkSize .| C.mapM_ unmarkSourceFiles
chunkSize = 100
unmarkRefSource jobFileReferences
let
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
@ -277,16 +270,7 @@ dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
interval <- getsYesod $ view _appInjectFiles
-- NOTE: crude surgery happened here to remove ARC caching; useless artifacts may have remained
now <- liftIO getCurrentTime
let
extractInhibited :: IntervalMap UTCTime (Set FileContentReference)
-> (Set FileContentReference, IntervalMap UTCTime (Set FileContentReference))
extractInhibited cState = (F.fold current, IntervalMap.union current upcoming)
where
(_, current, upcoming) = IntervalMap.splitIntersecting cState $ IntervalMap.OpenInterval (addUTCTime (-2) now) (addUTCTime 2 now)
inhibited <- atomically . flip stateTVar extractInhibited =<< getsYesod appFileInjectInhibit
let
extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
extractReference _ = Nothing
@ -296,7 +280,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do
injectOrDelete (objInfo, fRef) = do
let obj = Minio.oiObject objInfo
sz = fromIntegral $ max 1 $ Minio.oiSize objInfo
fRef' <- runDB $ do
logger <- askLoggerIO
@ -352,7 +336,6 @@ dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do
(Sum injectedFiles, Sum injectedSize) <-
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
.| C.mapMaybe extractReference
.| C.filter (views _2 (`Set.notMember` inhibited))
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFilesCount $ const 1)
@ -368,7 +351,7 @@ data RechunkFileException
{ oldHash, newHash :: FileContentReference }
deriving (Eq, Ord, Show, Generic)
deriving anyclass (Exception)
dispatchJobRechunkFiles :: JobHandler UniWorX
dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin
where

View File

@ -9,8 +9,7 @@ module Jobs.Types
( Job(..), Notification(..)
, JobChildren
, classifyJob
-, JobCtlPrewarmSource(..), _jcpsSheet, _jcpsSheetFileType
-, JobCtl(..), _jcPrewarmSource, _jcChunkInterval
+, JobCtl(..)
, classifyJobCtl
, YesodJobDB
, JobHandler(..), _JobHandlerAtomic, _JobHandlerException
@ -218,34 +217,8 @@ classifyJob job = unpack tag
Aeson.String tag = obj HashMap.! "job"
data JobCtlPrewarmSource
= JobCtlPrewarmSheetFile
{ jcpsSheet :: SheetId
, jcpsSheetFileType :: SheetFileType
}
deriving (Eq, Ord, Read, Show, Generic)
deriving anyclass (Hashable, NFData)
makeLenses_ ''JobCtlPrewarmSource
deriveJSON defaultOptions
{ constructorTagModifier = camelToPathPiece' 3
, fieldLabelModifier = camelToPathPiece' 1
, tagSingleConstructors = True
, sumEncoding = TaggedObject "source" "data"
} ''JobCtlPrewarmSource
data JobCtl = JobCtlFlush
| JobCtlPerform QueuedJobId
| JobCtlPrewarmCache
{ jcPrewarmSource :: JobCtlPrewarmSource
, jcTargetTime :: UTCTime
, jcChunkInterval :: (Maybe FileContentChunkReference, Maybe FileContentChunkReference)
}
| JobCtlInhibitInject
{ jcPrewarmSource :: JobCtlPrewarmSource
, jcTargetTime :: UTCTime
}
| JobCtlDetermineCrontab
| JobCtlQueue Job
| JobCtlGenerateHealthReport HealthCheck

View File

@ -238,9 +238,6 @@ data AppSettings = AppSettings
, appJobLmsQualificationsEnqueueHour :: Maybe Natural
, appJobLmsQualificationsDequeueHour :: Maybe Natural
, appFileSourceARCConf :: Maybe (ARCConf Int)
, appFileSourcePrewarmConf :: Maybe PrewarmCacheConf
, appBotMitigations :: Set SettingBotMitigation
, appVolatileClusterSettingsCacheTime :: DiffTime
@ -420,18 +417,6 @@ data VerpMode = VerpNone
| Verp { verpPrefix :: Text, verpSeparator :: Char }
deriving (Eq, Show, Read, Generic)
data ARCConf w = ARCConf
{ arccMaximumGhost :: Int
, arccMaximumWeight :: w
} deriving (Eq, Ord, Read, Show, Generic)
data PrewarmCacheConf = PrewarmCacheConf
{ precMaximumWeight :: Int
, precStart, precEnd, precInhibit :: NominalDiffTime -- ^ Prewarming cache starts at @t - precStart@ and should be finished by @t - precEnd@; injecting from minio to database is inhibited from @t - precStart@ until @t - precStart + precInhibit@
, precSteps :: Natural
, precMaxSpeedup :: Rational
} deriving (Eq, Ord, Read, Show, Generic)
data SettingBotMitigation
= SettingBotMitigationOnlyLoggedInTableSorting
| SettingBotMitigationUnauthorizedFormHoneypots
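For orientation, the Haddock comment on the removed `PrewarmCacheConf` defines two windows relative to a target time t. A worked sketch using the defaults from the removed config (start 1800 s, end 600 s, inhibit 3600 s); the function names below are illustrative only:

```haskell
import Data.Time (UTCTime, addUTCTime)

-- Prewarming runs from t - precStart until t - precEnd,
-- e.g. for a 12:00 deadline: 11:30 until 11:50.
prewarmWindow :: UTCTime -> (UTCTime, UTCTime)
prewarmWindow t = (addUTCTime (-1800) t, addUTCTime (-600) t)

-- Injection from minio into the database is inhibited from t - precStart
-- until t - precStart + precInhibit, e.g. 11:30 until 12:30.
inhibitWindow :: UTCTime -> (UTCTime, UTCTime)
inhibitWindow t = (addUTCTime (-1800) t, addUTCTime (3600 - 1800) t)
```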
@ -475,16 +460,6 @@ deriveJSON defaultOptions
, constructorTagModifier = camelToPathPiece' 1
} ''JobMode
deriveJSON defaultOptions
{ fieldLabelModifier = camelToPathPiece' 1
} ''ARCConf
deriveJSON defaultOptions
{ fieldLabelModifier = camelToPathPiece' 1
} ''PrewarmCacheConf
makeLenses_ ''PrewarmCacheConf
nullaryPathPiece ''SettingBotMitigation $ camelToPathPiece' 3
pathPieceJSON ''SettingBotMitigation
pathPieceJSONKey ''SettingBotMitigation
@ -823,17 +798,6 @@ instance FromJSON AppSettings where
appJobLmsQualificationsEnqueueHour <- o .:? "job-lms-qualifications-enqueue-hour"
appJobLmsQualificationsDequeueHour <- o .:? "job-lms-qualifications-dequeue-hour"
appFileSourceARCConf <- assertM isValidARCConf <$> o .:? "file-source-arc"
let isValidPrewarmConf PrewarmCacheConf{..} = and
[ precMaximumWeight > 0
, precStart >= 0
, precEnd >= 0, precEnd <= precStart
, precSteps > 0
, precMaxSpeedup >= 1
]
appFileSourcePrewarmConf <- over (_Just . _precInhibit) (max 0) . assertM isValidPrewarmConf <$> o .:? "file-source-prewarm"
appBotMitigations <- o .:? "bot-mitigations" .!= Set.empty
appVolatileClusterSettingsCacheTime <- o .: "volatile-cluster-settings-cache-time"
@ -846,7 +810,6 @@ instance FromJSON AppSettings where
appLegalExternal <- o .: "legal-external"
return AppSettings{..}
where isValidARCConf ARCConf{..} = arccMaximumWeight > 0
makeClassy_ ''AppSettings

View File

@ -44,7 +44,6 @@ import Utils.I18n as Utils
import Utils.NTop as Utils
import Utils.HttpConditional as Utils
import Utils.Persist as Utils
import Utils.LRU as Utils
import Utils.Set as Utils
import Text.Blaze (Markup, ToMarkup(..))

View File

@ -1,217 +0,0 @@
-- SPDX-FileCopyrightText: 2022 Gregor Kleen <gregor.kleen@ifi.lmu.de>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
module Utils.LRU
( LRUTick
, LRU, initLRU
, insertLRU, lookupLRU, touchLRU, timeoutLRU
, LRUHandle, initLRUHandle
, insertLRUHandle, lookupLRUHandle, touchLRUHandle, timeoutLRUHandle
, readLRUHandle
, lruStoreSize
, getLRUWeight
, describeLRU
) where
import ClassyPrelude
import Data.OrdPSQ (OrdPSQ)
import qualified Data.OrdPSQ as OrdPSQ
import Control.Lens
-- https://jaspervdj.be/posts/2015-02-24-lru-cache.html
newtype LRUTick = LRUTick { _getLRUTick :: Word64 }
deriving (Eq, Ord, Show)
deriving newtype (NFData)
makeLenses ''LRUTick
data LRU k t w v = LRU
{ lruStore :: !(OrdPSQ k (t, LRUTick) (v, w))
, lruWeight :: !w
, lruMaximumWeight :: !w
}
instance (NFData k, NFData t, NFData w, NFData v) => NFData (LRU k t w v) where
rnf LRU{..} = rnf lruStore
`seq` rnf lruWeight
`seq` rnf lruMaximumWeight
describeLRU :: Show w
=> LRU k t w v
-> String
describeLRU LRU{..} = intercalate ", "
[ "lruStore: " <> show (OrdPSQ.size lruStore)
, "lruWeight: " <> show lruWeight
, "lruMaximumWeight: " <> show lruMaximumWeight
]
lruStoreSize :: LRU k t w v -> Int
lruStoreSize = OrdPSQ.size . lruStore
getLRUWeight :: LRU k t w v -> w
getLRUWeight = lruWeight
initialLRUTick, maximumLRUTick :: LRUTick
initialLRUTick = LRUTick 0
maximumLRUTick = LRUTick maxBound
initLRU :: forall k t w v.
Integral w
=> w -- ^ @lruMaximumWeight@
-> (LRU k t w v, LRUTick)
initLRU lruMaximumWeight
| lruMaximumWeight < 0 = error "initLRU given negative maximum weight"
| otherwise = (lru, initialLRUTick)
where lru = LRU { lruStore = OrdPSQ.empty
, lruWeight = 0
, lruMaximumWeight
}
insertLRU :: forall k t w v.
( Ord k, Ord t
, Integral w
)
=> k
-> t
-> Maybe (v, w)
-> LRU k t w v
-> LRUTick -> (LRU k t w v, LRUTick)
insertLRU k t newVal oldLRU@LRU{..} now
| later <= initialLRUTick = uncurry (insertLRU k t newVal) $ initLRU lruMaximumWeight
| Just (_, w) <- newVal, w > lruMaximumWeight = (oldLRU, now)
| Just (_, w) <- newVal = (, later) $
let (lruStore', lruWeight') = evictToSize (lruMaximumWeight - w) lruStore lruWeight
(fromMaybe 0 . preview (_Just . _2 . _2) -> oldWeight, lruStore'')
= OrdPSQ.alter (, ((t, now), ) <$> newVal) k lruStore'
in oldLRU { lruStore = lruStore''
, lruWeight = lruWeight' - oldWeight + w
}
| Just (_, (_, w), lruStore') <- OrdPSQ.deleteView k lruStore =
let lru = oldLRU { lruStore = lruStore'
, lruWeight = lruWeight - w
}
in (lru, now)
| otherwise = (oldLRU, now)
where
later :: LRUTick
later = over getLRUTick succ now
evictToSize :: w -> OrdPSQ k (t, LRUTick) (v, w) -> w -> (OrdPSQ k (t, LRUTick) (v, w), w)
evictToSize tSize c cSize
| cSize <= tSize = (c, cSize)
| Just (_, _, (_, w'), c') <- OrdPSQ.minView c = evictToSize tSize c' (cSize - w')
| otherwise = error "evictToSize: cannot reach required size through eviction"
lookupLRU :: forall k t w v.
Ord k
=> k
-> LRU k t w v
-> Maybe (v, w)
lookupLRU k LRU{..} = view _2 <$> OrdPSQ.lookup k lruStore
touchLRU :: forall k t w v.
( Ord k, Ord t
, Integral w
)
=> k
-> t
-> LRU k t w v
-> LRUTick -> ((LRU k t w v, LRUTick), Maybe (v, w))
touchLRU k t oldLRU@LRU{..} now
| (Just (_, v), _) <- altered
, later <= initialLRUTick = (, Just v) . uncurry (insertLRU k t $ Just v) $ initLRU lruMaximumWeight
| (Just (_, v), lruStore') <- altered = ((oldLRU{ lruStore = lruStore' }, later), Just v)
| otherwise = ((oldLRU, now), Nothing)
where
altered = OrdPSQ.alter (\oldVal -> (oldVal, over _1 (max (t, later)) <$> oldVal)) k lruStore
later :: LRUTick
later = over getLRUTick succ now
timeoutLRU :: forall k t w v.
( Ord k, Ord t
, Integral w
)
=> t
-> LRU k t w v
-> LRU k t w v
timeoutLRU t oldLRU@LRU{..} = oldLRU
{ lruStore = lruStore'
, lruWeight = lruWeight - evictedWeight
}
where
(evicted, lruStore') = OrdPSQ.atMostView (t, maximumLRUTick) lruStore
evictedWeight = sumOf (folded . _3 . _2) evicted
newtype LRUHandle k t w v = LRUHandle { _getLRUHandle :: IORef (LRU k t w v, LRUTick) }
deriving (Eq)
initLRUHandle :: forall k t w v m.
( MonadIO m
, Integral w
)
=> w -- ^ @lruMaximumWeight@
-> m (LRUHandle k t w v)
initLRUHandle maxWeight = fmap LRUHandle . newIORef $ initLRU maxWeight
insertLRUHandle :: forall k t w v m.
( MonadIO m
, Ord k, Ord t
, Integral w
, NFData k, NFData t, NFData w, NFData v
)
=> LRUHandle k t w v
-> k
-> t
-> (v, w)
-> m ()
insertLRUHandle (LRUHandle lruVar) k t newVal
= modifyIORef' lruVar $ force . uncurry (insertLRU k t $ Just newVal)
lookupLRUHandle :: forall k t w v m.
( MonadIO m
, Ord k
)
=> LRUHandle k t w v
-> k
-> m (Maybe (v, w))
lookupLRUHandle (LRUHandle lruVar) k
= views _1 (lookupLRU k) <$> readIORef lruVar
touchLRUHandle :: forall k t w v m.
( MonadIO m
, Ord k, Ord t
, Integral w
, NFData k, NFData t, NFData w, NFData v
)
=> LRUHandle k t w v
-> k
-> t
-> m (Maybe (v, w))
touchLRUHandle (LRUHandle lruVar) k t = do
oldLRU <- readIORef lruVar
let (newLRU, touched) = uncurry (touchLRU k t) oldLRU
force newLRU `seq` writeIORef lruVar newLRU
return touched
timeoutLRUHandle :: forall k t w v m.
( MonadIO m
, Ord k, Ord t
, Integral w
, NFData k, NFData t, NFData w, NFData v
)
=> LRUHandle k t w v
-> t
-> m ()
timeoutLRUHandle (LRUHandle lruVar) t
= modifyIORef' lruVar $ force . over _1 (timeoutLRU t)
readLRUHandle :: MonadIO m
=> LRUHandle k t w v
-> m (LRU k t w v, LRUTick)
readLRUHandle (LRUHandle lruVar) = readIORef lruVar
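The deleted Utils.LRU module above implements a weight-bounded, deadline-aware LRU on top of an OrdPSQ, with priorities of the form (deadline, insertion tick). A minimal usage sketch, assuming the module above is still in scope; the key and value types here are illustrative, whereas the real caller keyed on (FileContentChunkReference, (Int, Int)) and stored ByteString chunks:

```haskell
import Data.Time (addUTCTime, getCurrentTime)
import Utils.LRU

main :: IO ()
main = do
  -- weight budget of 1 GiB, mirroring precMaximumWeight in the removed config
  lh <- initLRUHandle (1073741824 :: Int)
  now <- getCurrentTime
  let key      = ("some-chunk-reference", (0 :: Int, 65536 :: Int))  -- illustrative key
      deadline = addUTCTime 600 now
  -- cache an entry until its deadline, weighted by its size in bytes
  insertLRUHandle lh key deadline ("chunk payload" :: String, 65536)
  -- a hit bumps the entry's LRU tick (and may extend its deadline)
  _ <- touchLRUHandle lh key deadline
  -- evict everything whose deadline lies at or before the given time
  timeoutLRUHandle lh now
  print =<< lookupLRUHandle lh key
```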

View File

@ -19,9 +19,6 @@ module Utils.Metrics
, observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles
, registerJobWorkerQueueDepth
, observeMissingFiles
, LRUMetrics, LRULabel(..)
, lruMetrics
, InjectInhibitMetrics, injectInhibitMetrics
, PoolMetrics, PoolLabel(..)
, poolMetrics
, observeDatabaseConnectionOpened, observeDatabaseConnectionClosed
@ -53,11 +50,6 @@ import Jobs.Types
import qualified Data.Aeson as Aeson
import qualified Data.HashMap.Strict as HashMap
import Data.IntervalMap.Strict (IntervalMap)
import qualified Data.IntervalMap.Strict as IntervalMap
import qualified Data.Foldable as F
import qualified Utils.Pool as Custom
import GHC.Stack
@ -272,57 +264,6 @@ relabel :: Text -> Text
-> SampleGroup -> SampleGroup
relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lbls v) -> Sample k ((l, s) : filter (views _1 $ (/=) l) lbls) v
data LRUMetrics = LRUMetrics
data LRULabel = LRUFileSourcePrewarm
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic)
deriving anyclass (Universe, Finite)
nullaryPathPiece ''LRULabel $ camelToPathPiece' 1
lruMetrics :: Integral w
=> LRULabel
-> LRUHandle k t w v
-> Metric LRUMetrics
lruMetrics lbl lh = Metric $ return (LRUMetrics, collectLRUMetrics)
where
labelLru = relabel "lru"
collectLRUMetrics = map (labelLru $ toPathPiece lbl) <$> do
(lru, _) <- readLRUHandle lh
return
[ SampleGroup sizeInfo GaugeType
[ Sample "lru_size" [] . encodeUtf8 . tshow $ lruStoreSize lru
]
, SampleGroup weightInfo GaugeType
[ Sample "lru_weight" [] . encodeUtf8 . tshow . toInteger $ getLRUWeight lru
]
]
sizeInfo = Info "lru_size"
"Number of entries in the LRU"
weightInfo = Info "lru_weight"
"Sum of weights of entries in the LRU"
data InjectInhibitMetrics = InjectInhibitMetrics
injectInhibitMetrics :: TVar (IntervalMap UTCTime (Set FileContentReference))
-> Metric InjectInhibitMetrics
injectInhibitMetrics tvar = Metric $ return (InjectInhibitMetrics, collectInjectInhibitMetrics)
where
collectInjectInhibitMetrics = do
inhibits <- readTVarIO tvar
return
[ SampleGroup intervalsInfo GaugeType
[ Sample "uni2work_inject_inhibited_intervals_count" [] . encodeUtf8 . tshow $ IntervalMap.size inhibits
]
, SampleGroup hashesInfo GaugeType
[ Sample "uni2work_inject_inhibited_hashes_count" [] . encodeUtf8 . tshow . Set.size $ F.fold inhibits
]
]
intervalsInfo = Info "uni2work_inject_inhibited_intervals_count"
"Number of distinct time intervals in which we don't transfer some files from upload cache to db"
hashesInfo = Info "uni2work_inject_inhibited_hashes_count"
"Number of files which we don't transfer from upload cache to db during some interval"
data PoolMetrics = PoolMetrics