diff --git a/config/settings.yml b/config/settings.yml index fa0edc607..de5b02cc1 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -315,17 +315,6 @@ fallback-personalised-sheet-files-keys-expire: 2419200 download-token-expire: 604801 -file-source-arc: - maximum-ghost: 512 - maximum-weight: 1073741824 # 1GiB -file-source-prewarm: - maximum-weight: 1073741824 # 1GiB - start: 1800 # 30m - end: 600 # 10m - inhibit: 3600 # 60m - steps: 20 - max-speedup: 3 - bot-mitigations: - only-logged-in-table-sorting - unauthorized-form-honeypots diff --git a/src/Application.hs b/src/Application.hs index 58156b47a..80662f3ed 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -115,8 +115,6 @@ import GHC.RTS.Flags (getRTSFlags) import qualified Prometheus -import qualified Data.IntervalMap.Strict as IntervalMap - import qualified Utils.Pool as Custom import qualified System.Clock as Clock @@ -216,14 +214,6 @@ makeFoundation appSettings''@AppSettings{..} = do appJobState <- liftIO newEmptyTMVarIO appHealthReport <- liftIO $ newTVarIO Set.empty - appFileSourcePrewarm <- for appFileSourcePrewarmConf $ \PrewarmCacheConf{..} -> do - lh <- initLRUHandle precMaximumWeight - void . Prometheus.register $ lruMetrics LRUFileSourcePrewarm lh - return lh - appFileInjectInhibit <- liftIO $ newTVarIO IntervalMap.empty - for_ (guardOnM (isn't _JobsOffload appJobMode) appInjectFiles) $ \_ -> - void . Prometheus.register $ injectInhibitMetrics appFileInjectInhibit - appStartTime <- liftIO getCurrentTime -- We need a log function to create a connection pool. We need a connection -- pool to create our foundation. And we need our foundation to get a diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs index 9dbc9de50..7f6814ea7 100644 --- a/src/Foundation/Type.hs +++ b/src/Foundation/Type.hs @@ -30,8 +30,6 @@ import qualified Jose.Jwk as Jose import qualified Database.Memcached.Binary.IO as Memcached import Network.Minio (MinioConn) -import Data.IntervalMap.Strict (IntervalMap) - import qualified Utils.Pool as Custom import Utils.Metrics (DBConnUseState) @@ -84,8 +82,6 @@ data UniWorX = UniWorX , appUploadCache :: Maybe MinioConn , appVerpSecret :: VerpSecret , appAuthKey :: Auth.Key - , appFileSourcePrewarm :: Maybe (LRUHandle (FileContentChunkReference, (Int, Int)) UTCTime Int ByteString) - , appFileInjectInhibit :: TVar (IntervalMap UTCTime (Set FileContentReference)) , appPersonalisedSheetFilesSeedKey :: PersonalisedSheetFilesSeedKey , appVolatileClusterSettingsCache :: TVar VolatileClusterSettingsCache , appStartTime :: UTCTime -- for Status Page diff --git a/src/Handler/Utils/Files.hs b/src/Handler/Utils/Files.hs index a9de3f095..29cf40204 100644 --- a/src/Handler/Utils/Files.hs +++ b/src/Handler/Utils/Files.hs @@ -42,36 +42,17 @@ data SourceFilesException makePrisms ''SourceFilesException -fileChunk :: ( MonadHandler m - , HandlerSite m ~ UniWorX - ) - => (FileContentChunkReference, (Int, Int)) - -> m (Maybe (ByteString, Maybe FileChunkStorage)) +fileChunk :: ( MonadHandler m ) + => m (Maybe (ByteString, Maybe FileChunkStorage)) -> m (Maybe ByteString) -fileChunk k getChunkDB' = do - prewarm <- getsYesod appFileSourcePrewarm +fileChunk getChunkDB' = do -- NOTE: crude surgery happened here to remove ARC caching; useless artifacts may have remained - case prewarm of - Nothing -> do - chunk' <- getChunkDB' - for chunk' $ \(chunk, mStorage) -> chunk <$ do - $logDebugS "fileChunkARC" "No prewarm" - for_ mStorage $ \storage -> - let w = length chunk - in liftIO $ observeSourcedChunk storage w - Just lh -> do - chunkRes <- lookupLRUHandle lh k - case chunkRes of - Just (chunk, w) -> Just chunk <$ do - $logDebugS "fileChunkARC" "Prewarm hit" - liftIO $ observeSourcedChunk StoragePrewarm w - Nothing -> do - chunk' <- getChunkDB' - for chunk' $ \(chunk, mStorage) -> chunk <$ do - $logDebugS "fileChunkARC" "Prewarm miss" - for_ mStorage $ \storage -> - let w = length chunk - in liftIO $ observeSourcedChunk storage w + chunk' <- getChunkDB' + for chunk' $ \(chunk, mStorage) -> chunk <$ do + $logDebugS "fileChunkARC" "No prewarm" + for_ mStorage $ \storage -> + let w = length chunk + in liftIO $ observeSourcedChunk storage w sourceFileDB :: forall m. @@ -100,7 +81,7 @@ sourceFileChunks cont chunkHash = transPipe (withReaderT $ projectBackend @SqlRe return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize) getChunkMinio = fmap (, StorageMinio) . catchIfMaybeT (is _SourceFilesContentUnavailable) . runConduit $ sourceMinio (Left chunkHash) (Just $ ByteRangeFromTo (fromIntegral $ pred start) (fromIntegral . pred $ pred start + dbChunksize)) .| C.fold in getChunkDB' <|> getChunkMinio - chunk <- fileChunk (chunkHash, (start, dbChunksize)) getChunkDB + chunk <- fileChunk getChunkDB case chunk of Just c | olength c <= 0 -> return Nothing Just c -> do @@ -232,7 +213,7 @@ respondFileConditional representationLastModified cType FileReference{..} = do let getChunkDB = fmap (fmap $ (, Just StorageDB) . E.unValue) . E.selectOne . E.from $ \fileContentChunk -> do E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. E.val chunkHash return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val $ min cLength' dbChunksize) - chunk <- fileChunk (chunkHash, (fromIntegral start, fromIntegral $ min cLength' dbChunksize)) getChunkDB + chunk <- fileChunk getChunkDB case chunk of Nothing -> throwM SourceFilesContentUnavailable Just c -> do diff --git a/src/Jobs.hs b/src/Jobs.hs index b45b24b82..2593260d4 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -18,7 +18,6 @@ import Jobs.Offload import Jobs.Crontab import qualified Data.Conduit.Combinators as C -import qualified Data.Conduit.List as C (mapMaybe) import qualified Data.Text.Lazy as LT @@ -52,15 +51,6 @@ import Control.Concurrent.STM.Delay import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay) -import qualified Database.Esqueleto.Legacy as E -import qualified Database.Esqueleto.Utils as E - -import qualified Data.ByteString as ByteString - -import Handler.Utils.Files (sourceFileChunks, _SourceFilesContentUnavailable) - -import qualified Data.IntervalMap.Strict as IntervalMap - import Jobs.Handler.SendNotification import Jobs.Handler.SendTestEmail import Jobs.Handler.QueueNotification @@ -91,7 +81,7 @@ import Type.Reflection (typeOf) import System.Clock - + data JobQueueException = JInvalid QueuedJobId QueuedJob | JLocked QueuedJobId InstanceId UTCTime | JNonexistant QueuedJobId @@ -188,7 +178,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> let routeExc :: forall m'. Monad m' => (forall b. m b -> m b) -> m (m' ()) -> m (m' ()) routeExc unmask' = handleAll (\exc -> return () <$ throwTo me exc) . unmask' - + actAsync <- allocateLinkedAsyncWithUnmask $ \unmask' -> act (routeExc unmask') let handleExc e = do @@ -196,12 +186,12 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> atomically $ do jState <- tryReadTMVar appJobState for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown () - + void $ wait actAsync throwM e - + unmask (wait actAsync) `catchAll` handleExc - + num :: Int num = fromIntegral $ foundation ^. _appJobWorkers @@ -209,7 +199,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> spawnMissingWorkers = do shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown guard $ not shouldTerminate' - + oldState <- takeTMVar appJobState let missing = num - Map.size (jobWorkers oldState) guard $ missing > 0 @@ -266,7 +256,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!) go in go - + terminateGracefully :: (() -> ContT () m ()) -> STM (ContT () m ()) terminateGracefully terminate = do shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown @@ -329,7 +319,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> respawn <$ case cOffload of Nothing -> return () Just JobOffloadHandler{..} -> waitSTM jobOffloadHandler - + stopJobCtl :: MonadUnliftIO m => UniWorX -> m () -- ^ Stop all worker threads currently running @@ -388,7 +378,7 @@ execCrontab = do let doJob = mapRWST (liftHandler . runDBJobs) $ do -- newCrontab <- lift $ hoist lift determineCrontab' - -- when (newCrontab /= currentCrontab) $ + -- when (newCrontab /= currentCrontab) $ -- mapRWST (liftIO . atomically) $ -- liftBase . flip writeTVar newCrontab =<< asks (jobCrontab . jobContext) newCrontab <- liftIO . readTVarIO =<< asks (jobCrontab . jobContext) @@ -407,7 +397,7 @@ execCrontab = do case jobCtl of JobCtlQueue job -> lift $ queueDBJobCron job other -> runReaderT ?? foundation $ writeJobCtl other - + case nextMatch of MatchAsap -> doJob MatchNone -> return () @@ -497,7 +487,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker #endif , Exc.Handler $ \(e :: SomeException) -> return $ Left e ] . fmap Right - + handleQueueException :: MonadLogger m => JobQueueException -> m () handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) @@ -586,7 +576,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker liftHandler . runDB $ pruneLastExecs newCTab $logInfoS logIdent "PruneLastExecs" -- logDebugS logIdent $ tshow newCTab - mapReaderT (liftIO . atomically) $ + mapReaderT (liftIO . atomically) $ lift . flip writeTVar newCTab =<< asks jobCrontab handleCmd (JobCtlGenerateHealthReport kind) = do hrStorage <- getsYesod appHealthReport @@ -596,7 +586,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker $logInfoS logIdent [st|#{tshow kind}: #{toPathPiece newStatus}|] unless (newStatus > HealthFailure) $ do $logErrorS logIdent [st|#{tshow kind}: #{tshow newReport}|] - + liftIO $ do now <- getCurrentTime let updateReports = Set.insert (now, newReport) @@ -606,69 +596,6 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker $logInfoS logIdent [st|Sleeping #{tshow secs}s...|] threadDelay msecs $logInfoS logIdent [st|Slept #{tshow secs}s.|] - handleCmd JobCtlPrewarmCache{..} = do - prewarm <- getsYesod appFileSourcePrewarm - for_ prewarm $ \lh -> lift . runDBRead $ - runConduit $ sourceFileChunkIds .| C.map E.unValue - .| awaitForever (\cRef -> handleC handleUnavailable $ sourceFileChunks (withLRU lh cRef) cRef .| C.map (cRef, )) - .| C.mapM_ (sinkChunkCache lh) - where - handleUnavailable e - | is _SourceFilesContentUnavailable e = return () - | otherwise = throwM e - withLRU lh cRef range getChunk = do - touched <- touchLRUHandle lh (cRef, range) jcTargetTime - case touched of - Just (bs, _) -> return $ Just (bs, Nothing) - Nothing -> over (mapped . _2) Just <$> getChunk - (minBoundDgst, maxBoundDgst) = jcChunkInterval - sourceFileChunkIds = E.selectSource . E.from $ \fileContentEntry -> do - let cRef = fileContentEntry E.^. FileContentEntryChunkHash - eRef = fileContentEntry E.^. FileContentEntryHash - E.where_ . E.and $ catMaybes - [ minBoundDgst <&> \b -> cRef E.>=. E.val b - , maxBoundDgst <&> \b -> cRef E.<. E.val b - ] - E.where_ $ matchesPrewarmSource eRef jcPrewarmSource - return cRef - sinkChunkCache lh (cRef, (c, range)) = insertLRUHandle lh (cRef, range) jcTargetTime (c, ByteString.length c) - handleCmd JobCtlInhibitInject{..} = maybeT_ $ do - PrewarmCacheConf{..} <- MaybeT . getsYesod $ view _appFileSourcePrewarmConf - let inhibitInterval = IntervalMap.ClosedInterval - (addUTCTime (-precStart) jcTargetTime) - (addUTCTime (precInhibit - precStart) jcTargetTime) - sourceFileReferences = prewarmSourceReferences jcPrewarmSource - refs <- lift . lift . runDBRead . runConduit $ sourceFileReferences .| C.foldl (flip Set.insert) Set.empty - guard . not $ null refs - inhibitTVar <- getsYesod appFileInjectInhibit - atomically . modifyTVar' inhibitTVar $ force . IntervalMap.insertWith Set.union inhibitInterval refs - -matchesPrewarmSource :: E.SqlExpr (E.Value FileContentReference) -> JobCtlPrewarmSource -> E.SqlExpr (E.Value Bool) -matchesPrewarmSource eRef = \case - JobCtlPrewarmSheetFile{..} -> E.or - [ E.exists . E.from $ \sheetFile -> - E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet - E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType - E.&&. sheetFile E.^. SheetFileContent E.==. E.just eRef - , E.exists . E.from $ \personalisedSheetFile -> - E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet - E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType - E.&&. personalisedSheetFile E.^. PersonalisedSheetFileContent E.==. E.just eRef - ] - -prewarmSourceReferences :: JobCtlPrewarmSource -> ConduitT () FileContentReference (ReaderT SqlReadBackend (HandlerFor UniWorX)) () -prewarmSourceReferences = \case - JobCtlPrewarmSheetFile{..} -> (.| C.mapMaybe E.unValue) $ do - E.selectSource . E.from $ \sheetFile -> do - E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet - E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType - E.where_ . E.isJust $ sheetFile E.^. SheetFileContent - return $ sheetFile E.^. SheetFileContent - E.selectSource . E.from $ \personalisedSheetFile -> do - E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet - E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType - E.where_ . E.isJust $ personalisedSheetFile E.^. PersonalisedSheetFileContent - return $ personalisedSheetFile E.^. PersonalisedSheetFileContent jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a jLocked jId act = flip evalStateT False $ do @@ -707,7 +634,7 @@ jLocked jId act = flip evalStateT False $ do update jId' [ QueuedJobLockInstance =. Nothing , QueuedJobLockTime =. Nothing ] - + bracket lock unlock $ lift . act @@ -723,7 +650,7 @@ pruneLastExecs crontab = do ensureCrontab (Entity leId CronLastExec{..}) = maybeT (return mempty) $ do now <- liftIO getCurrentTime flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval - + if | abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2 -> return mempty diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index 9b76a0b00..27d621f9b 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -27,17 +27,6 @@ import qualified Data.Conduit.List as C import qualified Database.Esqueleto.Legacy as E import qualified Database.Esqueleto.Utils as E -import Jobs.Handler.Intervals.Utils - -import System.IO.Unsafe - -import Crypto.Hash (hashDigestSize, digestFromByteString) - -import Data.List (iterate) - -{-# NOINLINE prewarmCacheIntervalsCache #-} -prewarmCacheIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)]) -prewarmCacheIntervalsCache = unsafePerformIO $ newTVarIO Map.empty determineCrontab :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Crontab JobCtl) -- ^ Extract all future jobs from the database (sheet deadlines, ...) @@ -66,51 +55,9 @@ determineCrontab = execWriterT $ do } Nothing -> mempty - let - tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (ReaderT SqlReadBackend (HandlerFor UniWorX)) () - tellPrewarmJobs jcPrewarmSource jcTargetTime = maybeT_ $ do - PrewarmCacheConf{..} <- hoistMaybe appFileSourcePrewarmConf - - let - chunkHashBytes :: forall h. - ( Unwrapped FileContentChunkReference ~ Digest h ) - => Integer - chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h)) - intervals <- mkIntervalsCached prewarmCacheIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) precSteps - - let step = realToFrac $ toRational (precStart - precEnd) / toRational precSteps - step' = realToFrac $ toRational step / precMaxSpeedup - - mapM_ tell - [ HashMap.singleton - JobCtlPrewarmCache{..} - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ ts - , cronRepeat = CronRepeatOnChange - , cronRateLimit = step' - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ ts' - } - | jcChunkInterval <- intervals - | ts <- iterate (addUTCTime step) $ addUTCTime (-precStart) jcTargetTime - | ts' <- iterate (addUTCTime step') $ addUTCTime (subtract precStart . realToFrac $ toRational (precStart - precEnd) * (1 - recip precMaxSpeedup)) jcTargetTime - ] - - lift . maybeT_ $ do - injectInterval <- fmap abs . MaybeT . getsYesod $ view _appInjectFiles - tell $ HashMap.singleton - JobCtlInhibitInject{..} - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (negate $ precStart + injectInterval + 10) jcTargetTime - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = injectInterval / 2 - , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime (precInhibit - precStart) jcTargetTime - } - let sheetJobs (Entity nSheet Sheet{..}) = do - for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> do - tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetExercise) aFrom - + for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> when (isn't _JobsOffload appJobMode) $ do tell $ HashMap.singleton (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) @@ -120,9 +67,7 @@ determineCrontab = execWriterT $ do , cronRateLimit = appNotificationRateLimit , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo } - for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> do - tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetHint) hFrom - + for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> when (isn't _JobsOffload appJobMode) . maybeT_ $ do guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) @@ -136,9 +81,7 @@ determineCrontab = execWriterT $ do , cronRateLimit = appNotificationRateLimit , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo } - for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> do - tellPrewarmJobs (JobCtlPrewarmSheetFile nSheet SheetSolution) sFrom - + for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> when (isn't _JobsOffload appJobMode) . maybeT_ $ do guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 158a92f47..94370e9f8 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -44,13 +44,6 @@ import qualified Data.Sequence as Seq import Jobs.Handler.Intervals.Utils -import Data.IntervalMap.Strict (IntervalMap) -import qualified Data.IntervalMap.Strict as IntervalMap - -import Control.Concurrent.STM.TVar (stateTVar) - -import qualified Data.Foldable as F - import qualified Control.Monad.State.Class as State import Jobs.Types @@ -96,7 +89,7 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin missingDb <- runConduit . execStateC Map.empty $ do let insertRef refKind ref = State.modify' $ Map.alter (Just . Set.insert ref . fromMaybe Set.empty) refKind - + iforM_ trackedReferences $ \refKind refQuery -> do let fileReferencesQuery = do ref <- refQuery @@ -152,7 +145,7 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin , (''SubmissionFile, E.from $ \subFile -> return $ subFile E.^. SubmissionFileContent ) , (''SessionFile, E.from $ \sessFile -> return $ sessFile E.^. SessionFileContent ) ] - + {-# NOINLINE pruneUnreferencedFilesIntervalsCache #-} @@ -208,12 +201,12 @@ dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtom let unreferencedChunkHash = E.unKey $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. unreferencedChunkHash - return $ fileContentEntry E.^. FileContentEntryHash `E.in_` E.valList fRefs + return $ fileContentEntry E.^. FileContentEntryHash `E.in_` E.valList fRefs E.where_ $ chunkIdFilter unreferencedChunkHash unmarkRefSource refSource = runConduit $ refSource .| C.map Seq.singleton .| C.chunksOfE chunkSize .| C.mapM_ unmarkSourceFiles chunkSize = 100 unmarkRefSource jobFileReferences - + let getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do @@ -277,16 +270,7 @@ dispatchJobInjectFiles :: JobHandler UniWorX dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do uploadBucket <- getsYesod $ view _appUploadCacheBucket interval <- getsYesod $ view _appInjectFiles - - now <- liftIO getCurrentTime - let - extractInhibited :: IntervalMap UTCTime (Set FileContentReference) - -> (Set FileContentReference, IntervalMap UTCTime (Set FileContentReference)) - extractInhibited cState = (F.fold current, IntervalMap.union current upcoming) - where - (_, current, upcoming) = IntervalMap.splitIntersecting cState $ IntervalMap.OpenInterval (addUTCTime (-2) now) (addUTCTime 2 now) - inhibited <- atomically . flip stateTVar extractInhibited =<< getsYesod appFileInjectInhibit - + -- NOTE: crude surgery happened here to remove ARC caching; useless artifacts may have remained let extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference extractReference _ = Nothing @@ -296,7 +280,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do injectOrDelete (objInfo, fRef) = do let obj = Minio.oiObject objInfo sz = fromIntegral $ max 1 $ Minio.oiSize objInfo - + fRef' <- runDB $ do logger <- askLoggerIO @@ -352,7 +336,6 @@ dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do (Sum injectedFiles, Sum injectedSize) <- runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True) .| C.mapMaybe extractReference - .| C.filter (views _2 (`Set.notMember` inhibited)) .| maybe (C.map id) (takeWhileTime . (/ 2)) interval .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize) .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFilesCount $ const 1) @@ -368,7 +351,7 @@ data RechunkFileException { oldHash, newHash :: FileContentReference } deriving (Eq, Ord, Show, Generic) deriving anyclass (Exception) - + dispatchJobRechunkFiles :: JobHandler UniWorX dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin where diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 77e27c963..e1eaa1de3 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -9,8 +9,7 @@ module Jobs.Types ( Job(..), Notification(..) , JobChildren , classifyJob - , JobCtlPrewarmSource(..), _jcpsSheet, _jcpsSheetFileType - , JobCtl(..), _jcPrewarmSource, _jcChunkInterval + , JobCtl(..) , classifyJobCtl , YesodJobDB , JobHandler(..), _JobHandlerAtomic, _JobHandlerException @@ -218,34 +217,8 @@ classifyJob job = unpack tag Aeson.String tag = obj HashMap.! "job" -data JobCtlPrewarmSource - = JobCtlPrewarmSheetFile - { jcpsSheet :: SheetId - , jcpsSheetFileType :: SheetFileType - } - deriving (Eq, Ord, Read, Show, Generic) - deriving anyclass (Hashable, NFData) - -makeLenses_ ''JobCtlPrewarmSource - -deriveJSON defaultOptions - { constructorTagModifier = camelToPathPiece' 3 - , fieldLabelModifier = camelToPathPiece' 1 - , tagSingleConstructors = True - , sumEncoding = TaggedObject "source" "data" - } ''JobCtlPrewarmSource - data JobCtl = JobCtlFlush | JobCtlPerform QueuedJobId - | JobCtlPrewarmCache - { jcPrewarmSource :: JobCtlPrewarmSource - , jcTargetTime :: UTCTime - , jcChunkInterval :: (Maybe FileContentChunkReference, Maybe FileContentChunkReference) - } - | JobCtlInhibitInject - { jcPrewarmSource :: JobCtlPrewarmSource - , jcTargetTime :: UTCTime - } | JobCtlDetermineCrontab | JobCtlQueue Job | JobCtlGenerateHealthReport HealthCheck diff --git a/src/Settings.hs b/src/Settings.hs index 8a79f11e6..20a703a06 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -238,9 +238,6 @@ data AppSettings = AppSettings , appJobLmsQualificationsEnqueueHour :: Maybe Natural , appJobLmsQualificationsDequeueHour :: Maybe Natural - , appFileSourceARCConf :: Maybe (ARCConf Int) - , appFileSourcePrewarmConf :: Maybe PrewarmCacheConf - , appBotMitigations :: Set SettingBotMitigation , appVolatileClusterSettingsCacheTime :: DiffTime @@ -420,18 +417,6 @@ data VerpMode = VerpNone | Verp { verpPrefix :: Text, verpSeparator :: Char } deriving (Eq, Show, Read, Generic) -data ARCConf w = ARCConf - { arccMaximumGhost :: Int - , arccMaximumWeight :: w - } deriving (Eq, Ord, Read, Show, Generic) - -data PrewarmCacheConf = PrewarmCacheConf - { precMaximumWeight :: Int - , precStart, precEnd, precInhibit :: NominalDiffTime -- ^ Prewarming cache starts at @t - precStart@ and should be finished by @t - precEnd@; injecting from minio to database is inhibited from @t - precStart@ until @t - precStart + precInhibit@ - , precSteps :: Natural - , precMaxSpeedup :: Rational - } deriving (Eq, Ord, Read, Show, Generic) - data SettingBotMitigation = SettingBotMitigationOnlyLoggedInTableSorting | SettingBotMitigationUnauthorizedFormHoneypots @@ -475,16 +460,6 @@ deriveJSON defaultOptions , constructorTagModifier = camelToPathPiece' 1 } ''JobMode -deriveJSON defaultOptions - { fieldLabelModifier = camelToPathPiece' 1 - } ''ARCConf - -deriveJSON defaultOptions - { fieldLabelModifier = camelToPathPiece' 1 - } ''PrewarmCacheConf - -makeLenses_ ''PrewarmCacheConf - nullaryPathPiece ''SettingBotMitigation $ camelToPathPiece' 3 pathPieceJSON ''SettingBotMitigation pathPieceJSONKey ''SettingBotMitigation @@ -823,17 +798,6 @@ instance FromJSON AppSettings where appJobLmsQualificationsEnqueueHour <- o .:? "job-lms-qualifications-enqueue-hour" appJobLmsQualificationsDequeueHour <- o .:? "job-lms-qualifications-dequeue-hour" - appFileSourceARCConf <- assertM isValidARCConf <$> o .:? "file-source-arc" - - let isValidPrewarmConf PrewarmCacheConf{..} = and - [ precMaximumWeight > 0 - , precStart >= 0 - , precEnd >= 0, precEnd <= precStart - , precSteps > 0 - , precMaxSpeedup >= 1 - ] - appFileSourcePrewarmConf <- over (_Just . _precInhibit) (max 0) . assertM isValidPrewarmConf <$> o .:? "file-source-prewarm" - appBotMitigations <- o .:? "bot-mitigations" .!= Set.empty appVolatileClusterSettingsCacheTime <- o .: "volatile-cluster-settings-cache-time" @@ -846,7 +810,6 @@ instance FromJSON AppSettings where appLegalExternal <- o .: "legal-external" return AppSettings{..} - where isValidARCConf ARCConf{..} = arccMaximumWeight > 0 makeClassy_ ''AppSettings diff --git a/src/Utils.hs b/src/Utils.hs index 4789fe3f4..80b54245d 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -44,7 +44,6 @@ import Utils.I18n as Utils import Utils.NTop as Utils import Utils.HttpConditional as Utils import Utils.Persist as Utils -import Utils.LRU as Utils import Utils.Set as Utils import Text.Blaze (Markup, ToMarkup(..)) diff --git a/src/Utils/LRU.hs b/src/Utils/LRU.hs deleted file mode 100644 index 66517d70d..000000000 --- a/src/Utils/LRU.hs +++ /dev/null @@ -1,217 +0,0 @@ --- SPDX-FileCopyrightText: 2022 Gregor Kleen --- --- SPDX-License-Identifier: AGPL-3.0-or-later - -module Utils.LRU - ( LRUTick - , LRU, initLRU - , insertLRU, lookupLRU, touchLRU, timeoutLRU - , LRUHandle, initLRUHandle - , insertLRUHandle, lookupLRUHandle, touchLRUHandle, timeoutLRUHandle - , readLRUHandle - , lruStoreSize - , getLRUWeight - , describeLRU - ) where - -import ClassyPrelude - -import Data.OrdPSQ (OrdPSQ) -import qualified Data.OrdPSQ as OrdPSQ - -import Control.Lens - --- https://jaspervdj.be/posts/2015-02-24-lru-cache.html - - -newtype LRUTick = LRUTick { _getLRUTick :: Word64 } - deriving (Eq, Ord, Show) - deriving newtype (NFData) - -makeLenses ''LRUTick - -data LRU k t w v = LRU - { lruStore :: !(OrdPSQ k (t, LRUTick) (v, w)) - , lruWeight :: !w - , lruMaximumWeight :: !w - } - -instance (NFData k, NFData t, NFData w, NFData v) => NFData (LRU k t w v) where - rnf LRU{..} = rnf lruStore - `seq` rnf lruWeight - `seq` rnf lruMaximumWeight - -describeLRU :: Show w - => LRU k t w v - -> String -describeLRU LRU{..} = intercalate ", " - [ "lruStore: " <> show (OrdPSQ.size lruStore) - , "lruWeight: " <> show lruWeight - , "lruMaximumWeight: " <> show lruMaximumWeight - ] - -lruStoreSize :: LRU k t w v -> Int -lruStoreSize = OrdPSQ.size . lruStore - -getLRUWeight :: LRU k t w v -> w -getLRUWeight = lruWeight - -initialLRUTick, maximumLRUTick :: LRUTick -initialLRUTick = LRUTick 0 -maximumLRUTick = LRUTick maxBound - -initLRU :: forall k t w v. - Integral w - => w -- ^ @lruMaximumWeight@ - -> (LRU k t w v, LRUTick) -initLRU lruMaximumWeight - | lruMaximumWeight < 0 = error "initLRU given negative maximum weight" - | otherwise = (lru, initialLRUTick) - where lru = LRU { lruStore = OrdPSQ.empty - , lruWeight = 0 - , lruMaximumWeight - } - -insertLRU :: forall k t w v. - ( Ord k, Ord t - , Integral w - ) - => k - -> t - -> Maybe (v, w) - -> LRU k t w v - -> LRUTick -> (LRU k t w v, LRUTick) -insertLRU k t newVal oldLRU@LRU{..} now - | later <= initialLRUTick = uncurry (insertLRU k t newVal) $ initLRU lruMaximumWeight - | Just (_, w) <- newVal, w > lruMaximumWeight = (oldLRU, now) - | Just (_, w) <- newVal = (, later) $ - let (lruStore', lruWeight') = evictToSize (lruMaximumWeight - w) lruStore lruWeight - (fromMaybe 0 . preview (_Just . _2 . _2) -> oldWeight, lruStore'') - = OrdPSQ.alter (, ((t, now), ) <$> newVal) k lruStore' - in oldLRU { lruStore = lruStore'' - , lruWeight = lruWeight' - oldWeight + w - } - | Just (_, (_, w), lruStore') <- OrdPSQ.deleteView k lruStore = - let lru = oldLRU { lruStore = lruStore' - , lruWeight = lruWeight - w - } - in (lru, now) - | otherwise = (oldLRU, now) - where - later :: LRUTick - later = over getLRUTick succ now - - evictToSize :: w -> OrdPSQ k (t, LRUTick) (v, w) -> w -> (OrdPSQ k (t, LRUTick) (v, w), w) - evictToSize tSize c cSize - | cSize <= tSize = (c, cSize) - | Just (_, _, (_, w'), c') <- OrdPSQ.minView c = evictToSize tSize c' (cSize - w') - | otherwise = error "evictToSize: cannot reach required size through eviction" - -lookupLRU :: forall k t w v. - Ord k - => k - -> LRU k t w v - -> Maybe (v, w) -lookupLRU k LRU{..} = view _2 <$> OrdPSQ.lookup k lruStore - -touchLRU :: forall k t w v. - ( Ord k, Ord t - , Integral w - ) - => k - -> t - -> LRU k t w v - -> LRUTick -> ((LRU k t w v, LRUTick), Maybe (v, w)) -touchLRU k t oldLRU@LRU{..} now - | (Just (_, v), _) <- altered - , later <= initialLRUTick = (, Just v) . uncurry (insertLRU k t $ Just v) $ initLRU lruMaximumWeight - | (Just (_, v), lruStore') <- altered = ((oldLRU{ lruStore = lruStore' }, later), Just v) - | otherwise = ((oldLRU, now), Nothing) - where - altered = OrdPSQ.alter (\oldVal -> (oldVal, over _1 (max (t, later)) <$> oldVal)) k lruStore - - later :: LRUTick - later = over getLRUTick succ now - -timeoutLRU :: forall k t w v. - ( Ord k, Ord t - , Integral w - ) - => t - -> LRU k t w v - -> LRU k t w v -timeoutLRU t oldLRU@LRU{..} = oldLRU - { lruStore = lruStore' - , lruWeight = lruWeight - evictedWeight - } - where - (evicted, lruStore') = OrdPSQ.atMostView (t, maximumLRUTick) lruStore - evictedWeight = sumOf (folded . _3 . _2) evicted - -newtype LRUHandle k t w v = LRUHandle { _getLRUHandle :: IORef (LRU k t w v, LRUTick) } - deriving (Eq) - -initLRUHandle :: forall k t w v m. - ( MonadIO m - , Integral w - ) - => w -- ^ @lruMaximumWeight@ - -> m (LRUHandle k t w v) -initLRUHandle maxWeight = fmap LRUHandle . newIORef $ initLRU maxWeight - -insertLRUHandle :: forall k t w v m. - ( MonadIO m - , Ord k, Ord t - , Integral w - , NFData k, NFData t, NFData w, NFData v - ) - => LRUHandle k t w v - -> k - -> t - -> (v, w) - -> m () -insertLRUHandle (LRUHandle lruVar) k t newVal - = modifyIORef' lruVar $ force . uncurry (insertLRU k t $ Just newVal) - -lookupLRUHandle :: forall k t w v m. - ( MonadIO m - , Ord k - ) - => LRUHandle k t w v - -> k - -> m (Maybe (v, w)) -lookupLRUHandle (LRUHandle lruVar) k - = views _1 (lookupLRU k) <$> readIORef lruVar - -touchLRUHandle :: forall k t w v m. - ( MonadIO m - , Ord k, Ord t - , Integral w - , NFData k, NFData t, NFData w, NFData v - ) - => LRUHandle k t w v - -> k - -> t - -> m (Maybe (v, w)) -touchLRUHandle (LRUHandle lruVar) k t = do - oldLRU <- readIORef lruVar - let (newLRU, touched) = uncurry (touchLRU k t) oldLRU - force newLRU `seq` writeIORef lruVar newLRU - return touched - -timeoutLRUHandle :: forall k t w v m. - ( MonadIO m - , Ord k, Ord t - , Integral w - , NFData k, NFData t, NFData w, NFData v - ) - => LRUHandle k t w v - -> t - -> m () -timeoutLRUHandle (LRUHandle lruVar) t - = modifyIORef' lruVar $ force . over _1 (timeoutLRU t) - -readLRUHandle :: MonadIO m - => LRUHandle k t w v - -> m (LRU k t w v, LRUTick) -readLRUHandle (LRUHandle lruVar) = readIORef lruVar diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 4eeefbb75..31f29e6c6 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -19,9 +19,6 @@ module Utils.Metrics , observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles , registerJobWorkerQueueDepth , observeMissingFiles - , LRUMetrics, LRULabel(..) - , lruMetrics - , InjectInhibitMetrics, injectInhibitMetrics , PoolMetrics, PoolLabel(..) , poolMetrics , observeDatabaseConnectionOpened, observeDatabaseConnectionClosed @@ -53,11 +50,6 @@ import Jobs.Types import qualified Data.Aeson as Aeson import qualified Data.HashMap.Strict as HashMap -import Data.IntervalMap.Strict (IntervalMap) -import qualified Data.IntervalMap.Strict as IntervalMap - -import qualified Data.Foldable as F - import qualified Utils.Pool as Custom import GHC.Stack @@ -272,57 +264,6 @@ relabel :: Text -> Text -> SampleGroup -> SampleGroup relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lbls v) -> Sample k ((l, s) : filter (views _1 $ (/=) l) lbls) v -data LRUMetrics = LRUMetrics - -data LRULabel = LRUFileSourcePrewarm - deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic) - deriving anyclass (Universe, Finite) - -nullaryPathPiece ''LRULabel $ camelToPathPiece' 1 - -lruMetrics :: Integral w - => LRULabel - -> LRUHandle k t w v - -> Metric LRUMetrics -lruMetrics lbl lh = Metric $ return (LRUMetrics, collectLRUMetrics) - where - labelLru = relabel "lru" - - collectLRUMetrics = map (labelLru $ toPathPiece lbl) <$> do - (lru, _) <- readLRUHandle lh - return - [ SampleGroup sizeInfo GaugeType - [ Sample "lru_size" [] . encodeUtf8 . tshow $ lruStoreSize lru - ] - , SampleGroup weightInfo GaugeType - [ Sample "lru_weight" [] . encodeUtf8 . tshow . toInteger $ getLRUWeight lru - ] - ] - sizeInfo = Info "lru_size" - "Number of entries in the LRU" - weightInfo = Info "lru_weight" - "Sum of weights of entries in the LRU" - -data InjectInhibitMetrics = InjectInhibitMetrics - -injectInhibitMetrics :: TVar (IntervalMap UTCTime (Set FileContentReference)) - -> Metric InjectInhibitMetrics -injectInhibitMetrics tvar = Metric $ return (InjectInhibitMetrics, collectInjectInhibitMetrics) - where - collectInjectInhibitMetrics = do - inhibits <- readTVarIO tvar - return - [ SampleGroup intervalsInfo GaugeType - [ Sample "uni2work_inject_inhibited_intervals_count" [] . encodeUtf8 . tshow $ IntervalMap.size inhibits - ] - , SampleGroup hashesInfo GaugeType - [ Sample "uni2work_inject_inhibited_hashes_count" [] . encodeUtf8 . tshow . Set.size $ F.fold inhibits - ] - ] - intervalsInfo = Info "uni2work_inject_inhibited_intervals_count" - "Number of distinct time intervals in which we don't transfer some files from upload cache to db" - hashesInfo = Info "uni2work_inject_inhibited_hashes_count" - "Number of files which we don't transfer from upload cache to db during some interval" data PoolMetrics = PoolMetrics