diff --git a/src/Jobs.hs b/src/Jobs.hs index 91e8cf605..17ec46921 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -102,6 +102,7 @@ handleJobs foundation@UniWorX{..} jobCurrentCrontab <- liftIO $ newTVarIO Nothing jobHeldLocks <- liftIO $ newTVarIO Set.empty registerJobHeldLocksCount jobHeldLocks + registerJobWorkerQueueDepth appJobState atomically $ putTMVar appJobState JobState { jobContext = JobContext{..} , .. @@ -452,6 +453,12 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker JobHandlerException act -> do act & withJobWorkerState wNum (JobWorkerExecJob content) runDB $ setSerializable cleanup + JobHandlerAtomicWithFinalizer act fin -> do + res <- runDBJobs . setSerializable $ do + res <- act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + return res + fin res handleCmd JobCtlDetermineCrontab = do newCTab <- liftHandler . runDB $ setSerializable determineCrontab' -- logDebugS logIdent $ tshow newCTab diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 4190a1131..c3e24551d 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -34,11 +34,13 @@ import Handler.Utils.Files (sourceFileDB) dispatchJobPruneSessionFiles :: JobHandler UniWorX -dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do - now <- liftIO getCurrentTime - expires <- getsYesod $ view _appSessionFilesExpire - n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ] - $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|] +dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin + where + act = hoist lift $ do + now <- liftIO getCurrentTime + expires <- getsYesod $ view _appSessionFilesExpire + deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ] + fin n = $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|] @@ -64,142 +66,146 @@ pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChu pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX -dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomic . hoist lift $ do - now <- liftIO getCurrentTime - interval <- getsYesod $ view _appPruneUnreferencedFilesInterval - keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles +dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomicWithFinalizer act fin + where + act = hoist lift $ do + now <- liftIO getCurrentTime + interval <- getsYesod $ view _appPruneUnreferencedFilesInterval + keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles - let - chunkHashBytes :: forall h. - ( Unwrapped FileContentChunkReference ~ Digest h ) - => Integer - chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h)) - chunkHashBits = chunkHashBytes * 8 - base :: Integer - base = 2 ^ chunkHashBits - intervals :: [Integer] - -- | Exclusive upper bounds - intervals - | numIterations <= 0 = pure base - | otherwise = go protoIntervals ^.. folded . _1 - where - go [] = [] - go ints - | maximumOf (folded . 
_1) ints == Just base = ints - | otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts - where - closest = maximumBy (comparing $ view _2) ints - (lts, geqs) = partition (((>) `on` view _1) closest) ints - gts = filter (((<) `on` view _1) closest) geqs + let + chunkHashBytes :: forall h. + ( Unwrapped FileContentChunkReference ~ Digest h ) + => Integer + chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h)) + chunkHashBits = chunkHashBytes * 8 + base :: Integer + base = 2 ^ chunkHashBits + intervals :: [Integer] -- | Exclusive upper bounds - protoIntervals :: [(Integer, Integer)] - protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations - | i <- [1 .. toInteger numIterations] - ] + intervals + | numIterations <= 0 = pure base + | otherwise = go protoIntervals ^.. folded . _1 + where + go [] = [] + go ints + | maximumOf (folded . _1) ints == Just base = ints + | otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts + where + closest = maximumBy (comparing $ view _2) ints + (lts, geqs) = partition (((>) `on` view _1) closest) ints + gts = filter (((<) `on` view _1) closest) geqs + -- | Exclusive upper bounds + protoIntervals :: [(Integer, Integer)] + protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations + | i <- [1 .. toInteger numIterations] + ] - intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals + intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals - toDigest :: Integer -> Maybe FileContentChunkReference - toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step - where step i - | i <= 0 = Nothing - | otherwise = Just (fromIntegral i, i `shiftR` 8) - pad bs - | toInteger (ByteString.length bs) >= chunkHashBytes = bs - | otherwise = pad $ ByteString.cons 0 bs + toDigest :: Integer -> Maybe FileContentChunkReference + toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step + where step i + | i <= 0 = Nothing + | otherwise = Just (fromIntegral i, i `shiftR` 8) + pad bs + | toInteger (ByteString.length bs) >= chunkHashBytes = bs + | otherwise = pad $ ByteString.cons 0 bs - intervalsDgsts <- atomically $ do - cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache - case Map.lookup numIterations cachedDgsts of - Just c -> return c - Nothing -> do - modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts' - return intervalsDgsts' + intervalsDgsts <- atomically $ do + cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache + case Map.lookup numIterations cachedDgsts of + Just c -> return c + Nothing -> do + modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts' + return intervalsDgsts' - let - permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch) - - (minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts) - chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool) - chunkIdFilter cRef = E.and $ catMaybes - [ minBoundDgst <&> \b -> cRef E.>=. E.val b - , maxBoundDgst <&> \b -> cRef E.<. 
E.val b - ] + let + permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch) - $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst) - - E.insertSelectWithConflict - (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint") - (E.from $ \fileContentChunk -> do - E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do - E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId + (minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts) + chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool) + chunkIdFilter cRef = E.and $ catMaybes + [ minBoundDgst <&> \b -> cRef E.>=. E.val b + , maxBoundDgst <&> \b -> cRef E.<. E.val b + ] + + $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst) + + E.insertSelectWithConflict + (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint") + (E.from $ \fileContentChunk -> do + E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId + return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash + E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash + return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now + ) + (\current excluded -> + [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ] + ) + + E.delete . E.from $ \fileContentChunkUnreferenced -> do + E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash - E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash - return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now - ) - (\current excluded -> - [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ] - ) + E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) - E.delete . E.from $ \fileContentChunkUnreferenced -> do - E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do - E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash - return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash - E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) + let + getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do + let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do + E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash + E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash + E.where_ . 
chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) + return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince + E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince - let - getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do - let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do - E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash - E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash - E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) - return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince - E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince + E.groupBy $ fileContentEntry E.^. FileContentEntryHash + E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ] - E.groupBy $ fileContentEntry E.^. FileContentEntryHash - E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ] - - return $ fileContentEntry E.^. FileContentEntryHash + return $ fileContentEntry E.^. FileContentEntryHash - deleteEntry :: _ -> DB (Sum Natural) - deleteEntry (E.Value fRef) = - bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef] - - Sum deletedEntries <- runConduit $ - getEntryCandidates - .| takeWhileTime (interval / 3) - .| C.mapM deleteEntry - .| C.fold - - when (deletedEntries > 0) $ - $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|] - - let - getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do - E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now) - E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> - E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash - - E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) + deleteEntry :: _ -> DB (Sum Natural) + deleteEntry (E.Value fRef) = + bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef] - return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash - , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent) - ) + Sum deletedEntries <- runConduit $ + getEntryCandidates + .| takeWhileTime (interval / 3) + .| C.mapM deleteEntry + .| C.fold - deleteChunk :: _ -> DB (Sum Natural, Sum Word64) - deleteChunk (E.Value cRef, E.Value size) = do - deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ] - (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef] + let + getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do + E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now) + E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. 
FileContentChunkUnreferencedHash

-  (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
-    getChunkCandidates
-    .| takeWhileTime (interval / 3)
-    .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
-    .| C.mapM deleteChunk
-    .| C.fold
+
+          E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)

-  when (deletedChunks > 0 || deletedChunkSize > 0) $
-    $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|]
+          return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
+                 , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
+                 )
+
+      deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
+      deleteChunk (E.Value cRef, E.Value size) = do
+        deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
+        (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
+
+      (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
+        getChunkCandidates
+        .| takeWhileTime (interval / 3)
+        .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
+        .| C.mapM deleteChunk
+        .| C.fold
+
+      return (deletedEntries, deletedChunks, deletedChunkSize)
+
+    fin (deletedEntries, deletedChunks, deletedChunkSize) = do
+      observeDeletedUnreferencedFiles deletedEntries
+      $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
+      observeDeletedUnreferencedChunks deletedChunks deletedChunkSize
+      $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|]


 dispatchJobInjectFiles :: JobHandler UniWorX
@@ -245,10 +251,10 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
     .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
     .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
     .| C.mapM (lift . injectOrDelete)
+    .| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz)
     .| C.fold

-  when (injectedFiles > 0 || injectedSize > 0) $
-    $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{tshow injectedSize} bytes)|]
+  $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{tshow injectedSize} bytes)|]


 data RechunkFileException
@@ -258,38 +264,42 @@ data RechunkFileException
   deriving anyclass (Exception)

 dispatchJobRechunkFiles :: JobHandler UniWorX
-dispatchJobRechunkFiles = JobHandlerAtomic . hoist lift $ do
-  interval <- getsYesod $ view _appRechunkFiles
-  let
-    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
-      E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
-        E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
-        E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
-        E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
-
-      let size = E.subSelectMaybe . 
E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      interval <- getsYesod $ view _appRechunkFiles
+      let
+        getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
+          E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
-        return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64))
-
-      return ( fileContentEntry E.^. FileContentEntryHash
-             , size
-             )
+            E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased

-    rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
-    rechunkFile fRef sz = do
-      fRef' <- sinkFileDB True $ sourceFileDB fRef
-      unless (fRef == fRef') $
-        throwM $ RechunkFileExceptionHashMismatch fRef fRef'
-      return (Sum 1, Sum sz)
+          let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+                E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+                E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+                return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent :: E.SqlExpr (E.Value Word64))

-  (Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
-    getEntryCandidates
-    .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
-    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
-    .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
-    .| C.mapM (uncurry rechunkFile)
-    .| C.fold
-
-  when (rechunkedFiles > 0 || rechunkedSize > 0) $
-    $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{tshow rechunkedSize} bytes)|]
+          return ( fileContentEntry E.^. FileContentEntryHash
+                 , size
+                 )
+
+      rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
+      rechunkFile fRef sz = do
+        fRef' <- sinkFileDB True $ sourceFileDB fRef
+        unless (fRef == fRef') $
+          throwM $ RechunkFileExceptionHashMismatch fRef fRef'
+        return (Sum 1, Sum sz)
+
+      (Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
+        getEntryCandidates
+        .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
+        .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
+        .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
+        .| C.mapM (uncurry rechunkFile)
+        .| C.fold
+
+      return (rechunkedFiles, rechunkedSize)
+    fin (rechunkedFiles, rechunkedSize) = do
+      observeRechunkedFiles rechunkedFiles rechunkedSize
+      $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{tshow rechunkedSize} bytes)|]
diff --git a/src/Jobs/Handler/PersonalisedSheetFiles.hs b/src/Jobs/Handler/PersonalisedSheetFiles.hs
index 35bd8cd61..af906e61f 100644
--- a/src/Jobs/Handler/PersonalisedSheetFiles.hs
+++ b/src/Jobs/Handler/PersonalisedSheetFiles.hs
@@ -8,8 +8,10 @@ import Database.Persist.Sql (deleteWhereCount)

 dispatchJobPruneFallbackPersonalisedSheetFilesKeys :: JobHandler UniWorX
-dispatchJobPruneFallbackPersonalisedSheetFilesKeys = JobHandlerAtomic . 
hoist lift $ do - now <- liftIO getCurrentTime - expires <- getsYesod $ view _appFallbackPersonalisedSheetFilesKeysExpire - n <- deleteWhereCount [ FallbackPersonalisedSheetFilesKeyGenerated <. addUTCTime (- expires) now ] - $logInfoS "PruneFallbackPersonalisedSheetFilesKeys" [st|Deleted #{n} expired fallback personalised sheet files keys|] +dispatchJobPruneFallbackPersonalisedSheetFilesKeys = JobHandlerAtomicWithFinalizer act fin + where + act = hoist lift $ do + now <- liftIO getCurrentTime + expires <- getsYesod $ view _appFallbackPersonalisedSheetFilesKeysExpire + deleteWhereCount [ FallbackPersonalisedSheetFilesKeyGenerated <. addUTCTime (- expires) now ] + fin n = $logInfoS "PruneFallbackPersonalisedSheetFilesKeys" [st|Deleted #{n} expired fallback personalised sheet files keys|] diff --git a/src/Jobs/Handler/PruneInvitations.hs b/src/Jobs/Handler/PruneInvitations.hs index e7516f204..756b56d0a 100644 --- a/src/Jobs/Handler/PruneInvitations.hs +++ b/src/Jobs/Handler/PruneInvitations.hs @@ -7,7 +7,9 @@ import Import import Database.Persist.Sql (deleteWhereCount) dispatchJobPruneInvitations :: JobHandler UniWorX -dispatchJobPruneInvitations = JobHandlerAtomic . hoist lift $ do - now <- liftIO getCurrentTime - n <- deleteWhereCount [ InvitationExpiresAt <. Just now ] - $logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|] +dispatchJobPruneInvitations = JobHandlerAtomicWithFinalizer act fin + where + act = hoist lift $ do + now <- liftIO getCurrentTime + deleteWhereCount [ InvitationExpiresAt <. Just now ] + fin n = $logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|] diff --git a/src/Jobs/Handler/TransactionLog.hs b/src/Jobs/Handler/TransactionLog.hs index 131ba2491..ae8d8fa96 100644 --- a/src/Jobs/Handler/TransactionLog.hs +++ b/src/Jobs/Handler/TransactionLog.hs @@ -9,23 +9,27 @@ import Handler.Utils.DateTime import Database.Persist.Sql (updateWhereCount, deleteWhereCount) dispatchJobTruncateTransactionLog, dispatchJobDeleteTransactionLogIPs :: JobHandler UniWorX -dispatchJobTruncateTransactionLog = JobHandlerAtomic . hoist lift $ do - now <- liftIO getCurrentTime - let localNow = utcToLocalTime now - (localCurrentYear, _, _) = toGregorian $ localDay localNow - localStartOfPreviousYear = LocalTime (fromGregorian (pred localCurrentYear) 1 1) midnight - (currentYear, _, _) = toGregorian $ utctDay now - startOfPreviousYear = UTCTime (fromGregorian (pred currentYear) 1 1) 0 - startOfPreviousYear' = case localTimeToUTC localStartOfPreviousYear of - LTUUnique utc' _ -> utc' - _other -> startOfPreviousYear +dispatchJobTruncateTransactionLog = JobHandlerAtomicWithFinalizer act fin + where + act = hoist lift $ do + now <- liftIO getCurrentTime + let localNow = utcToLocalTime now + (localCurrentYear, _, _) = toGregorian $ localDay localNow + localStartOfPreviousYear = LocalTime (fromGregorian (pred localCurrentYear) 1 1) midnight + (currentYear, _, _) = toGregorian $ utctDay now + startOfPreviousYear = UTCTime (fromGregorian (pred currentYear) 1 1) 0 + startOfPreviousYear' = case localTimeToUTC localStartOfPreviousYear of + LTUUnique utc' _ -> utc' + _other -> startOfPreviousYear - n <- deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ] - $logInfoS "TruncateTransactionLog" [st|Deleted #{n} transaction log entries|] -dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . 
hoist lift $ do - now <- liftIO getCurrentTime - retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime - let cutoff = addUTCTime (- retentionTime) now + deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ] + fin n = $logInfoS "TruncateTransactionLog" [st|Deleted #{n} transaction log entries|] +dispatchJobDeleteTransactionLogIPs = JobHandlerAtomicWithFinalizer act fin + where + act = hoist lift $ do + now <- liftIO getCurrentTime + retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime + let cutoff = addUTCTime (- retentionTime) now - n <- updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ] - $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|] + updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ] + fin n = $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|] diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 59a15c99a..ee1c9aa3b 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -11,11 +11,10 @@ module Jobs.Types , JobContext(..) , JobState(..) , jobWorkerNames - , JobWorkerState(..) - , withJobWorkerState + , JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob , JobWorkerId , showWorkerId, newWorkerId - , JobQueue, jqInsert, jqDequeue + , JobQueue, jqInsert, jqDequeue, jqDepth , JobPriority(..), prioritiseJob , jobNoQueueSame , module Cron @@ -38,10 +37,6 @@ import qualified Data.Set as Set import Data.PQueue.Prio.Max (MaxPQueue) import qualified Data.PQueue.Prio.Max as PQ -import Utils.Metrics (withJobWorkerStateLbls) - -import qualified Prometheus (Label4) - import Cron (CronNextMatch(..), _MatchAsap, _MatchAt, _MatchNone) @@ -179,7 +174,8 @@ type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJo data JobHandler site = JobHandlerAtomic (YesodJobDB site ()) | JobHandlerException (HandlerFor site ()) - deriving (Generic, Typeable) + | forall a. JobHandlerAtomicWithFinalizer (YesodJobDB site a) (a -> HandlerFor site ()) + deriving (Typeable) makePrisms ''JobHandler @@ -199,20 +195,6 @@ deriveJSON defaultOptions , sumEncoding = TaggedObject "state" "data" } ''JobWorkerState -classifyJobWorkerState :: JobWorkerId -> JobWorkerState -> Prometheus.Label4 -classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCtl, maybe "n/a" pack mJob) - where - Aeson.Object obj = Aeson.toJSON jws - Aeson.String tag = obj HashMap.! "state" - mJobCtl = asum - [ classifyJobCtl <$> jws ^? _jobWorkerJobCtl - , "perform" <$ jws ^? _jobWorkerJob - ] - mJob = classifyJob <$> jws ^? _jobWorkerJob - -withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> JobWorkerState -> m a -> m a -withJobWorkerState wId newSt = withJobWorkerStateLbls $ classifyJobWorkerState wId newSt - newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique } deriving (Eq, Ord) @@ -273,6 +255,9 @@ jqInsert job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job) job jqDequeue :: JobQueue -> Maybe (JobCtl, JobQueue) jqDequeue = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxView . getJobQueue +jqDepth :: Integral n => JobQueue -> n +jqDepth = fromIntegral . PQ.size . 
getJobQueue + data JobState = JobState { jobWorkers :: Map (Async ()) (TVar JobQueue) diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 443b816ea..16c6f994c 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -1,14 +1,18 @@ +{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-} + module Utils.Metrics ( withHealthReportMetrics , registerGHCMetrics , observeHTTPRequestLatency , registerReadyMetric - , registerJobHeldLocksCount - , withJobWorkerStateLbls + , withJobWorkerState , observeYesodCacheSize , observeFavouritesQuickActionsDuration , LoginOutcome(..), observeLoginOutcome + , registerJobHeldLocksCount , FileChunkStorage(..), observeSourcedChunk, observeSunkChunk + , observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles + , registerJobWorkerQueueDepth ) where import Import.NoModel hiding (Vector, Info) @@ -29,6 +33,11 @@ import Yesod.Core.Types (HandlerData(..), GHState(..)) import qualified Data.Set as Set +import Jobs.Types + +import qualified Data.Aeson as Aeson +import qualified Data.HashMap.Strict as HashMap + {-# ANN module ("HLint: ignore Use even" :: String) #-} @@ -130,15 +139,75 @@ jobHeldLocksCount heldLocks = Metric $ return (MkJobHeldLocksCount, collectJobHe sourcedFileChunkSizes :: Vector Label1 Histogram sourcedFileChunkSizes = unsafeRegister . vector "storage" $ histogram info buckets where info = Info "uni2work_sourced_file_chunks_bytes" - "Sizes of files chunks sourced" - buckets = 0 : histogramBuckets 1 20000000 + "Sizes of file chunks sourced" + buckets = 0 : histogramBuckets 1 1000000000 {-# NOINLINE sunkFileChunkSizes #-} sunkFileChunkSizes :: Vector Label1 Histogram sunkFileChunkSizes = unsafeRegister . vector "storage" $ histogram info buckets where info = Info "uni2work_sunk_file_chunks_bytes" - "Sizes of files chunks sunk" - buckets = 0 : histogramBuckets 1 100000000 + "Sizes of file chunks sunk" + buckets = 0 : histogramBuckets 1 1000000000 + +{-# NOINLINE deletedUnreferencedFiles #-} +deletedUnreferencedFiles :: Counter +deletedUnreferencedFiles = unsafeRegister $ counter info + where info = Info "uni2work_deleted_unreferenced_files_count" + "Number of unreferenced files deleted" + +{-# NOINLINE deletedUnreferencedChunks #-} +deletedUnreferencedChunks :: Counter +deletedUnreferencedChunks = unsafeRegister $ counter info + where info = Info "uni2work_deleted_unreferenced_chunks_count" + "Number of unreferenced chunks deleted" + +{-# NOINLINE deletedUnreferencedChunksBytes #-} +deletedUnreferencedChunksBytes :: Counter +deletedUnreferencedChunksBytes = unsafeRegister $ counter info + where info = Info "uni2work_deleted_unreferenced_chunks_bytes" + "Size of unreferenced chunks deleted" + +{-# NOINLINE injectedFiles #-} +injectedFiles :: Counter +injectedFiles = unsafeRegister $ counter info + where info = Info "uni2work_injected_files_count" + "Number of files injected from upload cache into database" + +{-# NOINLINE injectedFilesBytes #-} +injectedFilesBytes :: Counter +injectedFilesBytes = unsafeRegister $ counter info + where info = Info "uni2work_injected_files_bytes" + "Size of files injected from upload cache into database" + +{-# NOINLINE rechunkedFiles #-} +rechunkedFiles :: Counter +rechunkedFiles = unsafeRegister $ counter info + where info = Info "uni2work_rechunked_files_count" + "Number of files rechunked within database" + +{-# NOINLINE rechunkedFilesBytes #-} +rechunkedFilesBytes :: Counter +rechunkedFilesBytes = unsafeRegister $ counter info + where info = Info 
"uni2work_rechunked_files_bytes" + "Size of files rechunked within database" + +data JobWorkerQueueDepth = MkJobWorkerQueueDepth + +jobWorkerQueueDepth :: TMVar JobState -> Metric JobWorkerQueueDepth +jobWorkerQueueDepth jSt = Metric $ return (MkJobWorkerQueueDepth, collectJobWorkerQueueDepth) + where + collectJobWorkerQueueDepth = maybeT (return []) $ do + wQueues <- hoist atomically $ do + JobState{..} <- MaybeT $ tryReadTMVar jSt + flip ifoldMapM jobWorkers $ \wAsync wQueue + -> lift $ pure . (jobWorkerName wAsync, ) . jqDepth <$> readTVar wQueue + return [ SampleGroup info GaugeType + [ Sample "uni2work_queued_jobs_count" [("worker", showWorkerId wName)] . encodeUtf8 $ tshow wDepth + | (wName, wDepth) <- wQueues + ] + ] + info = Info "uni2work_queued_jobs_count" + "Number of JobQueue entries in this Uni2work-instance" withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport withHealthReportMetrics act = do @@ -181,6 +250,20 @@ observeHTTPRequestLatency classifyHandler app req respond' = do registerReadyMetric :: MonadIO m => m () registerReadyMetric = liftIO $ void . register . readyMetric =<< getPOSIXTime +classifyJobWorkerState :: JobWorkerId -> JobWorkerState -> Prometheus.Label4 +classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCtl, maybe "n/a" pack mJob) + where + Aeson.Object obj = Aeson.toJSON jws + Aeson.String tag = obj HashMap.! "state" + mJobCtl = asum + [ classifyJobCtl <$> jws ^? _jobWorkerJobCtl + , "perform" <$ jws ^? _jobWorkerJob + ] + mJob = classifyJob <$> jws ^? _jobWorkerJob + +withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> JobWorkerState -> m a -> m a +withJobWorkerState wId newSt = withJobWorkerStateLbls $ classifyJobWorkerState wId newSt + withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Label4 -> m a -> m a withJobWorkerStateLbls newLbls act = do liftIO $ withLabel jobWorkerStateTransitions newLbls incCounter @@ -240,3 +323,24 @@ observeSourcedChunk store = liftIO . observeChunkSize sourcedFileChunkSizes stor observeChunkSize :: Vector Label1 Histogram -> FileChunkStorage -> Integer -> IO () observeChunkSize metric (toPathPiece -> storageLabel) = withLabel metric storageLabel . flip observe . fromInteger + +observeDeletedUnreferencedFiles :: MonadIO m => Natural -> m () +observeDeletedUnreferencedFiles = liftIO . void . addCounter deletedUnreferencedFiles . fromIntegral + +observeDeletedUnreferencedChunks :: MonadIO m => Natural -> Word64 -> m () +observeDeletedUnreferencedChunks num size = liftIO $ do + void . addCounter deletedUnreferencedChunks $ fromIntegral num + void . addCounter deletedUnreferencedChunksBytes $ fromIntegral size + +observeInjectedFiles :: MonadIO m => Natural -> Word64 -> m () +observeInjectedFiles num size = liftIO $ do + void . addCounter injectedFiles $ fromIntegral num + void . addCounter injectedFilesBytes $ fromIntegral size + +observeRechunkedFiles :: MonadIO m => Natural -> Word64 -> m () +observeRechunkedFiles num size = liftIO $ do + void . addCounter rechunkedFiles $ fromIntegral num + void . addCounter rechunkedFilesBytes $ fromIntegral size + +registerJobWorkerQueueDepth :: MonadIO m => TMVar JobState -> m () +registerJobWorkerQueueDepth = liftIO . void . register . jobWorkerQueueDepth