refactor(metrics): collect better metrics about jobs
This commit is contained in:
parent
432a77f705
commit
f0f046f4b6
@ -74,6 +74,7 @@ import Handler.Utils.Routes
|
||||
import Utils.Form
|
||||
import Utils.Sheet
|
||||
import Utils.SystemMessage
|
||||
import Utils.Metrics
|
||||
|
||||
import Text.Cassius (cassiusFile)
|
||||
|
||||
@ -1480,7 +1481,7 @@ instance Yesod UniWorX where
|
||||
-- b) Validates that incoming write requests include that token in either a header or POST parameter.
|
||||
-- To add it, chain it together with the defaultMiddleware: yesodMiddleware = defaultYesodMiddleware . defaultCsrfMiddleware
|
||||
-- For details, see the CSRF documentation in the Yesod.Core.Handler module of the yesod-core package.
|
||||
yesodMiddleware = languagesMiddleware appLanguages . headerMessagesMiddleware . defaultYesodMiddleware . normalizeRouteMiddleware . defaultCsrfMiddleware . updateFavouritesMiddleware
|
||||
yesodMiddleware = observeYesodCacheSizeMiddleware . languagesMiddleware appLanguages . headerMessagesMiddleware . defaultYesodMiddleware . normalizeRouteMiddleware . defaultCsrfMiddleware . updateFavouritesMiddleware
|
||||
where
|
||||
updateFavouritesMiddleware :: Handler a -> Handler a
|
||||
updateFavouritesMiddleware handler = (*> handler) . runMaybeT $ do
|
||||
@ -1512,6 +1513,8 @@ instance Yesod UniWorX where
|
||||
|
||||
lift . bracketOnError getMessages (mapM_ addMessage') $
|
||||
addCustomHeader HeaderAlerts . decodeUtf8 . urlEncode True . toStrict . JSON.encode
|
||||
observeYesodCacheSizeMiddleware :: Handler a -> Handler a
|
||||
observeYesodCacheSizeMiddleware handler = handler `finally` observeYesodCacheSize
|
||||
|
||||
-- Since we implement `errorHandler` ourselves we don't need `defaultMessageWidget`
|
||||
defaultMessageWidget _title _body = error "defaultMessageWidget: undefined"
|
||||
|
||||
11
src/Jobs.hs
11
src/Jobs.hs
@ -174,7 +174,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
Nothing -> retry
|
||||
Just (j, q) -> j <$ writeTVar chan q
|
||||
return $ yield nextVal >> streamChan
|
||||
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) . withJobWorkerState workerId Nothing JobWorkerIdle $ do
|
||||
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
|
||||
$logInfoS logIdent "Started"
|
||||
runConduit $ streamChan .| handleJobs' workerId
|
||||
$logInfoS logIdent "Stopped"
|
||||
@ -357,9 +357,9 @@ mkLogIdent :: JobWorkerId -> Text
|
||||
mkLogIdent wId = "Job-Executor " <> showWorkerId wId
|
||||
|
||||
handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) ()
|
||||
handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdle) JobWorkerBusy $ do
|
||||
handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
|
||||
$logDebugS logIdent $ tshow jctl
|
||||
res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (Just JobWorkerBusy) (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl
|
||||
res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl
|
||||
sentRes <- mapReaderT (liftIO . atomically) $ do
|
||||
resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
|
||||
lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
|
||||
@ -378,7 +378,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdl
|
||||
handleCmd JobCtlTest = return ()
|
||||
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||
handleCmd (JobCtlQueue job) = lift $ queueJob' job
|
||||
handleCmd jctl@(JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
|
||||
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
|
||||
content <- case fromJSON queuedJobContent of
|
||||
Aeson.Success c -> return c
|
||||
Aeson.Error t -> do
|
||||
@ -391,8 +391,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdl
|
||||
now <- liftIO getCurrentTime
|
||||
|
||||
performJob content
|
||||
& withJobDuration (classifyJob content)
|
||||
& withJobWorkerState wNum (Just $ JobWorkerExecJobCtl jctl) (JobWorkerExecJob content)
|
||||
& withJobWorkerState wNum (JobWorkerExecJob content)
|
||||
|
||||
-- `performJob` is expected to throw an exception if it detects that the job was not done
|
||||
runDB . setSerializable $ do
|
||||
|
||||
@ -156,8 +156,7 @@ classifyJobCtl jobctl = unpack tag
|
||||
Aeson.String tag = obj HashMap.! "instruction"
|
||||
|
||||
data JobWorkerState
|
||||
= JobWorkerIdle
|
||||
| JobWorkerBusy
|
||||
= JobWorkerBusy
|
||||
| JobWorkerExecJobCtl { jobWorkerJobCtl :: JobCtl }
|
||||
| JobWorkerExecJob { jobWorkerJob :: Job }
|
||||
deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
@ -182,9 +181,8 @@ classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCt
|
||||
]
|
||||
mJob = classifyJob <$> jws ^? _jobWorkerJob
|
||||
|
||||
withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> Maybe JobWorkerState -> JobWorkerState -> m a -> m a
|
||||
withJobWorkerState wId oldSt newSt
|
||||
= withJobWorkerStateLbls (classifyJobWorkerState wId <$> oldSt) (classifyJobWorkerState wId newSt)
|
||||
withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> JobWorkerState -> m a -> m a
|
||||
withJobWorkerState wId newSt = withJobWorkerStateLbls $ classifyJobWorkerState wId newSt
|
||||
|
||||
|
||||
newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
|
||||
|
||||
@ -3,8 +3,8 @@ module Utils.Metrics
|
||||
, registerGHCMetrics
|
||||
, observeHTTPRequestLatency
|
||||
, registerReadyMetric
|
||||
, withJobDuration
|
||||
, withJobWorkerStateLbls
|
||||
, observeYesodCacheSize
|
||||
) where
|
||||
|
||||
import Import.NoFoundation hiding (Vector, Info)
|
||||
@ -19,6 +19,8 @@ import Network.Wai (Middleware)
|
||||
import qualified Network.Wai as Wai
|
||||
import qualified Network.HTTP.Types as HTTP
|
||||
|
||||
import Yesod.Core.Types (HandlerData(..), GHState(..))
|
||||
|
||||
|
||||
histogramBuckets :: Rational -- ^ min
|
||||
-> Rational -- ^ max
|
||||
@ -69,18 +71,25 @@ readyMetric ts = Metric $ return (MkReadySince, collectReadySince)
|
||||
"POSIXTime this Uni2work-instance became ready"
|
||||
sample = encodeUtf8 $ tshow (realToFrac ts :: Nano)
|
||||
|
||||
{-# NOINLINE jobDuration #-}
|
||||
jobDuration :: Vector Label2 Histogram
|
||||
jobDuration = unsafeRegister . vector ("task", "status") $ histogram info buckets
|
||||
where info = Info "uni2work_job_duration_seconds"
|
||||
"Duration of time taken to execute a background job"
|
||||
buckets = histogramBuckets 5e-6 500
|
||||
{-# NOINLINE jobWorkerStateDuration #-}
|
||||
jobWorkerStateDuration :: Vector Label4 Histogram
|
||||
jobWorkerStateDuration = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ histogram info buckets
|
||||
where info = Info "uni2work_job_worker_state_duration_seconds"
|
||||
"Duration of time a Uni2work job executor spent in a certain state"
|
||||
buckets = histogramBuckets 1e-6 500
|
||||
|
||||
{-# NOINLINE jobWorkerState #-}
|
||||
jobWorkerState :: Vector Label4 Gauge
|
||||
jobWorkerState = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ gauge info
|
||||
where info = Info "uni2work_job_worker_state"
|
||||
"Current state of Uni2work job executors"
|
||||
{-# NOINLINE jobWorkerStateTransitions #-}
|
||||
jobWorkerStateTransitions :: Vector Label4 Counter
|
||||
jobWorkerStateTransitions = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ counter info
|
||||
where info = Info "uni2work_job_worker_state_transitions_total"
|
||||
"Number of times a Uni2work job executor entered a certain state"
|
||||
|
||||
{-# NOINLINE yesodCacheSize #-}
|
||||
yesodCacheSize :: Histogram
|
||||
yesodCacheSize = unsafeRegister $ histogram info buckets
|
||||
where info = Info "yesod_ghs_cache_items"
|
||||
"Number of items in Yesod's ghsCache and ghsCacheBy"
|
||||
buckets = 0 : histogramBuckets 1 1e6
|
||||
|
||||
|
||||
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
||||
@ -124,29 +133,22 @@ observeHTTPRequestLatency classifyHandler app req respond' = do
|
||||
registerReadyMetric :: MonadIO m => m ()
|
||||
registerReadyMetric = liftIO $ void . register . readyMetric =<< getPOSIXTime
|
||||
|
||||
withJobDuration :: (MonadIO m, MonadCatch m) => String -> m a -> m a
|
||||
withJobDuration job doJob = do
|
||||
withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Label4 -> m a -> m a
|
||||
withJobWorkerStateLbls newLbls act = do
|
||||
liftIO $ withLabel jobWorkerStateTransitions newLbls incCounter
|
||||
|
||||
start <- liftIO getPOSIXTime
|
||||
res <- handleAll (return . Left) $ Right <$> doJob
|
||||
res <- handleAll (return . Left) $ Right <$> act
|
||||
end <- liftIO getPOSIXTime
|
||||
|
||||
liftIO . withLabel jobDuration (pack job, bool "failure" "success" $ is _Right res) . flip observe . realToFrac $ end - start
|
||||
liftIO . withLabel jobWorkerStateDuration newLbls . flip observe . realToFrac $ end - start
|
||||
|
||||
case res of
|
||||
Left exc -> throwM exc
|
||||
Right res' -> return res'
|
||||
|
||||
withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Maybe Label4 -> Label4 -> m a -> m a
|
||||
withJobWorkerStateLbls oldLbls newLbls
|
||||
= bracket_ acquireState releaseState
|
||||
where
|
||||
acquireState = do
|
||||
whenIsJust oldLbls $ \oldLbls' ->
|
||||
liftIO $ withLabel jobWorkerState oldLbls' decGauge
|
||||
liftIO $ withLabel jobWorkerState newLbls incGauge
|
||||
releaseState = do
|
||||
liftIO $ withLabel jobWorkerState newLbls decGauge
|
||||
whenIsJust oldLbls $ \oldLbls' ->
|
||||
liftIO $ withLabel jobWorkerState oldLbls' incGauge
|
||||
|
||||
either throwM return res
|
||||
|
||||
observeYesodCacheSize :: MonadHandler m => m ()
|
||||
observeYesodCacheSize = do
|
||||
HandlerData{handlerState} <- liftHandler ask
|
||||
liftIO $ do
|
||||
GHState{..} <- readIORef handlerState
|
||||
let size = fromIntegral $ length ghsCache + length ghsCacheBy
|
||||
observe yesodCacheSize size
|
||||
|
||||
Loading…
Reference in New Issue
Block a user