From fb0ae65ac5928443abc01de9b57c69849d6a6b21 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 23 Sep 2020 16:48:41 +0200 Subject: [PATCH] feat(files): monitor missing files --- src/Jobs/Crontab.hs | 9 +++++ src/Jobs/Handler/Files.hs | 69 ++++++++++++++++++++++++++++++++++++++- src/Jobs/Types.hs | 2 ++ src/Utils/Metrics.hs | 11 +++++++ 4 files changed, 90 insertions(+), 1 deletion(-) diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index 8127e16e8..1663cb2fc 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -100,6 +100,15 @@ determineCrontab = execWriterT $ do , cronNotAfter = Right CronNotScheduled } + tell $ HashMap.singleton + (JobCtlQueue JobDetectMissingFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = 7200 + , cronNotAfter = Right CronNotScheduled + } + tell . flip foldMap universeF $ \kind -> case appHealthCheckInterval kind of Just int -> HashMap.singleton diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 2648edd05..b22a02af6 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -6,6 +6,7 @@ module Jobs.Handler.Files ( dispatchJobPruneSessionFiles , dispatchJobPruneUnreferencedFiles , dispatchJobInjectFiles, dispatchJobRechunkFiles + , dispatchJobDetectMissingFiles ) where import Import hiding (matching, maximumBy, init) @@ -30,7 +31,7 @@ import Data.Bits (Bits(shiftR)) import qualified Data.Map.Strict as Map -import Control.Monad.Random.Lazy +import Control.Monad.Random.Lazy (evalRand, mkStdGen) import System.Random.Shuffle (shuffleM) import System.IO.Unsafe @@ -40,6 +41,10 @@ import Control.Monad.Logger (askLoggerIO, runLoggingT) import System.Clock +import qualified Data.Set as Set + +import Jobs.Queue (YesodJobDB) + dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin @@ -69,6 +74,68 @@ fileReferences (E.just -> fHash) ] +dispatchJobDetectMissingFiles :: JobHandler UniWorX +dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin + where + act :: YesodJobDB UniWorX (Map Text (NonNull (Set FileContentReference))) + act = hoist lift $ do + uploadBucket <- getsYesod $ view _appUploadCacheBucket + + missingDb <- forM trackedReferences $ \refQuery -> + fmap (Set.fromList . mapMaybe E.unValue) . E.select $ do + ref <- refQuery + E.where_ . E.not_ $ E.isNothing ref + E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref + E.distinctOnOrderBy [E.asc ref] $ return ref + + let allMissingDb :: Set Minio.Object + allMissingDb = setOf (folded . folded . re minioFileReference) missingDb + filterMissingDb :: forall m. Monad m + => Set Minio.Object + -> ConduitT Minio.ListItem (Set Minio.Object) m () + filterMissingDb remaining = maybeT (yield remaining) $ do + nextMinio <- MaybeT await + remaining' <- case nextMinio of + Minio.ListItemObject oi -> do + let (missingMinio, remaining') = Set.split (Minio.oiObject oi) remaining + lift $ yield missingMinio + return remaining' + _other -> return remaining + lift $ filterMissingDb remaining' + + allMissingMinio <- maybeT (return $ fold missingDb) . hoistMaybeM . runAppMinio . runMaybeT . runConduit $ + transPipe lift (Minio.listObjects uploadBucket Nothing True) + .| filterMissingDb allMissingDb + .| C.foldMapE (setOf minioFileReference) + + return $ Map.mapMaybe (fromNullable . Set.intersection allMissingMinio) missingDb + + fin :: Map Text (NonNull (Set FileContentReference)) -> Handler () + fin missingCounts = do + forM_ (Map.keysSet trackedReferences) $ \refIdent -> + observeMissingFiles refIdent . maybe 0 olength $ missingCounts Map.!? refIdent + + iforM_ missingCounts $ \refIdent missingFiles + -> let missingRefs = unlines . map tshow . Set.toList $ toNullable missingFiles + in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}\n#{missingRefs}|] + + when (Map.null missingCounts) $ + $logInfoS "MissingFiles" [st|No missing files|] + + trackedReferences = Map.fromList $ over (traverse . _1) nameToPathPiece + [ (''CourseApplicationFile, E.from $ \appFile -> return $ appFile E.^. CourseApplicationFileContent ) + , (''MaterialFile, E.from $ \matFile -> return $ matFile E.^. MaterialFileContent ) + , (''CourseNewsFile, E.from $ \newsFile -> return $ newsFile E.^. CourseNewsFileContent ) + , (''SheetFile, E.from $ \sheetFile -> return $ sheetFile E.^. SheetFileContent ) + , (''CourseAppInstructionFile, E.from $ \appInstr -> return $ appInstr E.^. CourseAppInstructionFileContent) + , (''SubmissionFile, E.from $ \subFile -> return $ subFile E.^. SubmissionFileContent ) + , (''SessionFile, E.from $ \sessFile -> return $ sessFile E.^. SessionFileContent ) + , (''AllocationMatching, E.from $ \matching -> return . E.just $ matching E.^. AllocationMatchingLog ) + ] + + + {-# NOINLINE pruneUnreferencedFilesIntervalsCache #-} pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)]) pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 851ad9ac7..504264894 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -91,6 +91,7 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica | JobInjectFiles | JobPruneFallbackPersonalisedSheetFilesKeys | JobRechunkFiles + | JobDetectMissingFiles deriving (Eq, Ord, Show, Read, Generic, Typeable) data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId } | NotificationSheetActive { nSheet :: SheetId } @@ -246,6 +247,7 @@ jobNoQueueSame = \case JobInjectFiles{} -> True JobPruneFallbackPersonalisedSheetFilesKeys{} -> True JobRechunkFiles{} -> True + JobDetectMissingFiles{} -> True _ -> False jobMovable :: JobCtl -> Bool diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index f9cfc0b2e..a722b5c94 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -13,6 +13,7 @@ module Utils.Metrics , FileChunkStorage(..), observeSourcedChunk, observeSunkChunk , observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles , registerJobWorkerQueueDepth + , observeMissingFiles ) where import Import.NoModel hiding (Vector, Info) @@ -209,6 +210,13 @@ jobWorkerQueueDepth jSt = Metric $ return (MkJobWorkerQueueDepth, collectJobWork info = Info "uni2work_queued_jobs_count" "Number of JobQueue entries in this Uni2work-instance" +{-# NOINLINE missingFiles #-} +missingFiles :: Vector Label1 Gauge +missingFiles = unsafeRegister . vector "ref" $ gauge info + where info = Info "uni2work_missing_files_count" + "Number of files referenced from within database that are missing" + + withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport withHealthReportMetrics act = do before <- liftIO $ getTime Monotonic @@ -344,3 +352,6 @@ observeRechunkedFiles num size = liftIO $ do registerJobWorkerQueueDepth :: MonadIO m => TMVar JobState -> m () registerJobWorkerQueueDepth = liftIO . void . register . jobWorkerQueueDepth + +observeMissingFiles :: MonadIO m => Text -> Int -> m () +observeMissingFiles refIdent = liftIO . withLabel missingFiles refIdent . flip setGauge . fromIntegral