diff --git a/src/Jobs.hs b/src/Jobs.hs index 923d43626..423e0c20c 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -186,7 +186,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ Nothing -> retry Just (j, q) -> j <$ writeTVar chan q return $ yield nextVal >> streamChan - runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do + runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) . withJobWorkerState workerId Nothing JobWorkerIdle $ do $logInfoS logIdent "Started" runConduit $ streamChan .| handleJobs' workerId $logInfoS logIdent "Stopped" @@ -369,9 +369,9 @@ mkLogIdent :: JobWorkerId -> Text mkLogIdent wId = "Job-Executor " <> showWorkerId wId handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) () -handleJobs' wNum = C.mapM_ $ \jctl -> do +handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdle) JobWorkerBusy $ do $logDebugS logIdent $ tshow jctl - res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl + res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (Just JobWorkerBusy) (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl sentRes <- mapReaderT (liftIO . atomically) $ do resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) @@ -390,7 +390,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do handleCmd JobCtlTest = return () handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) handleCmd (JobCtlQueue job) = lift $ queueJob' job - handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do + handleCmd jctl@(JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do content <- case fromJSON queuedJobContent of Aeson.Success c -> return c Aeson.Error t -> do @@ -402,7 +402,9 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do instanceID' <- getsYesod $ view instanceID now <- liftIO getCurrentTime - withJobDuration (classifyJob content) $ performJob content + performJob content + & withJobDuration (classifyJob content) + & withJobWorkerState wNum (Just $ JobWorkerExecJobCtl jctl) (JobWorkerExecJob content) -- `performJob` is expected to throw an exception if it detects that the job was not done runDB . setSerializable $ do diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 9d33b1258..8a3c247b3 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -4,9 +4,12 @@ module Jobs.Types ( Job(..), Notification(..) , classifyJob , JobCtl(..) + , classifyJobCtl , JobContext(..) , JobState(..) , jobWorkerNames + , JobWorkerState(..) + , withJobWorkerState , JobWorkerId , showWorkerId, newWorkerId , JobQueue, jqInsert, jqDequeue @@ -31,7 +34,9 @@ import qualified Data.Set as Set import Data.PQueue.Prio.Max (MaxPQueue) import qualified Data.PQueue.Prio.Max as PQ -import Utils.Metrics (observeJobQueueDepth) +import Utils.Metrics (withJobWorkerStateLbls) + +import qualified Prometheus (Label4) data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } @@ -140,6 +145,50 @@ data JobCtl = JobCtlFlush instance Hashable JobCtl instance NFData JobCtl +deriveJSON defaultOptions + { constructorTagModifier = camelToPathPiece' 2 + , tagSingleConstructors = True + , sumEncoding = TaggedObject "instruction" "data" + } ''JobCtl + + +classifyJobCtl :: JobCtl -> String +classifyJobCtl jobctl = unpack tag + where + Aeson.Object obj = Aeson.toJSON jobctl + Aeson.String tag = obj HashMap.! "instruction" + +data JobWorkerState + = JobWorkerIdle + | JobWorkerBusy + | JobWorkerExecJobCtl { jobWorkerJobCtl :: JobCtl } + | JobWorkerExecJob { jobWorkerJob :: Job } + deriving (Eq, Ord, Read, Show, Generic, Typeable) + +makeLenses_ ''JobWorkerState + +deriveJSON defaultOptions + { constructorTagModifier = camelToPathPiece' 2 + , fieldLabelModifier = camelToPathPiece' 2 + , tagSingleConstructors = True + , sumEncoding = TaggedObject "state" "data" + } ''JobWorkerState + +classifyJobWorkerState :: JobWorkerId -> JobWorkerState -> Prometheus.Label4 +classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCtl, maybe "n/a" pack mJob) + where + Aeson.Object obj = Aeson.toJSON jws + Aeson.String tag = obj HashMap.! "state" + mJobCtl = asum + [ classifyJobCtl <$> jws ^? _jobWorkerJobCtl + , "perform" <$ jws ^? _jobWorkerJob + ] + mJob = classifyJob <$> jws ^? _jobWorkerJob + +withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> Maybe JobWorkerState -> JobWorkerState -> m a -> m a +withJobWorkerState wId oldSt newSt + = withJobWorkerStateLbls (classifyJobWorkerState wId <$> oldSt) (classifyJobWorkerState wId newSt) + newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique } deriving (Eq, Ord) diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 6a2368a86..25233023d 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -4,6 +4,7 @@ module Utils.Metrics , observeHTTPRequestLatency , registerReadyMetric , withJobDuration + , withJobWorkerStateLbls ) where import Import.NoFoundation hiding (Vector, Info) @@ -70,16 +71,16 @@ readyMetric ts = Metric $ return (MkReadySince, collectReadySince) {-# NOINLINE jobDuration #-} jobDuration :: Vector Label2 Histogram -jobDuration = unsafeRegister . vector ("kind", "status") $ histogram info buckets +jobDuration = unsafeRegister . vector ("task", "status") $ histogram info buckets where info = Info "uni2work_job_duration_seconds" "Duration of time taken to execute a background job" buckets = histogramBuckets 5e-6 500 -{-# NOINLINE jobQueueDepth #-} -jobQueueDepth :: Vector Label2 Gauge -jobQueueDepth = unsafeRegister . vector ("worker", "priority") $ gauge info - where info = Info "uni2work_job_queue_size_count" - "Current depth of worker queue" +{-# NOINLINE jobWorkerState #-} +jobWorkerState :: Vector Label4 Gauge +jobWorkerState = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ gauge info + where info = Info "uni2work_job_worker_state" + "Current state of Uni2work job executors" withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport @@ -134,3 +135,18 @@ withJobDuration job doJob = do case res of Left exc -> throwM exc Right res' -> return res' + +withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Maybe Label4 -> Label4 -> m a -> m a +withJobWorkerStateLbls oldLbls newLbls + = bracket_ acquireState releaseState + where + acquireState = do + whenIsJust oldLbls $ \oldLbls' -> + liftIO $ withLabel jobWorkerState oldLbls' decGauge + liftIO $ withLabel jobWorkerState newLbls incGauge + releaseState = do + liftIO $ withLabel jobWorkerState newLbls decGauge + whenIsJust oldLbls $ \oldLbls' -> + liftIO $ withLabel jobWorkerState oldLbls' incGauge + +