feat: improve logging/metrics wrt. batch jobs

parent 8afacdd6b7
commit d21faf4de0

@@ -102,6 +102,7 @@ handleJobs foundation@UniWorX{..}
   jobCurrentCrontab <- liftIO $ newTVarIO Nothing
   jobHeldLocks <- liftIO $ newTVarIO Set.empty
   registerJobHeldLocksCount jobHeldLocks
+  registerJobWorkerQueueDepth appJobState
   atomically $ putTMVar appJobState JobState
     { jobContext = JobContext{..}
     , ..

@@ -452,6 +453,12 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
       JobHandlerException act -> do
         act & withJobWorkerState wNum (JobWorkerExecJob content)
         runDB $ setSerializable cleanup
+      JobHandlerAtomicWithFinalizer act fin -> do
+        res <- runDBJobs . setSerializable $ do
+          res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
+          hoist lift cleanup
+          return res
+        fin res
     handleCmd JobCtlDetermineCrontab = do
       newCTab <- liftHandler . runDB $ setSerializable determineCrontab'
       -- logDebugS logIdent $ tshow newCTab
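
For orientation before the per-job hunks below: `act` runs inside the job's serializable database transaction, and its result is handed to `fin` only afterwards, so anything `fin` logs or observes refers to work that was actually committed. A minimal handler in the new style might look like the following sketch — the job name, the `WidgetExpiresAt` field and the log source are hypothetical stand-ins, but the shape matches the real handlers converted below:

    -- Sketch only; the names are invented, the pattern is the commit's.
    dispatchJobPruneWidgets :: JobHandler UniWorX
    dispatchJobPruneWidgets = JobHandlerAtomicWithFinalizer act fin
      where
        -- runs inside the serializable transaction; the returned count
        -- becomes the argument of fin
        act = hoist lift $ do
          now <- liftIO getCurrentTime
          deleteWhereCount [ WidgetExpiresAt <. Just now ]
        -- runs after the transaction has committed
        fin n = $logInfoS "PruneWidgets" [st|Deleted #{n} expired widgets|]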
@@ -34,11 +34,13 @@ import Handler.Utils.Files (sourceFileDB)


 dispatchJobPruneSessionFiles :: JobHandler UniWorX
-dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
-  now <- liftIO getCurrentTime
-  expires <- getsYesod $ view _appSessionFilesExpire
-  n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
-  $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
+dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      now <- liftIO getCurrentTime
+      expires <- getsYesod $ view _appSessionFilesExpire
+      deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
+    fin n = $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]

@@ -64,142 +66,146 @@ pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChu
 pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty

 dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX
-dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomic . hoist lift $ do
-  now <- liftIO getCurrentTime
-  interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
-  keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
-
-  let
-    chunkHashBytes :: forall h.
-      ( Unwrapped FileContentChunkReference ~ Digest h )
-      => Integer
-    chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
-    chunkHashBits = chunkHashBytes * 8
-    base :: Integer
-    base = 2 ^ chunkHashBits
-    intervals :: [Integer]
-    -- | Exclusive upper bounds
-    intervals
-      | numIterations <= 0 = pure base
-      | otherwise = go protoIntervals ^.. folded . _1
-      where
-        go [] = []
-        go ints
-          | maximumOf (folded . _1) ints == Just base = ints
-          | otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
-          where
-            closest = maximumBy (comparing $ view _2) ints
-            (lts, geqs) = partition (((>) `on` view _1) closest) ints
-            gts = filter (((<) `on` view _1) closest) geqs
-    -- | Exclusive upper bounds
-    protoIntervals :: [(Integer, Integer)]
-    protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
-                     | i <- [1 .. toInteger numIterations]
-                     ]
-
-    intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals
-
-    toDigest :: Integer -> Maybe FileContentChunkReference
-    toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
-      where step i
-              | i <= 0 = Nothing
-              | otherwise = Just (fromIntegral i, i `shiftR` 8)
-            pad bs
-              | toInteger (ByteString.length bs) >= chunkHashBytes = bs
-              | otherwise = pad $ ByteString.cons 0 bs
-
-  intervalsDgsts <- atomically $ do
-    cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
-    case Map.lookup numIterations cachedDgsts of
-      Just c -> return c
-      Nothing -> do
-        modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
-        return intervalsDgsts'
-
-  let
-    permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)
-
-    (minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
-    chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
-    chunkIdFilter cRef = E.and $ catMaybes
-      [ minBoundDgst <&> \b -> cRef E.>=. E.val b
-      , maxBoundDgst <&> \b -> cRef E.<. E.val b
-      ]
-
-  $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)
-
-  E.insertSelectWithConflict
-    (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
-    (E.from $ \fileContentChunk -> do
-      E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
-        E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
-        return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
-      E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
-      return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
-    )
-    (\current excluded ->
-      [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
-    )
-
-  E.delete . E.from $ \fileContentChunkUnreferenced -> do
-    E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
-      E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
-      return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
-    E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
-
-  let
-    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
-      let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
-            E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
-            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
-            E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
-            return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
-      E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
-      E.groupBy $ fileContentEntry E.^. FileContentEntryHash
-      E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]
-
-      return $ fileContentEntry E.^. FileContentEntryHash
-
-    deleteEntry :: _ -> DB (Sum Natural)
-    deleteEntry (E.Value fRef) =
-      bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]
-
-  Sum deletedEntries <- runConduit $
-       getEntryCandidates
-    .| takeWhileTime (interval / 3)
-    .| C.mapM deleteEntry
-    .| C.fold
-
-  when (deletedEntries > 0) $
-    $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
-
-  let
-    getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
-      E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
-      E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
-        E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
-
-      E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
-
-      return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
-             , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
-             )
-
-    deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
-    deleteChunk (E.Value cRef, E.Value size) = do
-      deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
-      (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
-
-  (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
-       getChunkCandidates
-    .| takeWhileTime (interval / 3)
-    .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
-    .| C.mapM deleteChunk
-    .| C.fold
-
-  when (deletedChunks > 0 || deletedChunkSize > 0) $
-    $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|]
+dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      now <- liftIO getCurrentTime
+      interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
+      keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
+
+      let
+        chunkHashBytes :: forall h.
+          ( Unwrapped FileContentChunkReference ~ Digest h )
+          => Integer
+        chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
+        chunkHashBits = chunkHashBytes * 8
+        base :: Integer
+        base = 2 ^ chunkHashBits
+        intervals :: [Integer]
+        -- | Exclusive upper bounds
+        protoIntervals :: [(Integer, Integer)]
+        protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
+                         | i <- [1 .. toInteger numIterations]
+                         ]
+        intervals
+          | numIterations <= 0 = pure base
+          | otherwise = go protoIntervals ^.. folded . _1
+          where
+            go [] = []
+            go ints
+              | maximumOf (folded . _1) ints == Just base = ints
+              | otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
+              where
+                closest = maximumBy (comparing $ view _2) ints
+                (lts, geqs) = partition (((>) `on` view _1) closest) ints
+                gts = filter (((<) `on` view _1) closest) geqs
+
+        intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals
+
+        toDigest :: Integer -> Maybe FileContentChunkReference
+        toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
+          where step i
+                  | i <= 0 = Nothing
+                  | otherwise = Just (fromIntegral i, i `shiftR` 8)
+                pad bs
+                  | toInteger (ByteString.length bs) >= chunkHashBytes = bs
+                  | otherwise = pad $ ByteString.cons 0 bs
+
+      intervalsDgsts <- atomically $ do
+        cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
+        case Map.lookup numIterations cachedDgsts of
+          Just c -> return c
+          Nothing -> do
+            modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
+            return intervalsDgsts'
+
+      let
+        permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)
+
+        (minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
+        chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
+        chunkIdFilter cRef = E.and $ catMaybes
+          [ minBoundDgst <&> \b -> cRef E.>=. E.val b
+          , maxBoundDgst <&> \b -> cRef E.<. E.val b
+          ]
+
+      $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)
+
+      E.insertSelectWithConflict
+        (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
+        (E.from $ \fileContentChunk -> do
+          E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
+            E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
+            return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
+          E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
+          return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
+        )
+        (\current excluded ->
+          [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
+        )
+
+      E.delete . E.from $ \fileContentChunkUnreferenced -> do
+        E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
+          E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
+          return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
+        E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)

+      let
+        getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
+          let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
+                E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
+                E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+                E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
+                return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
+          E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
+          E.groupBy $ fileContentEntry E.^. FileContentEntryHash
+          E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]
+
+          return $ fileContentEntry E.^. FileContentEntryHash
+
+        deleteEntry :: _ -> DB (Sum Natural)
+        deleteEntry (E.Value fRef) =
+          bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]
+
+      Sum deletedEntries <- runConduit $
+           getEntryCandidates
+        .| takeWhileTime (interval / 3)
+        .| C.mapM deleteEntry
+        .| C.fold
+
+      let
+        getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
+          E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
+          E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
+            E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
+
+          E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
+
+          return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
+                 , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
+                 )
+
+        deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
+        deleteChunk (E.Value cRef, E.Value size) = do
+          deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
+          (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
+
+      (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
+           getChunkCandidates
+        .| takeWhileTime (interval / 3)
+        .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
+        .| C.mapM deleteChunk
+        .| C.fold
+
+      return (deletedEntries, deletedChunks, deletedChunkSize)
+
+    fin (deletedEntries, deletedChunks, deletedChunkSize) = do
+      observeDeletedUnreferencedFiles deletedEntries
+      $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
+      observeDeletedUnreferencedChunks deletedChunks deletedChunkSize
+      $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|]


 dispatchJobInjectFiles :: JobHandler UniWorX

@@ -245,10 +251,10 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
     .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
     .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
     .| C.mapM (lift . injectOrDelete)
+    .| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz)
     .| C.fold

   when (injectedFiles > 0 || injectedSize > 0) $
-    $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{tshow injectedSize} bytes)|]
+    $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{tshow injectedSize} bytes)|]

@@ -258,38 +264,42 @@ data RechunkFileException
   deriving anyclass (Exception)

 dispatchJobRechunkFiles :: JobHandler UniWorX
-dispatchJobRechunkFiles = JobHandlerAtomic . hoist lift $ do
-  interval <- getsYesod $ view _appRechunkFiles
-  let
-    getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
-      E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
-        E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
-        E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
-        E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
-
-      let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
-            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
-            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
-            return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64))
-
-      return ( fileContentEntry E.^. FileContentEntryHash
-             , size
-             )
-
-    rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
-    rechunkFile fRef sz = do
-      fRef' <- sinkFileDB True $ sourceFileDB fRef
-      unless (fRef == fRef') $
-        throwM $ RechunkFileExceptionHashMismatch fRef fRef'
-      return (Sum 1, Sum sz)
-
-  (Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
-       getEntryCandidates
-    .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
-    .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
-    .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
-    .| C.mapM (uncurry rechunkFile)
-    .| C.fold
-
-  when (rechunkedFiles > 0 || rechunkedSize > 0) $
-    $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{tshow rechunkedSize} bytes)|]
+dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      interval <- getsYesod $ view _appRechunkFiles
+      let
+        getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
+          E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+            E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
+
+          let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
+                E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
+                E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
+                return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64))
+
+          return ( fileContentEntry E.^. FileContentEntryHash
+                 , size
+                 )
+
+        rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
+        rechunkFile fRef sz = do
+          fRef' <- sinkFileDB True $ sourceFileDB fRef
+          unless (fRef == fRef') $
+            throwM $ RechunkFileExceptionHashMismatch fRef fRef'
+          return (Sum 1, Sum sz)
+
+      (Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
+           getEntryCandidates
+        .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
+        .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
+        .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
+        .| C.mapM (uncurry rechunkFile)
+        .| C.fold
+
+      return (rechunkedFiles, rechunkedSize)
+
+    fin (rechunkedFiles, rechunkedSize) = do
+      observeRechunkedFiles rechunkedFiles rechunkedSize
+      $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{tshow rechunkedSize} bytes)|]

@@ -8,8 +8,10 @@ import Database.Persist.Sql (deleteWhereCount)


 dispatchJobPruneFallbackPersonalisedSheetFilesKeys :: JobHandler UniWorX
-dispatchJobPruneFallbackPersonalisedSheetFilesKeys = JobHandlerAtomic . hoist lift $ do
-  now <- liftIO getCurrentTime
-  expires <- getsYesod $ view _appFallbackPersonalisedSheetFilesKeysExpire
-  n <- deleteWhereCount [ FallbackPersonalisedSheetFilesKeyGenerated <. addUTCTime (- expires) now ]
-  $logInfoS "PruneFallbackPersonalisedSheetFilesKeys" [st|Deleted #{n} expired fallback personalised sheet files keys|]
+dispatchJobPruneFallbackPersonalisedSheetFilesKeys = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      now <- liftIO getCurrentTime
+      expires <- getsYesod $ view _appFallbackPersonalisedSheetFilesKeysExpire
+      deleteWhereCount [ FallbackPersonalisedSheetFilesKeyGenerated <. addUTCTime (- expires) now ]
+    fin n = $logInfoS "PruneFallbackPersonalisedSheetFilesKeys" [st|Deleted #{n} expired fallback personalised sheet files keys|]

@@ -7,7 +7,9 @@ import Import
 import Database.Persist.Sql (deleteWhereCount)

 dispatchJobPruneInvitations :: JobHandler UniWorX
-dispatchJobPruneInvitations = JobHandlerAtomic . hoist lift $ do
-  now <- liftIO getCurrentTime
-  n <- deleteWhereCount [ InvitationExpiresAt <. Just now ]
-  $logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|]
+dispatchJobPruneInvitations = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      now <- liftIO getCurrentTime
+      deleteWhereCount [ InvitationExpiresAt <. Just now ]
+    fin n = $logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|]

@@ -9,23 +9,27 @@ import Handler.Utils.DateTime
 import Database.Persist.Sql (updateWhereCount, deleteWhereCount)

 dispatchJobTruncateTransactionLog, dispatchJobDeleteTransactionLogIPs :: JobHandler UniWorX
-dispatchJobTruncateTransactionLog = JobHandlerAtomic . hoist lift $ do
-  now <- liftIO getCurrentTime
-  let localNow = utcToLocalTime now
-      (localCurrentYear, _, _) = toGregorian $ localDay localNow
-      localStartOfPreviousYear = LocalTime (fromGregorian (pred localCurrentYear) 1 1) midnight
-      (currentYear, _, _) = toGregorian $ utctDay now
-      startOfPreviousYear = UTCTime (fromGregorian (pred currentYear) 1 1) 0
-      startOfPreviousYear' = case localTimeToUTC localStartOfPreviousYear of
-        LTUUnique utc' _ -> utc'
-        _other -> startOfPreviousYear
+dispatchJobTruncateTransactionLog = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      now <- liftIO getCurrentTime
+      let localNow = utcToLocalTime now
+          (localCurrentYear, _, _) = toGregorian $ localDay localNow
+          localStartOfPreviousYear = LocalTime (fromGregorian (pred localCurrentYear) 1 1) midnight
+          (currentYear, _, _) = toGregorian $ utctDay now
+          startOfPreviousYear = UTCTime (fromGregorian (pred currentYear) 1 1) 0
+          startOfPreviousYear' = case localTimeToUTC localStartOfPreviousYear of
+            LTUUnique utc' _ -> utc'
+            _other -> startOfPreviousYear

-  n <- deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ]
-  $logInfoS "TruncateTransactionLog" [st|Deleted #{n} transaction log entries|]
-dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do
-  now <- liftIO getCurrentTime
-  retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime
-  let cutoff = addUTCTime (- retentionTime) now
+      deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ]
+    fin n = $logInfoS "TruncateTransactionLog" [st|Deleted #{n} transaction log entries|]
+dispatchJobDeleteTransactionLogIPs = JobHandlerAtomicWithFinalizer act fin
+  where
+    act = hoist lift $ do
+      now <- liftIO getCurrentTime
+      retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime
+      let cutoff = addUTCTime (- retentionTime) now

-  n <- updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ]
-  $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|]
+      updateWhereCount [ TransactionLogTime <. cutoff, TransactionLogRemote !=. Nothing ] [ TransactionLogRemote =. Nothing ]
+    fin n = $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|]

@@ -11,11 +11,10 @@ module Jobs.Types
   , JobContext(..)
   , JobState(..)
   , jobWorkerNames
-  , JobWorkerState(..)
-  , withJobWorkerState
+  , JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob
   , JobWorkerId
   , showWorkerId, newWorkerId
-  , JobQueue, jqInsert, jqDequeue
+  , JobQueue, jqInsert, jqDequeue, jqDepth
   , JobPriority(..), prioritiseJob
   , jobNoQueueSame
   , module Cron

@@ -38,10 +37,6 @@ import qualified Data.Set as Set
 import Data.PQueue.Prio.Max (MaxPQueue)
 import qualified Data.PQueue.Prio.Max as PQ

-import Utils.Metrics (withJobWorkerStateLbls)
-
-import qualified Prometheus (Label4)
-
 import Cron (CronNextMatch(..), _MatchAsap, _MatchAt, _MatchNone)

@@ -179,7 +174,8 @@ type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJo
 data JobHandler site
   = JobHandlerAtomic (YesodJobDB site ())
   | JobHandlerException (HandlerFor site ())
-  deriving (Generic, Typeable)
+  | forall a. JobHandlerAtomicWithFinalizer (YesodJobDB site a) (a -> HandlerFor site ())
+  deriving (Typeable)

 makePrisms ''JobHandler
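
A note on the new constructor: it is existentially quantified — `forall a.` hides the result type of the database action, so every handler pairs an action with a finalizer that agree on some private `a`. That is also why `Generic` disappears from the deriving clause; existentially quantified constructors cannot derive it. A simplified consumer, with `runAtomically` as an assumed stand-in for the runDBJobs/setSerializable plumbing in the worker hunk above:

    -- Sketch; runAtomically :: YesodJobDB site a -> HandlerFor site a is
    -- an assumed helper, not part of the commit.
    runJobHandler :: JobHandler site -> HandlerFor site ()
    runJobHandler h = case h of
      JobHandlerAtomic act    -> runAtomically act
      JobHandlerException act -> act
      -- pattern matching rebinds the hidden type variable; act and fin are
      -- guaranteed to agree on it, and it cannot escape this branch
      JobHandlerAtomicWithFinalizer act fin -> runAtomically act >>= fin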
@@ -199,20 +195,6 @@ deriveJSON defaultOptions
   , sumEncoding = TaggedObject "state" "data"
   } ''JobWorkerState

-classifyJobWorkerState :: JobWorkerId -> JobWorkerState -> Prometheus.Label4
-classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCtl, maybe "n/a" pack mJob)
-  where
-    Aeson.Object obj = Aeson.toJSON jws
-    Aeson.String tag = obj HashMap.! "state"
-    mJobCtl = asum
-      [ classifyJobCtl <$> jws ^? _jobWorkerJobCtl
-      , "perform" <$ jws ^? _jobWorkerJob
-      ]
-    mJob = classifyJob <$> jws ^? _jobWorkerJob
-
-withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> JobWorkerState -> m a -> m a
-withJobWorkerState wId newSt = withJobWorkerStateLbls $ classifyJobWorkerState wId newSt
-

 newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
   deriving (Eq, Ord)

@@ -273,6 +255,9 @@ jqInsert job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job) job
 jqDequeue :: JobQueue -> Maybe (JobCtl, JobQueue)
 jqDequeue = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxView . getJobQueue

+jqDepth :: Integral n => JobQueue -> n
+jqDepth = fromIntegral . PQ.size . getJobQueue
+

 data JobState = JobState
   { jobWorkers :: Map (Async ()) (TVar JobQueue)

@@ -1,14 +1,18 @@
 {-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}

 module Utils.Metrics
   ( withHealthReportMetrics
   , registerGHCMetrics
   , observeHTTPRequestLatency
   , registerReadyMetric
+  , registerJobHeldLocksCount
   , withJobWorkerStateLbls
+  , withJobWorkerState
   , observeYesodCacheSize
   , observeFavouritesQuickActionsDuration
   , LoginOutcome(..), observeLoginOutcome
-  , registerJobHeldLocksCount
   , FileChunkStorage(..), observeSourcedChunk, observeSunkChunk
+  , observeDeletedUnreferencedFiles, observeDeletedUnreferencedChunks, observeInjectedFiles, observeRechunkedFiles
+  , registerJobWorkerQueueDepth
   ) where

 import Import.NoModel hiding (Vector, Info)

@@ -29,6 +33,11 @@ import Yesod.Core.Types (HandlerData(..), GHState(..))
 import qualified Data.Set as Set

+import Jobs.Types
+
+import qualified Data.Aeson as Aeson
+import qualified Data.HashMap.Strict as HashMap
+
 {-# ANN module ("HLint: ignore Use even" :: String) #-}

@@ -130,15 +139,75 @@ jobHeldLocksCount heldLocks = Metric $ return (MkJobHeldLocksCount, collectJobHe
 sourcedFileChunkSizes :: Vector Label1 Histogram
 sourcedFileChunkSizes = unsafeRegister . vector "storage" $ histogram info buckets
   where info = Info "uni2work_sourced_file_chunks_bytes"
-              "Sizes of files chunks sourced"
-        buckets = 0 : histogramBuckets 1 20000000
+              "Sizes of file chunks sourced"
+        buckets = 0 : histogramBuckets 1 1000000000

 {-# NOINLINE sunkFileChunkSizes #-}
 sunkFileChunkSizes :: Vector Label1 Histogram
 sunkFileChunkSizes = unsafeRegister . vector "storage" $ histogram info buckets
   where info = Info "uni2work_sunk_file_chunks_bytes"
-              "Sizes of files chunks sunk"
-        buckets = 0 : histogramBuckets 1 100000000
+              "Sizes of file chunks sunk"
+        buckets = 0 : histogramBuckets 1 1000000000
+
+{-# NOINLINE deletedUnreferencedFiles #-}
+deletedUnreferencedFiles :: Counter
+deletedUnreferencedFiles = unsafeRegister $ counter info
+  where info = Info "uni2work_deleted_unreferenced_files_count"
+              "Number of unreferenced files deleted"
+
+{-# NOINLINE deletedUnreferencedChunks #-}
+deletedUnreferencedChunks :: Counter
+deletedUnreferencedChunks = unsafeRegister $ counter info
+  where info = Info "uni2work_deleted_unreferenced_chunks_count"
+              "Number of unreferenced chunks deleted"
+
+{-# NOINLINE deletedUnreferencedChunksBytes #-}
+deletedUnreferencedChunksBytes :: Counter
+deletedUnreferencedChunksBytes = unsafeRegister $ counter info
+  where info = Info "uni2work_deleted_unreferenced_chunks_bytes"
+              "Size of unreferenced chunks deleted"
+
+{-# NOINLINE injectedFiles #-}
+injectedFiles :: Counter
+injectedFiles = unsafeRegister $ counter info
+  where info = Info "uni2work_injected_files_count"
+              "Number of files injected from upload cache into database"
+
+{-# NOINLINE injectedFilesBytes #-}
+injectedFilesBytes :: Counter
+injectedFilesBytes = unsafeRegister $ counter info
+  where info = Info "uni2work_injected_files_bytes"
+              "Size of files injected from upload cache into database"
+
+{-# NOINLINE rechunkedFiles #-}
+rechunkedFiles :: Counter
+rechunkedFiles = unsafeRegister $ counter info
+  where info = Info "uni2work_rechunked_files_count"
+              "Number of files rechunked within database"
+
+{-# NOINLINE rechunkedFilesBytes #-}
+rechunkedFilesBytes :: Counter
+rechunkedFilesBytes = unsafeRegister $ counter info
+  where info = Info "uni2work_rechunked_files_bytes"
+              "Size of files rechunked within database"
+
+data JobWorkerQueueDepth = MkJobWorkerQueueDepth
+
+jobWorkerQueueDepth :: TMVar JobState -> Metric JobWorkerQueueDepth
+jobWorkerQueueDepth jSt = Metric $ return (MkJobWorkerQueueDepth, collectJobWorkerQueueDepth)
+  where
+    collectJobWorkerQueueDepth = maybeT (return []) $ do
+      wQueues <- hoist atomically $ do
+        JobState{..} <- MaybeT $ tryReadTMVar jSt
+        flip ifoldMapM jobWorkers $ \wAsync wQueue
+          -> lift $ pure . (jobWorkerName wAsync, ) . jqDepth <$> readTVar wQueue
+      return [ SampleGroup info GaugeType
+                 [ Sample "uni2work_queued_jobs_count" [("worker", showWorkerId wName)] . encodeUtf8 $ tshow wDepth
+                 | (wName, wDepth) <- wQueues
+                 ]
+             ]
+    info = Info "uni2work_queued_jobs_count"
+                "Number of JobQueue entries in this Uni2work-instance"

 withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
 withHealthReportMetrics act = do
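
`jobWorkerQueueDepth` uses the same custom-collector pattern as `jobHeldLocksCount` above: in prometheus-client a `Metric` wraps a setup action that yields an opaque handle plus a sampling action, and the sampling action runs at scrape time, so the gauge reads the live queues without any per-insert bookkeeping. A self-contained sketch of that pattern, assuming the prometheus-client API as used in this file (the metric name and helper are invented):

    {-# LANGUAGE OverloadedStrings #-}
    import Control.Concurrent.STM (TVar, readTVarIO)
    import qualified Data.ByteString.Char8 as BS
    import Prometheus

    data TVarGauge = MkTVarGauge

    -- sample a TVar lazily at scrape time instead of updating a gauge on
    -- every write
    tvarGauge :: TVar Int -> Metric TVarGauge
    tvarGauge var = Metric $ return (MkTVarGauge, collect)
      where
        collect = do
          n <- readTVarIO var
          return [ SampleGroup info GaugeType
                     [ Sample "myapp_tvar_value" [] . BS.pack $ show n ]
                 ]
        info = Info "myapp_tvar_value" "Current value of a TVar"

    -- registration, once at startup:  void . register $ tvarGauge var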
@@ -181,6 +250,20 @@ observeHTTPRequestLatency classifyHandler app req respond' = do
 registerReadyMetric :: MonadIO m => m ()
 registerReadyMetric = liftIO $ void . register . readyMetric =<< getPOSIXTime

+classifyJobWorkerState :: JobWorkerId -> JobWorkerState -> Prometheus.Label4
+classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCtl, maybe "n/a" pack mJob)
+  where
+    Aeson.Object obj = Aeson.toJSON jws
+    Aeson.String tag = obj HashMap.! "state"
+    mJobCtl = asum
+      [ classifyJobCtl <$> jws ^? _jobWorkerJobCtl
+      , "perform" <$ jws ^? _jobWorkerJob
+      ]
+    mJob = classifyJob <$> jws ^? _jobWorkerJob
+
+withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> JobWorkerState -> m a -> m a
+withJobWorkerState wId newSt = withJobWorkerStateLbls $ classifyJobWorkerState wId newSt
+
 withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Label4 -> m a -> m a
 withJobWorkerStateLbls newLbls act = do
   liftIO $ withLabel jobWorkerStateTransitions newLbls incCounter

@@ -240,3 +323,24 @@ observeSourcedChunk store = liftIO . observeChunkSize sourcedFileChunkSizes stor

 observeChunkSize :: Vector Label1 Histogram -> FileChunkStorage -> Integer -> IO ()
 observeChunkSize metric (toPathPiece -> storageLabel) = withLabel metric storageLabel . flip observe . fromInteger
+
+observeDeletedUnreferencedFiles :: MonadIO m => Natural -> m ()
+observeDeletedUnreferencedFiles = liftIO . void . addCounter deletedUnreferencedFiles . fromIntegral
+
+observeDeletedUnreferencedChunks :: MonadIO m => Natural -> Word64 -> m ()
+observeDeletedUnreferencedChunks num size = liftIO $ do
+  void . addCounter deletedUnreferencedChunks $ fromIntegral num
+  void . addCounter deletedUnreferencedChunksBytes $ fromIntegral size
+
+observeInjectedFiles :: MonadIO m => Natural -> Word64 -> m ()
+observeInjectedFiles num size = liftIO $ do
+  void . addCounter injectedFiles $ fromIntegral num
+  void . addCounter injectedFilesBytes $ fromIntegral size
+
+observeRechunkedFiles :: MonadIO m => Natural -> Word64 -> m ()
+observeRechunkedFiles num size = liftIO $ do
+  void . addCounter rechunkedFiles $ fromIntegral num
+  void . addCounter rechunkedFilesBytes $ fromIntegral size
+
+registerJobWorkerQueueDepth :: MonadIO m => TMVar JobState -> m ()
+registerJobWorkerQueueDepth = liftIO . void . register . jobWorkerQueueDepth
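
Two registration styles meet in this file: the counters above are registered at module load via `unsafeRegister` (with `{-# NOINLINE #-}` pinning one shared instance), since they need no runtime context, while the queue-depth collector closes over the live `TMVar JobState` and is therefore registered explicitly at startup — the `registerJobWorkerQueueDepth appJobState` call in the first hunk of this commit. Both styles side by side, continuing the previous sketch (names are invented; `void` is from Control.Monad):

    {-# NOINLINE eventsTotal #-}
    -- load-time registration: fine for metrics that need no runtime context
    eventsTotal :: Counter
    eventsTotal = unsafeRegister . counter $ Info "myapp_events_total" "Events processed"

    -- startup-time registration: for collectors closing over runtime state
    registerQueueGauge :: TVar Int -> IO ()
    registerQueueGauge var = void . register $ tvarGauge var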