feat(caching): introduce cache prewarming
This commit is contained in:
parent
abf59017a0
commit
8d1f216b5b
@ -274,3 +274,10 @@ memcache-auth: true
|
||||
file-source-arc:
|
||||
maximum-ghost: 512
|
||||
maximum-weight: 1073741824 # 1GiB
|
||||
file-source-prewarm:
|
||||
maximum-weight: 1073741824 # 1GiB
|
||||
start: 1800 # 30m
|
||||
end: 600 # 10m
|
||||
inhibit: 3600 # 60m
|
||||
steps: 20
|
||||
max-speedup: 3
|
||||
|
||||
@ -160,6 +160,7 @@ dependencies:
|
||||
- network-uri
|
||||
- psqueues
|
||||
- nonce
|
||||
- IntervalMap
|
||||
other-extensions:
|
||||
- GeneralizedNewtypeDeriving
|
||||
- IncoherentInstances
|
||||
|
||||
@ -106,6 +106,8 @@ import GHC.RTS.Flags (getRTSFlags)
|
||||
|
||||
import qualified Prometheus
|
||||
|
||||
import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
-- Import all relevant handler modules here.
|
||||
-- (HPack takes care to add new modules to our cabal file nowadays.)
|
||||
import Handler.News
|
||||
@ -187,6 +189,13 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
ah <- initARCHandle arccMaximumGhost arccMaximumWeight
|
||||
void . Prometheus.register $ arcMetrics ARCFileSource ah
|
||||
return ah
|
||||
appFileSourcePrewarm <- for appFileSourcePrewarmConf $ \PrewarmCacheConf{..} -> do
|
||||
lh <- initLRUHandle precMaximumWeight
|
||||
void . Prometheus.register $ lruMetrics LRUFileSourcePrewarm lh
|
||||
return lh
|
||||
appFileInjectInhibit <- liftIO $ newTVarIO IntervalMap.empty
|
||||
for_ (guardOnM (isn't _JobsOffload appJobMode) appInjectFiles) $ \_ ->
|
||||
void . Prometheus.register $ injectInhibitMetrics appFileInjectInhibit
|
||||
|
||||
-- We need a log function to create a connection pool. We need a connection
|
||||
-- pool to create our foundation. And we need our foundation to get a
|
||||
|
||||
@ -27,6 +27,7 @@ module Database.Esqueleto.Utils
|
||||
, SqlProject(..)
|
||||
, (->.), (#>>.)
|
||||
, fromSqlKey
|
||||
, unKey
|
||||
, selectCountRows
|
||||
, selectMaybe
|
||||
, day, diffDays
|
||||
@ -50,6 +51,8 @@ import qualified Data.ByteString.Lazy as Lazy (ByteString)
|
||||
|
||||
import Crypto.Hash (Digest, SHA256)
|
||||
|
||||
import Data.Coerce (Coercible)
|
||||
|
||||
{-# ANN any ("HLint: ignore Use any" :: String) #-}
|
||||
{-# ANN all ("HLint: ignore Use all" :: String) #-}
|
||||
|
||||
@ -392,6 +395,12 @@ infixl 8 #>>.
|
||||
|
||||
fromSqlKey :: (ToBackendKey SqlBackend entity, PersistField (Key entity)) => E.SqlExpr (E.Value (Key entity)) -> E.SqlExpr (E.Value Int64)
|
||||
fromSqlKey = E.veryUnsafeCoerceSqlExprValue
|
||||
|
||||
unKey :: ( Coercible (Key entity) a
|
||||
, PersistField (Key entity), PersistField a
|
||||
)
|
||||
=> E.SqlExpr (E.Value (Key entity)) -> E.SqlExpr (E.Value a)
|
||||
unKey = E.veryUnsafeCoerceSqlExprValue
|
||||
|
||||
|
||||
selectCountRows :: (Num a, PersistField a, MonadIO m) => E.SqlQuery ignored -> E.SqlReadT m a
|
||||
|
||||
@ -25,6 +25,8 @@ import qualified Jose.Jwk as Jose
|
||||
import qualified Database.Memcached.Binary.IO as Memcached
|
||||
import Network.Minio (MinioConn)
|
||||
|
||||
import Data.IntervalMap.Strict (IntervalMap)
|
||||
|
||||
|
||||
type SMTPPool = Pool SMTPConnection
|
||||
|
||||
@ -39,28 +41,30 @@ makePrisms ''SomeSessionStorage
|
||||
-- starts running, such as database connections. Every handler will have
|
||||
-- access to the data present here.
|
||||
data UniWorX = UniWorX
|
||||
{ appSettings' :: AppSettings
|
||||
, appStatic :: EmbeddedStatic -- ^ Settings for static file serving.
|
||||
, appConnPool :: ConnectionPool -- ^ Database connection pool.
|
||||
, appSmtpPool :: Maybe SMTPPool
|
||||
, appLdapPool :: Maybe (Failover (LdapConf, LdapPool))
|
||||
, appWidgetMemcached :: Maybe Memcached.Connection -- ^ Actually a proper pool
|
||||
, appHttpManager :: Manager
|
||||
, appLogger :: (ReleaseKey, TVar Logger)
|
||||
, appLogSettings :: TVar LogSettings
|
||||
, appCryptoIDKey :: CryptoIDKey
|
||||
, appClusterID :: ClusterId
|
||||
, appInstanceID :: InstanceId
|
||||
, appJobState :: TMVar JobState
|
||||
, appSessionStore :: SomeSessionStorage
|
||||
, appSecretBoxKey :: SecretBox.Key
|
||||
, appJSONWebKeySet :: Jose.JwkSet
|
||||
, appHealthReport :: TVar (Set (UTCTime, HealthReport))
|
||||
, appMemcached :: Maybe (AEAD.Key, Memcached.Connection)
|
||||
, appUploadCache :: Maybe MinioConn
|
||||
, appVerpSecret :: VerpSecret
|
||||
, appAuthKey :: Auth.Key
|
||||
, appFileSourceARC :: Maybe (ARCHandle (FileContentChunkReference, (Int, Int)) Int ByteString)
|
||||
{ appSettings' :: AppSettings
|
||||
, appStatic :: EmbeddedStatic -- ^ Settings for static file serving.
|
||||
, appConnPool :: ConnectionPool -- ^ Database connection pool.
|
||||
, appSmtpPool :: Maybe SMTPPool
|
||||
, appLdapPool :: Maybe (Failover (LdapConf, LdapPool))
|
||||
, appWidgetMemcached :: Maybe Memcached.Connection -- ^ Actually a proper pool
|
||||
, appHttpManager :: Manager
|
||||
, appLogger :: (ReleaseKey, TVar Logger)
|
||||
, appLogSettings :: TVar LogSettings
|
||||
, appCryptoIDKey :: CryptoIDKey
|
||||
, appClusterID :: ClusterId
|
||||
, appInstanceID :: InstanceId
|
||||
, appJobState :: TMVar JobState
|
||||
, appSessionStore :: SomeSessionStorage
|
||||
, appSecretBoxKey :: SecretBox.Key
|
||||
, appJSONWebKeySet :: Jose.JwkSet
|
||||
, appHealthReport :: TVar (Set (UTCTime, HealthReport))
|
||||
, appMemcached :: Maybe (AEAD.Key, Memcached.Connection)
|
||||
, appUploadCache :: Maybe MinioConn
|
||||
, appVerpSecret :: VerpSecret
|
||||
, appAuthKey :: Auth.Key
|
||||
, appFileSourceARC :: Maybe (ARCHandle (FileContentChunkReference, (Int, Int)) Int ByteString)
|
||||
, appFileSourcePrewarm :: Maybe (LRUHandle (FileContentChunkReference, (Int, Int)) UTCTime Int ByteString)
|
||||
, appFileInjectInhibit :: TVar (IntervalMap UTCTime (Set FileContentReference))
|
||||
}
|
||||
|
||||
makeLenses_ ''UniWorX
|
||||
|
||||
@ -1,16 +1,18 @@
|
||||
module Handler.Utils.Files
|
||||
( sourceFile, sourceFile'
|
||||
, sourceFiles, sourceFiles'
|
||||
, SourceFilesException(..)
|
||||
, sourceFileDB, sourceFileMinio
|
||||
, SourceFilesException(..), _SourceFilesMismatchedHashes, _SourceFilesContentUnavailable
|
||||
, sourceFileDB, sourceFileDBChunks, sourceFileMinio
|
||||
, acceptFile
|
||||
, respondFileConditional
|
||||
) where
|
||||
|
||||
import Import.NoFoundation
|
||||
import Import.NoFoundation hiding (First(..))
|
||||
import Foundation.Type
|
||||
import Utils.Metrics
|
||||
|
||||
import Data.Monoid (First(..))
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as C (unfoldM)
|
||||
|
||||
@ -23,6 +25,8 @@ import qualified Database.Esqueleto.Utils as E
|
||||
import System.FilePath (normalise, makeValid)
|
||||
import Data.List (dropWhileEnd)
|
||||
|
||||
import qualified Data.ByteString as ByteString
|
||||
|
||||
|
||||
data SourceFilesException
|
||||
= SourceFilesMismatchedHashes
|
||||
@ -30,53 +34,87 @@ data SourceFilesException
|
||||
deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
deriving anyclass (Exception)
|
||||
|
||||
makePrisms ''SourceFilesException
|
||||
|
||||
|
||||
fileChunkARC :: ( MonadHandler m
|
||||
, HandlerSite m ~ UniWorX
|
||||
)
|
||||
=> (FileContentChunkReference, (Int, Int))
|
||||
=> Maybe Int
|
||||
-> (FileContentChunkReference, (Int, Int))
|
||||
-> m (Maybe ByteString)
|
||||
-> m (Maybe ByteString)
|
||||
fileChunkARC k getChunkDB = do
|
||||
fileChunkARC altSize k@(ref, (s, l)) getChunkDB' = do
|
||||
prewarm <- getsYesod appFileSourcePrewarm
|
||||
let getChunkDB = case prewarm of
|
||||
Nothing -> getChunkDB'
|
||||
Just lh -> do
|
||||
chunkRes <- lookupLRUHandle lh k
|
||||
case chunkRes of
|
||||
Just (chunk, w) -> Just chunk <$ do
|
||||
$logDebugS "fileChunkARC" "Prewarm hit"
|
||||
liftIO $ observeSourcedChunk StoragePrewarm w
|
||||
Nothing -> do
|
||||
chunk' <- getChunkDB'
|
||||
for chunk' $ \chunk -> chunk <$ do
|
||||
let w = length chunk
|
||||
$logDebugS "fileChunkARC" "Prewarm miss"
|
||||
liftIO $ observeSourcedChunk StorageDB w
|
||||
|
||||
arc <- getsYesod appFileSourceARC
|
||||
case arc of
|
||||
Nothing -> getChunkDB
|
||||
Just ah -> do
|
||||
cachedARC' ah k $ \case
|
||||
Nothing -> do
|
||||
chunk' <- getChunkDB
|
||||
chunk' <- case assertM (> l) altSize of
|
||||
-- This optimization works for the somewhat common case that cdc chunks are smaller than db chunks and start of the requested range is aligned with a db chunk boundary
|
||||
Just altSize'
|
||||
-> fmap getFirst . execWriterT . cachedARC' ah (ref, (s, altSize')) $ \x -> x <$ case x of
|
||||
Nothing -> tellM $ First <$> getChunkDB
|
||||
Just (v, _) -> tell . First . Just $ ByteString.take l v
|
||||
Nothing -> getChunkDB
|
||||
for chunk' $ \chunk -> do
|
||||
let w = length chunk
|
||||
$logDebugS "fileChunkARC" "ARC miss"
|
||||
liftIO $ observeSourcedChunk StorageDB w
|
||||
return (chunk, w)
|
||||
Just x@(_, w) -> do
|
||||
$logDebugS "fileChunkARC" "ARC hit"
|
||||
liftIO $ Just x <$ observeSourcedChunk StorageARC w
|
||||
|
||||
|
||||
sourceFileDB :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX)
|
||||
|
||||
sourceFileDB :: forall m.
|
||||
(MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX)
|
||||
=> FileContentReference -> ConduitT () ByteString (SqlPersistT m) ()
|
||||
sourceFileDB fileReference = do
|
||||
sourceFileDB fileReference = chunkHashes
|
||||
.| awaitForever (sourceFileDBChunks (const id) . unFileContentChunkKey . E.unValue)
|
||||
.| C.map (view _1)
|
||||
where
|
||||
chunkHashes :: ConduitT () (E.Value FileContentChunkId) (SqlPersistT m) ()
|
||||
chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
||||
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
|
||||
return $ fileContentEntry E.^. FileContentEntryChunkHash
|
||||
|
||||
sourceFileDBChunks :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, BackendCompatible SqlReadBackend backend)
|
||||
=> ((Int, Int) -> ReaderT SqlReadBackend m (Maybe ByteString) -> ReaderT SqlReadBackend m (Maybe ByteString)) -> FileContentChunkReference -> ConduitT i (ByteString, (Int, Int)) (ReaderT backend m) ()
|
||||
sourceFileDBChunks cont chunkHash = transPipe (withReaderT $ projectBackend @SqlReadBackend) $ do
|
||||
dbChunksize <- getsYesod $ view _appFileUploadDBChunksize
|
||||
let retrieveChunk chunkHash = \case
|
||||
let retrieveChunk = \case
|
||||
Nothing -> return Nothing
|
||||
Just start -> do
|
||||
let getChunkDB = fmap (fmap E.unValue) . E.selectMaybe . E.from $ \fileContentChunk -> do
|
||||
E.where_ $ fileContentChunk E.^. FileContentChunkId E.==. E.val chunkHash
|
||||
let getChunkDB = cont (start, dbChunksize) . fmap (fmap E.unValue) . E.selectMaybe . E.from $ \fileContentChunk -> do
|
||||
E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. E.val chunkHash
|
||||
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
|
||||
chunk <- fileChunkARC (unFileContentChunkKey chunkHash, (start, dbChunksize)) getChunkDB
|
||||
chunk <- fileChunkARC Nothing (chunkHash, (start, dbChunksize)) getChunkDB
|
||||
case chunk of
|
||||
Nothing -> throwM SourceFilesContentUnavailable
|
||||
Just c -> do
|
||||
return . Just . (c, ) $ if
|
||||
return . Just . ((c, (start, dbChunksize)), ) $ if
|
||||
| olength c >= dbChunksize -> Just $ start + dbChunksize
|
||||
| otherwise -> Nothing
|
||||
chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
||||
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
|
||||
return $ fileContentEntry E.^. FileContentEntryChunkHash
|
||||
chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))
|
||||
C.unfoldM retrieveChunk $ Just (1 :: Int)
|
||||
|
||||
|
||||
sourceFileMinio :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m)
|
||||
@ -212,7 +250,7 @@ respondFileConditional representationLastModified cType FileReference{..} = do
|
||||
let getChunkDB = fmap (fmap E.unValue) . E.selectMaybe . E.from $ \fileContentChunk -> do
|
||||
E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. E.val chunkHash
|
||||
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val $ min cLength' dbChunksize)
|
||||
chunk <- fileChunkARC (chunkHash, (fromIntegral start, fromIntegral $ min cLength' dbChunksize)) getChunkDB
|
||||
chunk <- fileChunkARC (Just $ fromIntegral dbChunksize) (chunkHash, (fromIntegral start, fromIntegral $ min cLength' dbChunksize)) getChunkDB
|
||||
case chunk of
|
||||
Nothing -> throwM SourceFilesContentUnavailable
|
||||
Just c -> do
|
||||
|
||||
74
src/Jobs.hs
74
src/Jobs.hs
@ -13,7 +13,8 @@ import Jobs.Queue
|
||||
import Jobs.Offload
|
||||
import Jobs.Crontab
|
||||
|
||||
import qualified Data.Conduit.List as C
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as C (mapMaybe)
|
||||
|
||||
import qualified Data.Text.Lazy as LT
|
||||
|
||||
@ -47,6 +48,14 @@ import Control.Concurrent.STM.Delay
|
||||
|
||||
import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay)
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
|
||||
import qualified Data.ByteString as ByteString
|
||||
|
||||
import Handler.Utils.Files (sourceFileDBChunks, _SourceFilesContentUnavailable)
|
||||
|
||||
import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
import Jobs.Handler.SendNotification
|
||||
import Jobs.Handler.SendTestEmail
|
||||
@ -577,6 +586,69 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
|
||||
$logInfoS logIdent [st|Sleeping #{tshow secs}s...|]
|
||||
threadDelay msecs
|
||||
$logInfoS logIdent [st|Slept #{tshow secs}s.|]
|
||||
handleCmd JobCtlPrewarmCache{..} = do
|
||||
prewarm <- getsYesod appFileSourcePrewarm
|
||||
for_ prewarm $ \lh -> lift . runDBRead $
|
||||
runConduit $ sourceFileChunkIds .| C.map E.unValue
|
||||
.| awaitForever (\cRef -> handleC handleUnavailable $ sourceFileDBChunks (withLRU lh cRef) cRef .| C.map (cRef, ))
|
||||
.| C.mapM_ (sinkChunkCache lh)
|
||||
where
|
||||
handleUnavailable e
|
||||
| is _SourceFilesContentUnavailable e = return ()
|
||||
| otherwise = throwM e
|
||||
withLRU lh cRef range getChunk = do
|
||||
touched <- touchLRUHandle lh (cRef, range) jcTargetTime
|
||||
case touched of
|
||||
Just (bs, _) -> return $ Just bs
|
||||
Nothing -> getChunk
|
||||
(minBoundDgst, maxBoundDgst) = jcChunkInterval
|
||||
sourceFileChunkIds = E.selectSource . E.from $ \fileContentEntry -> do
|
||||
let cRef = E.unKey $ fileContentEntry E.^. FileContentEntryChunkHash
|
||||
eRef = fileContentEntry E.^. FileContentEntryHash
|
||||
E.where_ . E.and $ catMaybes
|
||||
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
|
||||
, maxBoundDgst <&> \b -> cRef E.<. E.val b
|
||||
]
|
||||
E.where_ $ matchesPrewarmSource eRef jcPrewarmSource
|
||||
return cRef
|
||||
sinkChunkCache lh (cRef, (c, range)) = insertLRUHandle lh (cRef, range) jcTargetTime (c, ByteString.length c)
|
||||
handleCmd JobCtlInhibitInject{..} = maybeT (return ()) $ do
|
||||
PrewarmCacheConf{..} <- MaybeT . getsYesod $ view _appFileSourcePrewarmConf
|
||||
let inhibitInterval = IntervalMap.ClosedInterval
|
||||
(addUTCTime (-precStart) jcTargetTime)
|
||||
(addUTCTime (precInhibit - precStart) jcTargetTime)
|
||||
sourceFileReferences = prewarmSourceReferences jcPrewarmSource
|
||||
refs <- lift . lift . runDBRead . runConduit $ sourceFileReferences .| C.foldl (flip Set.insert) Set.empty
|
||||
guard . not $ null refs
|
||||
inhibitTVar <- getsYesod appFileInjectInhibit
|
||||
atomically . modifyTVar' inhibitTVar $ force . IntervalMap.insertWith Set.union inhibitInterval refs
|
||||
|
||||
matchesPrewarmSource :: E.SqlExpr (E.Value FileContentReference) -> JobCtlPrewarmSource -> E.SqlExpr (E.Value Bool)
|
||||
matchesPrewarmSource eRef = \case
|
||||
JobCtlPrewarmSheetFile{..} -> E.or
|
||||
[ E.exists . E.from $ \sheetFile ->
|
||||
E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet
|
||||
E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType
|
||||
E.&&. sheetFile E.^. SheetFileContent E.==. E.just eRef
|
||||
, E.exists . E.from $ \personalisedSheetFile ->
|
||||
E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet
|
||||
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType
|
||||
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileContent E.==. E.just eRef
|
||||
]
|
||||
|
||||
prewarmSourceReferences :: JobCtlPrewarmSource -> ConduitT () FileContentReference (ReaderT SqlReadBackend (HandlerFor UniWorX)) ()
|
||||
prewarmSourceReferences = \case
|
||||
JobCtlPrewarmSheetFile{..} -> (.| C.mapMaybe E.unValue) $ do
|
||||
E.selectSource . E.from $ \sheetFile -> do
|
||||
E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet
|
||||
E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType
|
||||
E.where_ . E.isJust $ sheetFile E.^. SheetFileContent
|
||||
return $ sheetFile E.^. SheetFileContent
|
||||
E.selectSource . E.from $ \personalisedSheetFile -> do
|
||||
E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet
|
||||
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType
|
||||
E.where_ . E.isJust $ personalisedSheetFile E.^. PersonalisedSheetFileContent
|
||||
return $ personalisedSheetFile E.^. PersonalisedSheetFileContent
|
||||
|
||||
jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a
|
||||
jLocked jId act = flip evalStateT False $ do
|
||||
|
||||
@ -21,6 +21,17 @@ import qualified Data.Conduit.List as C
|
||||
|
||||
import qualified Database.Esqueleto as E
|
||||
|
||||
import Jobs.Handler.Intervals.Utils
|
||||
|
||||
import System.IO.Unsafe
|
||||
|
||||
import Crypto.Hash (hashDigestSize, digestFromByteString)
|
||||
|
||||
import Data.List (iterate)
|
||||
|
||||
{-# NOINLINE prewarmCacheIntervalsCache #-}
|
||||
prewarmCacheIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
|
||||
prewarmCacheIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
|
||||
|
||||
determineCrontab :: DB (Crontab JobCtl)
|
||||
-- ^ Extract all future jobs from the database (sheet deadlines, ...)
|
||||
@ -49,6 +60,122 @@ determineCrontab = execWriterT $ do
|
||||
}
|
||||
Nothing -> mempty
|
||||
|
||||
let
|
||||
tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (YesodDB UniWorX) ()
|
||||
tellPrewarmJobs jcPrewarmSource jcTargetTime = maybeT (return ()) $ do
|
||||
PrewarmCacheConf{..} <- hoistMaybe appFileSourcePrewarmConf
|
||||
|
||||
let
|
||||
chunkHashBytes :: forall h.
|
||||
( Unwrapped FileContentChunkReference ~ Digest h )
|
||||
=> Integer
|
||||
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
|
||||
intervals <- mkIntervalsCached prewarmCacheIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) precSteps
|
||||
|
||||
let step = realToFrac $ toRational (precStart - precEnd) / toRational precSteps
|
||||
step' = realToFrac $ toRational step / precMaxSpeedup
|
||||
|
||||
mapM_ tell
|
||||
[ HashMap.singleton
|
||||
JobCtlPrewarmCache{..}
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ ts
|
||||
, cronRepeat = CronRepeatOnChange
|
||||
, cronRateLimit = step'
|
||||
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ ts'
|
||||
}
|
||||
| jcChunkInterval <- intervals
|
||||
| ts <- iterate (addUTCTime step) $ addUTCTime (-precStart) jcTargetTime
|
||||
| ts' <- iterate (addUTCTime step') $ addUTCTime (subtract precStart . realToFrac $ toRational (precStart - precEnd) * (1 - recip precMaxSpeedup)) jcTargetTime
|
||||
]
|
||||
|
||||
lift . maybeT (return ()) $ do
|
||||
injectInterval <- fmap abs . MaybeT . getsYesod $ view _appInjectFiles
|
||||
tell $ HashMap.singleton
|
||||
JobCtlInhibitInject{..}
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (negate $ precStart + injectInterval + 10) jcTargetTime
|
||||
, cronRepeat = CronRepeatScheduled CronAsap
|
||||
, cronRateLimit = injectInterval / 2
|
||||
, cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (precInhibit - precStart) jcTargetTime
|
||||
}
|
||||
|
||||
let
|
||||
sheetJobs (Entity nSheet Sheet{..}) = do
|
||||
for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> do
|
||||
tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetExercise) aFrom
|
||||
|
||||
when (isn't _JobsOffload appJobMode) $ do
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetActive{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
|
||||
}
|
||||
for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> do
|
||||
tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetHint) hFrom
|
||||
|
||||
when (isn't _JobsOffload appJobMode) . maybeT (return ()) $ do
|
||||
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom
|
||||
guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom)
|
||||
(fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet])
|
||||
guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet]
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetHint{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
|
||||
}
|
||||
for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> do
|
||||
tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetSolution) sFrom
|
||||
|
||||
when (isn't _JobsOffload appJobMode) . maybeT (return ()) $ do
|
||||
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom
|
||||
guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = Left nominalDay
|
||||
}
|
||||
when (isn't _JobsOffload appJobMode) $ do
|
||||
for_ sheetActiveTo $ \aTo -> do
|
||||
whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo'
|
||||
, cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo
|
||||
}
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
|
||||
, cronRepeat = CronRepeatOnChange
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = Left appNotificationExpiration
|
||||
}
|
||||
when sheetAutoDistribute $
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobDistributeCorrections nSheet)
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = 3600 -- Irrelevant due to `cronRepeat`
|
||||
, cronNotAfter = Left nominalDay
|
||||
}
|
||||
|
||||
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs
|
||||
|
||||
|
||||
when (isn't _JobsOffload appJobMode) $ do
|
||||
case appJobFlushInterval of
|
||||
Just interval -> tell $ HashMap.singleton
|
||||
@ -233,71 +360,6 @@ determineCrontab = execWriterT $ do
|
||||
, cronNotAfter = Left within
|
||||
}
|
||||
|
||||
let
|
||||
sheetJobs (Entity nSheet Sheet{..}) = do
|
||||
for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom ->
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetActive{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
|
||||
}
|
||||
for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do
|
||||
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom
|
||||
guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom)
|
||||
(fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet])
|
||||
guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet]
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetHint{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
|
||||
}
|
||||
for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do
|
||||
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom
|
||||
guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = Left nominalDay
|
||||
}
|
||||
for_ sheetActiveTo $ \aTo -> do
|
||||
whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo'
|
||||
, cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo
|
||||
}
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
|
||||
, cronRepeat = CronRepeatOnChange
|
||||
, cronRateLimit = appNotificationRateLimit
|
||||
, cronNotAfter = Left appNotificationExpiration
|
||||
}
|
||||
when sheetAutoDistribute $
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue $ JobDistributeCorrections nSheet)
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = 3600 -- Irrelevant due to `cronRepeat`
|
||||
, cronNotAfter = Left nominalDay
|
||||
}
|
||||
|
||||
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs
|
||||
|
||||
let
|
||||
correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB ()
|
||||
correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton
|
||||
|
||||
@ -40,6 +40,13 @@ import Jobs.Queue (YesodJobDB)
|
||||
|
||||
import Jobs.Handler.Intervals.Utils
|
||||
|
||||
import Data.IntervalMap.Strict (IntervalMap)
|
||||
import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
import Control.Concurrent.STM.TVar (stateTVar)
|
||||
|
||||
import qualified Data.Foldable as F
|
||||
|
||||
|
||||
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
||||
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
|
||||
@ -256,6 +263,15 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
uploadBucket <- getsYesod $ view _appUploadCacheBucket
|
||||
interval <- getsYesod $ view _appInjectFiles
|
||||
|
||||
now <- liftIO getCurrentTime
|
||||
let
|
||||
extractInhibited :: IntervalMap UTCTime (Set FileContentReference)
|
||||
-> (Set FileContentReference, IntervalMap UTCTime (Set FileContentReference))
|
||||
extractInhibited cState = (F.fold current, IntervalMap.union current upcoming)
|
||||
where
|
||||
(_, current, upcoming) = IntervalMap.splitIntersecting cState $ IntervalMap.OpenInterval (addUTCTime (-2) now) (addUTCTime 2 now)
|
||||
inhibited <- atomically . flip stateTVar extractInhibited =<< getsYesod appFileInjectInhibit
|
||||
|
||||
let
|
||||
extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
|
||||
extractReference _ = Nothing
|
||||
@ -321,6 +337,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
(Sum injectedFiles, Sum injectedSize) <-
|
||||
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
|
||||
.| C.mapMaybe extractReference
|
||||
.| C.filter (views _2 (`Set.notMember` inhibited))
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
|
||||
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
|
||||
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFilesCount $ const 1)
|
||||
|
||||
@ -5,7 +5,8 @@ module Jobs.Types
|
||||
( Job(..), Notification(..)
|
||||
, JobChildren
|
||||
, classifyJob
|
||||
, JobCtl(..)
|
||||
, JobCtlPrewarmSource(..), _jcpsSheet, _jcpsSheetFileType
|
||||
, JobCtl(..), _jcPrewarmSource, _jcChunkInterval
|
||||
, classifyJobCtl
|
||||
, YesodJobDB
|
||||
, JobHandler(..), _JobHandlerAtomic, _JobHandlerException
|
||||
@ -45,6 +46,8 @@ import GHC.Conc (unsafeIOToSTM)
|
||||
|
||||
import Data.Generics.Product.Types (Children, ChGeneric)
|
||||
|
||||
{-# ANN module ("HLint: ignore Use newtype instead of data" :: String) #-}
|
||||
|
||||
|
||||
data Job
|
||||
= JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
|
||||
@ -178,22 +181,48 @@ classifyJob job = unpack tag
|
||||
Aeson.String tag = obj HashMap.! "job"
|
||||
|
||||
|
||||
data JobCtlPrewarmSource
|
||||
= JobCtlPrewarmSheetFile
|
||||
{ jcpsSheet :: SheetId
|
||||
, jcpsSheetFileType :: SheetFileType
|
||||
}
|
||||
deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
deriving anyclass (Hashable, NFData)
|
||||
|
||||
makeLenses_ ''JobCtlPrewarmSource
|
||||
|
||||
deriveJSON defaultOptions
|
||||
{ constructorTagModifier = camelToPathPiece' 3
|
||||
, fieldLabelModifier = camelToPathPiece' 1
|
||||
, tagSingleConstructors = True
|
||||
, sumEncoding = TaggedObject "source" "data"
|
||||
} ''JobCtlPrewarmSource
|
||||
|
||||
data JobCtl = JobCtlFlush
|
||||
| JobCtlPerform QueuedJobId
|
||||
| JobCtlPrewarmCache
|
||||
{ jcPrewarmSource :: JobCtlPrewarmSource
|
||||
, jcTargetTime :: UTCTime
|
||||
, jcChunkInterval :: (Maybe FileContentChunkReference, Maybe FileContentChunkReference)
|
||||
}
|
||||
| JobCtlInhibitInject
|
||||
{ jcPrewarmSource :: JobCtlPrewarmSource
|
||||
, jcTargetTime :: UTCTime
|
||||
}
|
||||
| JobCtlDetermineCrontab
|
||||
| JobCtlQueue Job
|
||||
| JobCtlGenerateHealthReport HealthCheck
|
||||
| JobCtlTest
|
||||
| JobCtlSleep Micro -- | For debugging
|
||||
deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
deriving anyclass (Hashable, NFData)
|
||||
|
||||
makePrisms ''JobCtl
|
||||
|
||||
instance Hashable JobCtl
|
||||
instance NFData JobCtl
|
||||
makeLenses_ ''JobCtl
|
||||
|
||||
deriveJSON defaultOptions
|
||||
{ constructorTagModifier = camelToPathPiece' 2
|
||||
, fieldLabelModifier = camelToPathPiece' 1
|
||||
, tagSingleConstructors = True
|
||||
, sumEncoding = TaggedObject "instruction" "data"
|
||||
} ''JobCtl
|
||||
|
||||
@ -176,12 +176,10 @@ makeLenses_ ''SheetGroup
|
||||
|
||||
data SheetFileType = SheetExercise | SheetHint | SheetSolution | SheetMarking
|
||||
deriving (Show, Read, Eq, Ord, Enum, Bounded, Generic)
|
||||
deriving anyclass (Hashable, NFData, Universe, Finite)
|
||||
derivePersistField "SheetFileType"
|
||||
|
||||
instance Universe SheetFileType
|
||||
instance Finite SheetFileType
|
||||
|
||||
finitePathPiece ''SheetFileType ["file", "hint", "solution", "marking"]
|
||||
pathPieceJSON ''SheetFileType
|
||||
|
||||
sheetFile2markup :: SheetFileType -> Markup
|
||||
sheetFile2markup SheetExercise = iconSFTQuestion
|
||||
|
||||
@ -75,6 +75,8 @@ import qualified Network.Minio as Minio
|
||||
|
||||
import Data.Conduit.Algorithms.FastCDC
|
||||
|
||||
import Utils.Lens.TH
|
||||
|
||||
|
||||
-- | Runtime settings to configure this application. These settings can be
|
||||
-- loaded from various sources: defaults, environment variables, config files,
|
||||
@ -214,6 +216,7 @@ data AppSettings = AppSettings
|
||||
, appMemcacheAuth :: Bool
|
||||
|
||||
, appFileSourceARCConf :: Maybe (ARCConf Int)
|
||||
, appFileSourcePrewarmConf :: Maybe PrewarmCacheConf
|
||||
} deriving Show
|
||||
|
||||
data JobMode = JobsLocal { jobsAcceptOffload :: Bool }
|
||||
@ -342,6 +345,13 @@ data ARCConf w = ARCConf
|
||||
, arccMaximumWeight :: w
|
||||
} deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
|
||||
data PrewarmCacheConf = PrewarmCacheConf
|
||||
{ precMaximumWeight :: Int
|
||||
, precStart, precEnd, precInhibit :: NominalDiffTime -- ^ Prewarming cache starts at @t - precStart@ and should be finished by @t - precEnd@; injecting from minio to database is inhibited from @t - precStart@ until @t - precStart + precInhibit@
|
||||
, precSteps :: Natural
|
||||
, precMaxSpeedup :: Rational
|
||||
} deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
|
||||
nullaryPathPiece ''ApprootScope $ camelToPathPiece' 1
|
||||
pathPieceJSON ''ApprootScope
|
||||
pathPieceJSONKey ''ApprootScope
|
||||
@ -371,6 +381,12 @@ deriveJSON defaultOptions
|
||||
deriveJSON defaultOptions
|
||||
{ fieldLabelModifier = camelToPathPiece' 1
|
||||
} ''ARCConf
|
||||
|
||||
deriveJSON defaultOptions
|
||||
{ fieldLabelModifier = camelToPathPiece' 1
|
||||
} ''PrewarmCacheConf
|
||||
|
||||
makeLenses_ ''PrewarmCacheConf
|
||||
|
||||
instance FromJSON LdapConf where
|
||||
parseJSON = withObject "LdapConf" $ \o -> do
|
||||
@ -632,7 +648,17 @@ instance FromJSON AppSettings where
|
||||
appStudyFeaturesRecacheRelevanceWithin <- o .:? "study-features-recache-relevance-within"
|
||||
appStudyFeaturesRecacheRelevanceInterval <- o .: "study-features-recache-relevance-interval"
|
||||
|
||||
appFileSourceARCConf <- assertM ((||) <$> ((> 0) . arccMaximumGhost) <*> ((> 0) . arccMaximumWeight)) <$> o .:? "file-source-arc"
|
||||
let isValidARCConf ARCConf{..} = arccMaximumWeight > 0
|
||||
appFileSourceARCConf <- assertM isValidARCConf <$> o .:? "file-source-arc"
|
||||
|
||||
let isValidPrewarmConf PrewarmCacheConf{..} = and
|
||||
[ precMaximumWeight > 0
|
||||
, precStart >= 0
|
||||
, precEnd >= 0, precEnd <= precStart
|
||||
, precSteps > 0
|
||||
, precMaxSpeedup >= 1
|
||||
]
|
||||
appFileSourcePrewarmConf <- over (_Just . _precInhibit) (max 0) . assertM isValidPrewarmConf <$> o .:? "file-source-prewarm"
|
||||
|
||||
return AppSettings{..}
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ import Utils.NTop as Utils
|
||||
import Utils.HttpConditional as Utils
|
||||
import Utils.Persist as Utils
|
||||
import Utils.ARC as Utils
|
||||
import Utils.LRU as Utils
|
||||
|
||||
import Text.Blaze (Markup, ToMarkup(..))
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ module Utils.ARC
|
||||
, ARC, initARC
|
||||
, arcAlterF, lookupARC, insertARC
|
||||
, ARCHandle, initARCHandle, cachedARC, cachedARC'
|
||||
, lookupARCHandle
|
||||
, readARCHandle
|
||||
, arcRecentSize, arcFrequentSize, arcGhostRecentSize, arcGhostFrequentSize
|
||||
, getARCRecentWeight, getARCFrequentWeight
|
||||
@ -17,6 +18,7 @@ import qualified Data.OrdPSQ as OrdPSQ
|
||||
import Control.Lens
|
||||
|
||||
-- https://web.archive.org/web/20210115184012/https://dbs.uni-leipzig.de/file/ARC.pdf
|
||||
-- https://jaspervdj.be/posts/2015-02-24-lru-cache.html
|
||||
|
||||
|
||||
newtype ARCTick = ARCTick { _getARCTick :: Word64 }
|
||||
@ -268,6 +270,17 @@ cachedARC :: forall k w v m.
|
||||
-> (Maybe (v, w) -> m (v, w))
|
||||
-> m v
|
||||
cachedARC h k f = fromMaybe (error "cachedARC: cachedARC' returned Nothing") <$> cachedARC' h k (fmap Just . f)
|
||||
|
||||
lookupARCHandle :: forall k w v m.
|
||||
( MonadIO m
|
||||
, Ord k
|
||||
, Integral w
|
||||
)
|
||||
=> ARCHandle k w v
|
||||
-> k
|
||||
-> m (Maybe (v, w))
|
||||
lookupARCHandle (ARCHandle arcVar) k = lookupARC k <$> readIORef arcVar
|
||||
|
||||
|
||||
readARCHandle :: MonadIO m
|
||||
=> ARCHandle k w v
|
||||
|
||||
213
src/Utils/LRU.hs
Normal file
213
src/Utils/LRU.hs
Normal file
@ -0,0 +1,213 @@
|
||||
module Utils.LRU
|
||||
( LRUTick
|
||||
, LRU, initLRU
|
||||
, insertLRU, lookupLRU, touchLRU, timeoutLRU
|
||||
, LRUHandle, initLRUHandle
|
||||
, insertLRUHandle, lookupLRUHandle, touchLRUHandle, timeoutLRUHandle
|
||||
, readLRUHandle
|
||||
, lruStoreSize
|
||||
, getLRUWeight
|
||||
, describeLRU
|
||||
) where
|
||||
|
||||
import ClassyPrelude
|
||||
|
||||
import Data.OrdPSQ (OrdPSQ)
|
||||
import qualified Data.OrdPSQ as OrdPSQ
|
||||
|
||||
import Control.Lens
|
||||
|
||||
-- https://jaspervdj.be/posts/2015-02-24-lru-cache.html
|
||||
|
||||
|
||||
newtype LRUTick = LRUTick { _getLRUTick :: Word64 }
|
||||
deriving (Eq, Ord, Show, Typeable)
|
||||
deriving newtype (NFData)
|
||||
|
||||
makeLenses ''LRUTick
|
||||
|
||||
data LRU k t w v = LRU
|
||||
{ lruStore :: !(OrdPSQ k (t, LRUTick) (v, w))
|
||||
, lruWeight :: !w
|
||||
, lruMaximumWeight :: !w
|
||||
}
|
||||
|
||||
instance (NFData k, NFData t, NFData w, NFData v) => NFData (LRU k t w v) where
|
||||
rnf LRU{..} = rnf lruStore
|
||||
`seq` rnf lruWeight
|
||||
`seq` rnf lruMaximumWeight
|
||||
|
||||
describeLRU :: Show w
|
||||
=> LRU k t w v
|
||||
-> String
|
||||
describeLRU LRU{..} = intercalate ", "
|
||||
[ "lruStore: " <> show (OrdPSQ.size lruStore)
|
||||
, "lruWeight: " <> show lruWeight
|
||||
, "lruMaximumWeight: " <> show lruMaximumWeight
|
||||
]
|
||||
|
||||
lruStoreSize :: LRU k t w v -> Int
|
||||
lruStoreSize = OrdPSQ.size . lruStore
|
||||
|
||||
getLRUWeight :: LRU k t w v -> w
|
||||
getLRUWeight = lruWeight
|
||||
|
||||
initialLRUTick, maximumLRUTick :: LRUTick
|
||||
initialLRUTick = LRUTick 0
|
||||
maximumLRUTick = LRUTick maxBound
|
||||
|
||||
initLRU :: forall k t w v.
|
||||
Integral w
|
||||
=> w -- ^ @lruMaximumWeight@
|
||||
-> (LRU k t w v, LRUTick)
|
||||
initLRU lruMaximumWeight
|
||||
| lruMaximumWeight < 0 = error "initLRU given negative maximum weight"
|
||||
| otherwise = (, initialLRUTick) LRU
|
||||
{ lruStore = OrdPSQ.empty
|
||||
, lruWeight = 0
|
||||
, lruMaximumWeight
|
||||
}
|
||||
|
||||
insertLRU :: forall k t w v.
|
||||
( Ord k, Ord t
|
||||
, Integral w
|
||||
)
|
||||
=> k
|
||||
-> t
|
||||
-> Maybe (v, w)
|
||||
-> LRU k t w v
|
||||
-> LRUTick -> (LRU k t w v, LRUTick)
|
||||
insertLRU k t newVal oldLRU@LRU{..} now
|
||||
| later <= initialLRUTick = uncurry (insertLRU k t newVal) $ initLRU lruMaximumWeight
|
||||
| Just (_, w) <- newVal, w > lruMaximumWeight = (oldLRU, now)
|
||||
| Just (_, w) <- newVal = (, later) $
|
||||
let (lruStore', lruWeight') = evictToSize (lruMaximumWeight - w) lruStore lruWeight
|
||||
((fromMaybe 0 . preview (_Just . _2 . _2) -> oldWeight), lruStore'')
|
||||
= OrdPSQ.alter (, ((t, now), ) <$> newVal) k lruStore'
|
||||
in oldLRU
|
||||
{ lruStore = lruStore''
|
||||
, lruWeight = lruWeight' - oldWeight + w
|
||||
}
|
||||
| Just (_, (_, w), lruStore') <- OrdPSQ.deleteView k lruStore = (, now) oldLRU
|
||||
{ lruStore = lruStore'
|
||||
, lruWeight = lruWeight - w
|
||||
}
|
||||
| otherwise = (oldLRU, now)
|
||||
where
|
||||
later :: LRUTick
|
||||
later = over getLRUTick succ now
|
||||
|
||||
evictToSize :: w -> OrdPSQ k (t, LRUTick) (v, w) -> w -> (OrdPSQ k (t, LRUTick) (v, w), w)
|
||||
evictToSize tSize c cSize
|
||||
| cSize <= tSize = (c, cSize)
|
||||
| Just (_, _, (_, w'), c') <- OrdPSQ.minView c = evictToSize tSize c' (cSize - w')
|
||||
| otherwise = error "evictToSize: cannot reach required size through eviction"
|
||||
|
||||
lookupLRU :: forall k t w v.
|
||||
Ord k
|
||||
=> k
|
||||
-> LRU k t w v
|
||||
-> Maybe (v, w)
|
||||
lookupLRU k LRU{..} = view _2 <$> OrdPSQ.lookup k lruStore
|
||||
|
||||
touchLRU :: forall k t w v.
|
||||
( Ord k, Ord t
|
||||
, Integral w
|
||||
)
|
||||
=> k
|
||||
-> t
|
||||
-> LRU k t w v
|
||||
-> LRUTick -> ((LRU k t w v, LRUTick), Maybe (v, w))
|
||||
touchLRU k t oldLRU@LRU{..} now
|
||||
| (Just (_, v), _) <- altered
|
||||
, later <= initialLRUTick = (, Just v) . uncurry (insertLRU k t $ Just v) $ initLRU lruMaximumWeight
|
||||
| (Just (_, v), lruStore') <- altered = ((oldLRU{ lruStore = lruStore' }, later), Just v)
|
||||
| otherwise = ((oldLRU, now), Nothing)
|
||||
where
|
||||
altered = OrdPSQ.alter (\oldVal -> (oldVal, over _1 (max (t, later)) <$> oldVal)) k lruStore
|
||||
|
||||
later :: LRUTick
|
||||
later = over getLRUTick succ now
|
||||
|
||||
timeoutLRU :: forall k t w v.
|
||||
( Ord k, Ord t
|
||||
, Integral w
|
||||
)
|
||||
=> t
|
||||
-> LRU k t w v
|
||||
-> LRU k t w v
|
||||
timeoutLRU t oldLRU@LRU{..} = oldLRU
|
||||
{ lruStore = lruStore'
|
||||
, lruWeight = lruWeight - evictedWeight
|
||||
}
|
||||
where
|
||||
(evicted, lruStore') = OrdPSQ.atMostView (t, maximumLRUTick) lruStore
|
||||
evictedWeight = sumOf (folded . _3 . _2) evicted
|
||||
|
||||
newtype LRUHandle k t w v = LRUHandle { _getLRUHandle :: IORef (LRU k t w v, LRUTick) }
|
||||
deriving (Eq, Typeable)
|
||||
|
||||
initLRUHandle :: forall k t w v m.
|
||||
( MonadIO m
|
||||
, Integral w
|
||||
)
|
||||
=> w -- ^ @lruMaximumWeight@
|
||||
-> m (LRUHandle k t w v)
|
||||
initLRUHandle maxWeight = fmap LRUHandle . newIORef $ initLRU maxWeight
|
||||
|
||||
insertLRUHandle :: forall k t w v m.
|
||||
( MonadIO m
|
||||
, Ord k, Ord t
|
||||
, Integral w
|
||||
, NFData k, NFData t, NFData w, NFData v
|
||||
)
|
||||
=> LRUHandle k t w v
|
||||
-> k
|
||||
-> t
|
||||
-> (v, w)
|
||||
-> m ()
|
||||
insertLRUHandle (LRUHandle lruVar) k t newVal
|
||||
= modifyIORef' lruVar $ force . uncurry (insertLRU k t $ Just newVal)
|
||||
|
||||
lookupLRUHandle :: forall k t w v m.
|
||||
( MonadIO m
|
||||
, Ord k
|
||||
)
|
||||
=> LRUHandle k t w v
|
||||
-> k
|
||||
-> m (Maybe (v, w))
|
||||
lookupLRUHandle (LRUHandle lruVar) k
|
||||
= views _1 (lookupLRU k) <$> readIORef lruVar
|
||||
|
||||
touchLRUHandle :: forall k t w v m.
|
||||
( MonadIO m
|
||||
, Ord k, Ord t
|
||||
, Integral w
|
||||
, NFData k, NFData t, NFData w, NFData v
|
||||
)
|
||||
=> LRUHandle k t w v
|
||||
-> k
|
||||
-> t
|
||||
-> m (Maybe (v, w))
|
||||
touchLRUHandle (LRUHandle lruVar) k t = do
|
||||
oldLRU <- readIORef lruVar
|
||||
let (newLRU, touched) = uncurry (touchLRU k t) oldLRU
|
||||
force newLRU `seq` writeIORef lruVar newLRU
|
||||
return touched
|
||||
|
||||
timeoutLRUHandle :: forall k t w v m.
|
||||
( MonadIO m
|
||||
, Ord k, Ord t
|
||||
, Integral w
|
||||
, NFData k, NFData t, NFData w, NFData v
|
||||
)
|
||||
=> LRUHandle k t w v
|
||||
-> t
|
||||
-> m ()
|
||||
timeoutLRUHandle (LRUHandle lruVar) t
|
||||
= modifyIORef' lruVar $ force . over _1 (timeoutLRU t)
|
||||
|
||||
readLRUHandle :: MonadIO m
|
||||
=> LRUHandle k t w v
|
||||
-> m (LRU k t w v, LRUTick)
|
||||
readLRUHandle (LRUHandle lruVar) = readIORef lruVar
|
||||
@ -17,6 +17,9 @@ module Utils.Metrics
|
||||
, observeMissingFiles
|
||||
, ARCMetrics, ARCLabel(..)
|
||||
, arcMetrics
|
||||
, LRUMetrics, LRULabel(..)
|
||||
, lruMetrics
|
||||
, InjectInhibitMetrics, injectInhibitMetrics
|
||||
) where
|
||||
|
||||
import Import.NoModel hiding (Vector, Info)
|
||||
@ -42,6 +45,11 @@ import Jobs.Types
|
||||
import qualified Data.Aeson as Aeson
|
||||
import qualified Data.HashMap.Strict as HashMap
|
||||
|
||||
import Data.IntervalMap.Strict (IntervalMap)
|
||||
import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
import qualified Data.Foldable as F
|
||||
|
||||
{-# ANN module ("HLint: ignore Use even" :: String) #-}
|
||||
|
||||
|
||||
@ -230,6 +238,10 @@ missingFiles = unsafeRegister . vector "ref" $ gauge info
|
||||
where info = Info "uni2work_missing_files_count"
|
||||
"Number of files referenced from within database that are missing"
|
||||
|
||||
relabel :: Text -> Text
|
||||
-> SampleGroup -> SampleGroup
|
||||
relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lbls v) -> Sample k ((l, s) : filter (views _1 $ (/=) l) lbls) v
|
||||
|
||||
data ARCMetrics = ARCMetrics
|
||||
|
||||
data ARCLabel = ARCFileSource
|
||||
@ -244,7 +256,6 @@ arcMetrics :: Integral w
|
||||
-> Metric ARCMetrics
|
||||
arcMetrics lbl ah = Metric $ return (ARCMetrics, collectARCMetrics)
|
||||
where
|
||||
relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lbls v) -> Sample k ((l, s) : filter (views _1 $ (/=) l) lbls) v
|
||||
labelArc = relabel "arc"
|
||||
|
||||
collectARCMetrics = map (labelArc $ toPathPiece lbl) <$> do
|
||||
@ -266,6 +277,59 @@ arcMetrics lbl ah = Metric $ return (ARCMetrics, collectARCMetrics)
|
||||
weightInfo = Info "arc_weight"
|
||||
"Sum of weights of entries in the ARC LRUs"
|
||||
|
||||
data LRUMetrics = LRUMetrics
|
||||
|
||||
data LRULabel = LRUFileSourcePrewarm
|
||||
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||
deriving anyclass (Universe, Finite)
|
||||
|
||||
nullaryPathPiece ''LRULabel $ camelToPathPiece' 1
|
||||
|
||||
lruMetrics :: Integral w
|
||||
=> LRULabel
|
||||
-> LRUHandle k t w v
|
||||
-> Metric LRUMetrics
|
||||
lruMetrics lbl lh = Metric $ return (LRUMetrics, collectLRUMetrics)
|
||||
where
|
||||
labelLru = relabel "lru"
|
||||
|
||||
collectLRUMetrics = map (labelLru $ toPathPiece lbl) <$> do
|
||||
(lru, _) <- readLRUHandle lh
|
||||
return
|
||||
[ SampleGroup sizeInfo GaugeType
|
||||
[ Sample "lru_size" [] . encodeUtf8 . tshow $ lruStoreSize lru
|
||||
]
|
||||
, SampleGroup weightInfo GaugeType
|
||||
[ Sample "lru_weight" [] . encodeUtf8 . tshow . toInteger $ getLRUWeight lru
|
||||
]
|
||||
]
|
||||
sizeInfo = Info "lru_size"
|
||||
"Number of entries in the LRU"
|
||||
weightInfo = Info "lru_weight"
|
||||
"Sum of weights of entries in the LRU"
|
||||
|
||||
data InjectInhibitMetrics = InjectInhibitMetrics
|
||||
|
||||
injectInhibitMetrics :: TVar (IntervalMap UTCTime (Set FileContentReference))
|
||||
-> Metric InjectInhibitMetrics
|
||||
injectInhibitMetrics tvar = Metric $ return (InjectInhibitMetrics, collectInjectInhibitMetrics)
|
||||
where
|
||||
collectInjectInhibitMetrics = do
|
||||
inhibits <- readTVarIO tvar
|
||||
return
|
||||
[ SampleGroup intervalsInfo GaugeType
|
||||
[ Sample "uni2work_inject_inhibited_intervals_count" [] . encodeUtf8 . tshow $ IntervalMap.size inhibits
|
||||
]
|
||||
, SampleGroup hashesInfo GaugeType
|
||||
[ Sample "uni2work_inject_inhibited_hashes_count" [] . encodeUtf8 . tshow . Set.size $ F.fold inhibits
|
||||
]
|
||||
]
|
||||
intervalsInfo = Info "uni2work_inject_inhibited_intervals_count"
|
||||
"Number of distinct time intervals in which we don't transfer some files from upload cache to db"
|
||||
hashesInfo = Info "uni2work_inject_inhibited_hashes_count"
|
||||
"Number of files which we don't transfer from upload cache to db during some interval"
|
||||
|
||||
|
||||
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
||||
withHealthReportMetrics act = do
|
||||
before <- liftIO getPOSIXTime
|
||||
@ -373,7 +437,7 @@ observeLoginOutcome plugin outcome
|
||||
registerJobHeldLocksCount :: MonadIO m => TVar (Set QueuedJobId) -> m ()
|
||||
registerJobHeldLocksCount = liftIO . void . register . jobHeldLocksCount
|
||||
|
||||
data FileChunkStorage = StorageMinio | StorageDB | StorageARC
|
||||
data FileChunkStorage = StorageMinio | StorageDB | StorageARC | StoragePrewarm
|
||||
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||
deriving anyclass (Universe, Finite)
|
||||
nullaryPathPiece ''FileChunkStorage $ camelToPathPiece' 1
|
||||
|
||||
Loading…
Reference in New Issue
Block a user