feat(metrics): monitor job executor state
This commit is contained in:
parent
0da6c49392
commit
b74bb53041
12
src/Jobs.hs
12
src/Jobs.hs
@ -186,7 +186,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
Nothing -> retry
|
||||
Just (j, q) -> j <$ writeTVar chan q
|
||||
return $ yield nextVal >> streamChan
|
||||
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
|
||||
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) . withJobWorkerState workerId Nothing JobWorkerIdle $ do
|
||||
$logInfoS logIdent "Started"
|
||||
runConduit $ streamChan .| handleJobs' workerId
|
||||
$logInfoS logIdent "Stopped"
|
||||
@ -369,9 +369,9 @@ mkLogIdent :: JobWorkerId -> Text
|
||||
mkLogIdent wId = "Job-Executor " <> showWorkerId wId
|
||||
|
||||
handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) ()
|
||||
handleJobs' wNum = C.mapM_ $ \jctl -> do
|
||||
handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdle) JobWorkerBusy $ do
|
||||
$logDebugS logIdent $ tshow jctl
|
||||
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
|
||||
res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (Just JobWorkerBusy) (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl
|
||||
sentRes <- mapReaderT (liftIO . atomically) $ do
|
||||
resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
|
||||
lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
|
||||
@ -390,7 +390,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do
|
||||
handleCmd JobCtlTest = return ()
|
||||
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||
handleCmd (JobCtlQueue job) = lift $ queueJob' job
|
||||
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
|
||||
handleCmd jctl@(JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
|
||||
content <- case fromJSON queuedJobContent of
|
||||
Aeson.Success c -> return c
|
||||
Aeson.Error t -> do
|
||||
@ -402,7 +402,9 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do
|
||||
instanceID' <- getsYesod $ view instanceID
|
||||
now <- liftIO getCurrentTime
|
||||
|
||||
withJobDuration (classifyJob content) $ performJob content
|
||||
performJob content
|
||||
& withJobDuration (classifyJob content)
|
||||
& withJobWorkerState wNum (Just $ JobWorkerExecJobCtl jctl) (JobWorkerExecJob content)
|
||||
|
||||
-- `performJob` is expected to throw an exception if it detects that the job was not done
|
||||
runDB . setSerializable $ do
|
||||
|
||||
@ -4,9 +4,12 @@ module Jobs.Types
|
||||
( Job(..), Notification(..)
|
||||
, classifyJob
|
||||
, JobCtl(..)
|
||||
, classifyJobCtl
|
||||
, JobContext(..)
|
||||
, JobState(..)
|
||||
, jobWorkerNames
|
||||
, JobWorkerState(..)
|
||||
, withJobWorkerState
|
||||
, JobWorkerId
|
||||
, showWorkerId, newWorkerId
|
||||
, JobQueue, jqInsert, jqDequeue
|
||||
@ -31,7 +34,9 @@ import qualified Data.Set as Set
|
||||
import Data.PQueue.Prio.Max (MaxPQueue)
|
||||
import qualified Data.PQueue.Prio.Max as PQ
|
||||
|
||||
import Utils.Metrics (observeJobQueueDepth)
|
||||
import Utils.Metrics (withJobWorkerStateLbls)
|
||||
|
||||
import qualified Prometheus (Label4)
|
||||
|
||||
|
||||
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
|
||||
@ -140,6 +145,50 @@ data JobCtl = JobCtlFlush
|
||||
instance Hashable JobCtl
|
||||
instance NFData JobCtl
|
||||
|
||||
deriveJSON defaultOptions
|
||||
{ constructorTagModifier = camelToPathPiece' 2
|
||||
, tagSingleConstructors = True
|
||||
, sumEncoding = TaggedObject "instruction" "data"
|
||||
} ''JobCtl
|
||||
|
||||
|
||||
classifyJobCtl :: JobCtl -> String
|
||||
classifyJobCtl jobctl = unpack tag
|
||||
where
|
||||
Aeson.Object obj = Aeson.toJSON jobctl
|
||||
Aeson.String tag = obj HashMap.! "instruction"
|
||||
|
||||
data JobWorkerState
|
||||
= JobWorkerIdle
|
||||
| JobWorkerBusy
|
||||
| JobWorkerExecJobCtl { jobWorkerJobCtl :: JobCtl }
|
||||
| JobWorkerExecJob { jobWorkerJob :: Job }
|
||||
deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
|
||||
makeLenses_ ''JobWorkerState
|
||||
|
||||
deriveJSON defaultOptions
|
||||
{ constructorTagModifier = camelToPathPiece' 2
|
||||
, fieldLabelModifier = camelToPathPiece' 2
|
||||
, tagSingleConstructors = True
|
||||
, sumEncoding = TaggedObject "state" "data"
|
||||
} ''JobWorkerState
|
||||
|
||||
classifyJobWorkerState :: JobWorkerId -> JobWorkerState -> Prometheus.Label4
|
||||
classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCtl, maybe "n/a" pack mJob)
|
||||
where
|
||||
Aeson.Object obj = Aeson.toJSON jws
|
||||
Aeson.String tag = obj HashMap.! "state"
|
||||
mJobCtl = asum
|
||||
[ classifyJobCtl <$> jws ^? _jobWorkerJobCtl
|
||||
, "perform" <$ jws ^? _jobWorkerJob
|
||||
]
|
||||
mJob = classifyJob <$> jws ^? _jobWorkerJob
|
||||
|
||||
withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> Maybe JobWorkerState -> JobWorkerState -> m a -> m a
|
||||
withJobWorkerState wId oldSt newSt
|
||||
= withJobWorkerStateLbls (classifyJobWorkerState wId <$> oldSt) (classifyJobWorkerState wId newSt)
|
||||
|
||||
|
||||
newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
|
||||
deriving (Eq, Ord)
|
||||
|
||||
@ -4,6 +4,7 @@ module Utils.Metrics
|
||||
, observeHTTPRequestLatency
|
||||
, registerReadyMetric
|
||||
, withJobDuration
|
||||
, withJobWorkerStateLbls
|
||||
) where
|
||||
|
||||
import Import.NoFoundation hiding (Vector, Info)
|
||||
@ -70,16 +71,16 @@ readyMetric ts = Metric $ return (MkReadySince, collectReadySince)
|
||||
|
||||
{-# NOINLINE jobDuration #-}
|
||||
jobDuration :: Vector Label2 Histogram
|
||||
jobDuration = unsafeRegister . vector ("kind", "status") $ histogram info buckets
|
||||
jobDuration = unsafeRegister . vector ("task", "status") $ histogram info buckets
|
||||
where info = Info "uni2work_job_duration_seconds"
|
||||
"Duration of time taken to execute a background job"
|
||||
buckets = histogramBuckets 5e-6 500
|
||||
|
||||
{-# NOINLINE jobQueueDepth #-}
|
||||
jobQueueDepth :: Vector Label2 Gauge
|
||||
jobQueueDepth = unsafeRegister . vector ("worker", "priority") $ gauge info
|
||||
where info = Info "uni2work_job_queue_size_count"
|
||||
"Current depth of worker queue"
|
||||
{-# NOINLINE jobWorkerState #-}
|
||||
jobWorkerState :: Vector Label4 Gauge
|
||||
jobWorkerState = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ gauge info
|
||||
where info = Info "uni2work_job_worker_state"
|
||||
"Current state of Uni2work job executors"
|
||||
|
||||
|
||||
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
||||
@ -134,3 +135,18 @@ withJobDuration job doJob = do
|
||||
case res of
|
||||
Left exc -> throwM exc
|
||||
Right res' -> return res'
|
||||
|
||||
withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Maybe Label4 -> Label4 -> m a -> m a
|
||||
withJobWorkerStateLbls oldLbls newLbls
|
||||
= bracket_ acquireState releaseState
|
||||
where
|
||||
acquireState = do
|
||||
whenIsJust oldLbls $ \oldLbls' ->
|
||||
liftIO $ withLabel jobWorkerState oldLbls' decGauge
|
||||
liftIO $ withLabel jobWorkerState newLbls incGauge
|
||||
releaseState = do
|
||||
liftIO $ withLabel jobWorkerState newLbls decGauge
|
||||
whenIsJust oldLbls $ \oldLbls' ->
|
||||
liftIO $ withLabel jobWorkerState oldLbls' incGauge
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user