feat(files): monitor missing files
This commit is contained in:
parent
e4416e7f0e
commit
fb0ae65ac5
@ -100,6 +100,15 @@ determineCrontab = execWriterT $ do
|
|||||||
, cronNotAfter = Right CronNotScheduled
|
, cronNotAfter = Right CronNotScheduled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tell $ HashMap.singleton
|
||||||
|
(JobCtlQueue JobDetectMissingFiles)
|
||||||
|
Cron
|
||||||
|
{ cronInitial = CronAsap
|
||||||
|
, cronRepeat = CronRepeatScheduled CronAsap
|
||||||
|
, cronRateLimit = 7200
|
||||||
|
, cronNotAfter = Right CronNotScheduled
|
||||||
|
}
|
||||||
|
|
||||||
tell . flip foldMap universeF $ \kind ->
|
tell . flip foldMap universeF $ \kind ->
|
||||||
case appHealthCheckInterval kind of
|
case appHealthCheckInterval kind of
|
||||||
Just int -> HashMap.singleton
|
Just int -> HashMap.singleton
|
||||||
|
|||||||
@ -6,6 +6,7 @@ module Jobs.Handler.Files
|
|||||||
( dispatchJobPruneSessionFiles
|
( dispatchJobPruneSessionFiles
|
||||||
, dispatchJobPruneUnreferencedFiles
|
, dispatchJobPruneUnreferencedFiles
|
||||||
, dispatchJobInjectFiles, dispatchJobRechunkFiles
|
, dispatchJobInjectFiles, dispatchJobRechunkFiles
|
||||||
|
, dispatchJobDetectMissingFiles
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Import hiding (matching, maximumBy, init)
|
import Import hiding (matching, maximumBy, init)
|
||||||
@ -30,7 +31,7 @@ import Data.Bits (Bits(shiftR))
|
|||||||
|
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
|
|
||||||
import Control.Monad.Random.Lazy
|
import Control.Monad.Random.Lazy (evalRand, mkStdGen)
|
||||||
import System.Random.Shuffle (shuffleM)
|
import System.Random.Shuffle (shuffleM)
|
||||||
import System.IO.Unsafe
|
import System.IO.Unsafe
|
||||||
|
|
||||||
@ -40,6 +41,10 @@ import Control.Monad.Logger (askLoggerIO, runLoggingT)
|
|||||||
|
|
||||||
import System.Clock
|
import System.Clock
|
||||||
|
|
||||||
|
import qualified Data.Set as Set
|
||||||
|
|
||||||
|
import Jobs.Queue (YesodJobDB)
|
||||||
|
|
||||||
|
|
||||||
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
||||||
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
|
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
|
||||||
@ -69,6 +74,68 @@ fileReferences (E.just -> fHash)
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
dispatchJobDetectMissingFiles :: JobHandler UniWorX
|
||||||
|
dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin
|
||||||
|
where
|
||||||
|
act :: YesodJobDB UniWorX (Map Text (NonNull (Set FileContentReference)))
|
||||||
|
act = hoist lift $ do
|
||||||
|
uploadBucket <- getsYesod $ view _appUploadCacheBucket
|
||||||
|
|
||||||
|
missingDb <- forM trackedReferences $ \refQuery ->
|
||||||
|
fmap (Set.fromList . mapMaybe E.unValue) . E.select $ do
|
||||||
|
ref <- refQuery
|
||||||
|
E.where_ . E.not_ $ E.isNothing ref
|
||||||
|
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
|
||||||
|
E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref
|
||||||
|
E.distinctOnOrderBy [E.asc ref] $ return ref
|
||||||
|
|
||||||
|
let allMissingDb :: Set Minio.Object
|
||||||
|
allMissingDb = setOf (folded . folded . re minioFileReference) missingDb
|
||||||
|
filterMissingDb :: forall m. Monad m
|
||||||
|
=> Set Minio.Object
|
||||||
|
-> ConduitT Minio.ListItem (Set Minio.Object) m ()
|
||||||
|
filterMissingDb remaining = maybeT (yield remaining) $ do
|
||||||
|
nextMinio <- MaybeT await
|
||||||
|
remaining' <- case nextMinio of
|
||||||
|
Minio.ListItemObject oi -> do
|
||||||
|
let (missingMinio, remaining') = Set.split (Minio.oiObject oi) remaining
|
||||||
|
lift $ yield missingMinio
|
||||||
|
return remaining'
|
||||||
|
_other -> return remaining
|
||||||
|
lift $ filterMissingDb remaining'
|
||||||
|
|
||||||
|
allMissingMinio <- maybeT (return $ fold missingDb) . hoistMaybeM . runAppMinio . runMaybeT . runConduit $
|
||||||
|
transPipe lift (Minio.listObjects uploadBucket Nothing True)
|
||||||
|
.| filterMissingDb allMissingDb
|
||||||
|
.| C.foldMapE (setOf minioFileReference)
|
||||||
|
|
||||||
|
return $ Map.mapMaybe (fromNullable . Set.intersection allMissingMinio) missingDb
|
||||||
|
|
||||||
|
fin :: Map Text (NonNull (Set FileContentReference)) -> Handler ()
|
||||||
|
fin missingCounts = do
|
||||||
|
forM_ (Map.keysSet trackedReferences) $ \refIdent ->
|
||||||
|
observeMissingFiles refIdent . maybe 0 olength $ missingCounts Map.!? refIdent
|
||||||
|
|
||||||
|
iforM_ missingCounts $ \refIdent missingFiles
|
||||||
|
-> let missingRefs = unlines . map tshow . Set.toList $ toNullable missingFiles
|
||||||
|
in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}\n#{missingRefs}|]
|
||||||
|
|
||||||
|
when (Map.null missingCounts) $
|
||||||
|
$logInfoS "MissingFiles" [st|No missing files|]
|
||||||
|
|
||||||
|
trackedReferences = Map.fromList $ over (traverse . _1) nameToPathPiece
|
||||||
|
[ (''CourseApplicationFile, E.from $ \appFile -> return $ appFile E.^. CourseApplicationFileContent )
|
||||||
|
, (''MaterialFile, E.from $ \matFile -> return $ matFile E.^. MaterialFileContent )
|
||||||
|
, (''CourseNewsFile, E.from $ \newsFile -> return $ newsFile E.^. CourseNewsFileContent )
|
||||||
|
, (''SheetFile, E.from $ \sheetFile -> return $ sheetFile E.^. SheetFileContent )
|
||||||
|
, (''CourseAppInstructionFile, E.from $ \appInstr -> return $ appInstr E.^. CourseAppInstructionFileContent)
|
||||||
|
, (''SubmissionFile, E.from $ \subFile -> return $ subFile E.^. SubmissionFileContent )
|
||||||
|
, (''SessionFile, E.from $ \sessFile -> return $ sessFile E.^. SessionFileContent )
|
||||||
|
, (''AllocationMatching, E.from $ \matching -> return . E.just $ matching E.^. AllocationMatchingLog )
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
|
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
|
||||||
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
|
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
|
||||||
pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
|
pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
|
||||||
|
|||||||
@ -91,6 +91,7 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica
|
|||||||
| JobInjectFiles
|
| JobInjectFiles
|
||||||
| JobPruneFallbackPersonalisedSheetFilesKeys
|
| JobPruneFallbackPersonalisedSheetFilesKeys
|
||||||
| JobRechunkFiles
|
| JobRechunkFiles
|
||||||
|
| JobDetectMissingFiles
|
||||||
deriving (Eq, Ord, Show, Read, Generic, Typeable)
|
deriving (Eq, Ord, Show, Read, Generic, Typeable)
|
||||||
data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
|
data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
|
||||||
| NotificationSheetActive { nSheet :: SheetId }
|
| NotificationSheetActive { nSheet :: SheetId }
|
||||||
@ -246,6 +247,7 @@ jobNoQueueSame = \case
|
|||||||
JobInjectFiles{} -> True
|
JobInjectFiles{} -> True
|
||||||
JobPruneFallbackPersonalisedSheetFilesKeys{} -> True
|
JobPruneFallbackPersonalisedSheetFilesKeys{} -> True
|
||||||
JobRechunkFiles{} -> True
|
JobRechunkFiles{} -> True
|
||||||
|
JobDetectMissingFiles{} -> True
|
||||||
_ -> False
|
_ -> False
|
||||||
|
|
||||||
jobMovable :: JobCtl -> Bool
|
jobMovable :: JobCtl -> Bool
|
||||||
|
|||||||
@ -13,6 +13,7 @@ module Utils.Metrics
|
|||||||
, FileChunkStorage(..), observeSourcedChunk, observeSunkChunk
|
, FileChunkStorage(..), observeSourcedChunk, observeSunkChunk
|
||||||
, observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles
|
, observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles
|
||||||
, registerJobWorkerQueueDepth
|
, registerJobWorkerQueueDepth
|
||||||
|
, observeMissingFiles
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Import.NoModel hiding (Vector, Info)
|
import Import.NoModel hiding (Vector, Info)
|
||||||
@ -209,6 +210,13 @@ jobWorkerQueueDepth jSt = Metric $ return (MkJobWorkerQueueDepth, collectJobWork
|
|||||||
info = Info "uni2work_queued_jobs_count"
|
info = Info "uni2work_queued_jobs_count"
|
||||||
"Number of JobQueue entries in this Uni2work-instance"
|
"Number of JobQueue entries in this Uni2work-instance"
|
||||||
|
|
||||||
|
{-# NOINLINE missingFiles #-}
|
||||||
|
missingFiles :: Vector Label1 Gauge
|
||||||
|
missingFiles = unsafeRegister . vector "ref" $ gauge info
|
||||||
|
where info = Info "uni2work_missing_files_count"
|
||||||
|
"Number of files referenced from within database that are missing"
|
||||||
|
|
||||||
|
|
||||||
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
||||||
withHealthReportMetrics act = do
|
withHealthReportMetrics act = do
|
||||||
before <- liftIO $ getTime Monotonic
|
before <- liftIO $ getTime Monotonic
|
||||||
@ -344,3 +352,6 @@ observeRechunkedFiles num size = liftIO $ do
|
|||||||
|
|
||||||
registerJobWorkerQueueDepth :: MonadIO m => TMVar JobState -> m ()
|
registerJobWorkerQueueDepth :: MonadIO m => TMVar JobState -> m ()
|
||||||
registerJobWorkerQueueDepth = liftIO . void . register . jobWorkerQueueDepth
|
registerJobWorkerQueueDepth = liftIO . void . register . jobWorkerQueueDepth
|
||||||
|
|
||||||
|
observeMissingFiles :: MonadIO m => Text -> Int -> m ()
|
||||||
|
observeMissingFiles refIdent = liftIO . withLabel missingFiles refIdent . flip setGauge . fromIntegral
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user