From f0f046f4b62378d9a9ea9147914b0456a961d687 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 3 Mar 2020 16:50:55 +0100 Subject: [PATCH] refactor(metrics): collect better metrics about jobs --- src/Foundation.hs | 5 +++- src/Jobs.hs | 11 ++++--- src/Jobs/Types.hs | 8 ++---- src/Utils/Metrics.hs | 68 +++++++++++++++++++++++--------------------- 4 files changed, 47 insertions(+), 45 deletions(-) diff --git a/src/Foundation.hs b/src/Foundation.hs index fc6b5c51f..07f373149 100644 --- a/src/Foundation.hs +++ b/src/Foundation.hs @@ -74,6 +74,7 @@ import Handler.Utils.Routes import Utils.Form import Utils.Sheet import Utils.SystemMessage +import Utils.Metrics import Text.Cassius (cassiusFile) @@ -1480,7 +1481,7 @@ instance Yesod UniWorX where -- b) Validates that incoming write requests include that token in either a header or POST parameter. -- To add it, chain it together with the defaultMiddleware: yesodMiddleware = defaultYesodMiddleware . defaultCsrfMiddleware -- For details, see the CSRF documentation in the Yesod.Core.Handler module of the yesod-core package. - yesodMiddleware = languagesMiddleware appLanguages . headerMessagesMiddleware . defaultYesodMiddleware . normalizeRouteMiddleware . defaultCsrfMiddleware . updateFavouritesMiddleware + yesodMiddleware = observeYesodCacheSizeMiddleware . languagesMiddleware appLanguages . headerMessagesMiddleware . defaultYesodMiddleware . normalizeRouteMiddleware . defaultCsrfMiddleware . updateFavouritesMiddleware where updateFavouritesMiddleware :: Handler a -> Handler a updateFavouritesMiddleware handler = (*> handler) . runMaybeT $ do @@ -1512,6 +1513,8 @@ instance Yesod UniWorX where lift . bracketOnError getMessages (mapM_ addMessage') $ addCustomHeader HeaderAlerts . decodeUtf8 . urlEncode True . toStrict . JSON.encode + observeYesodCacheSizeMiddleware :: Handler a -> Handler a + observeYesodCacheSizeMiddleware handler = handler `finally` observeYesodCacheSize -- Since we implement `errorHandler` ourselves we don't need `defaultMessageWidget` defaultMessageWidget _title _body = error "defaultMessageWidget: undefined" diff --git a/src/Jobs.hs b/src/Jobs.hs index 600294731..0daaaadcb 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -174,7 +174,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ Nothing -> retry Just (j, q) -> j <$ writeTVar chan q return $ yield nextVal >> streamChan - runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) . withJobWorkerState workerId Nothing JobWorkerIdle $ do + runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do $logInfoS logIdent "Started" runConduit $ streamChan .| handleJobs' workerId $logInfoS logIdent "Stopped" @@ -357,9 +357,9 @@ mkLogIdent :: JobWorkerId -> Text mkLogIdent wId = "Job-Executor " <> showWorkerId wId handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) () -handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdle) JobWorkerBusy $ do +handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do $logDebugS logIdent $ tshow jctl - res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (Just JobWorkerBusy) (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl + res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl sentRes <- mapReaderT (liftIO . atomically) $ do resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) @@ -378,7 +378,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdl handleCmd JobCtlTest = return () handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) handleCmd (JobCtlQueue job) = lift $ queueJob' job - handleCmd jctl@(JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do + handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do content <- case fromJSON queuedJobContent of Aeson.Success c -> return c Aeson.Error t -> do @@ -391,8 +391,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum (Just JobWorkerIdl now <- liftIO getCurrentTime performJob content - & withJobDuration (classifyJob content) - & withJobWorkerState wNum (Just $ JobWorkerExecJobCtl jctl) (JobWorkerExecJob content) + & withJobWorkerState wNum (JobWorkerExecJob content) -- `performJob` is expected to throw an exception if it detects that the job was not done runDB . setSerializable $ do diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 54bb430cc..5d1eb8be5 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -156,8 +156,7 @@ classifyJobCtl jobctl = unpack tag Aeson.String tag = obj HashMap.! "instruction" data JobWorkerState - = JobWorkerIdle - | JobWorkerBusy + = JobWorkerBusy | JobWorkerExecJobCtl { jobWorkerJobCtl :: JobCtl } | JobWorkerExecJob { jobWorkerJob :: Job } deriving (Eq, Ord, Read, Show, Generic, Typeable) @@ -182,9 +181,8 @@ classifyJobWorkerState wId jws = (showWorkerId wId, tag, maybe "n/a" pack mJobCt ] mJob = classifyJob <$> jws ^? _jobWorkerJob -withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> Maybe JobWorkerState -> JobWorkerState -> m a -> m a -withJobWorkerState wId oldSt newSt - = withJobWorkerStateLbls (classifyJobWorkerState wId <$> oldSt) (classifyJobWorkerState wId newSt) +withJobWorkerState :: (MonadIO m, MonadMask m) => JobWorkerId -> JobWorkerState -> m a -> m a +withJobWorkerState wId newSt = withJobWorkerStateLbls $ classifyJobWorkerState wId newSt newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique } diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 25233023d..2e8b8239c 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -3,8 +3,8 @@ module Utils.Metrics , registerGHCMetrics , observeHTTPRequestLatency , registerReadyMetric - , withJobDuration , withJobWorkerStateLbls + , observeYesodCacheSize ) where import Import.NoFoundation hiding (Vector, Info) @@ -19,6 +19,8 @@ import Network.Wai (Middleware) import qualified Network.Wai as Wai import qualified Network.HTTP.Types as HTTP +import Yesod.Core.Types (HandlerData(..), GHState(..)) + histogramBuckets :: Rational -- ^ min -> Rational -- ^ max @@ -69,18 +71,25 @@ readyMetric ts = Metric $ return (MkReadySince, collectReadySince) "POSIXTime this Uni2work-instance became ready" sample = encodeUtf8 $ tshow (realToFrac ts :: Nano) -{-# NOINLINE jobDuration #-} -jobDuration :: Vector Label2 Histogram -jobDuration = unsafeRegister . vector ("task", "status") $ histogram info buckets - where info = Info "uni2work_job_duration_seconds" - "Duration of time taken to execute a background job" - buckets = histogramBuckets 5e-6 500 +{-# NOINLINE jobWorkerStateDuration #-} +jobWorkerStateDuration :: Vector Label4 Histogram +jobWorkerStateDuration = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ histogram info buckets + where info = Info "uni2work_job_worker_state_duration_seconds" + "Duration of time a Uni2work job executor spent in a certain state" + buckets = histogramBuckets 1e-6 500 -{-# NOINLINE jobWorkerState #-} -jobWorkerState :: Vector Label4 Gauge -jobWorkerState = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ gauge info - where info = Info "uni2work_job_worker_state" - "Current state of Uni2work job executors" +{-# NOINLINE jobWorkerStateTransitions #-} +jobWorkerStateTransitions :: Vector Label4 Counter +jobWorkerStateTransitions = unsafeRegister . vector ("worker", "state", "jobctl", "task") $ counter info + where info = Info "uni2work_job_worker_state_transitions_total" + "Number of times a Uni2work job executor entered a certain state" + +{-# NOINLINE yesodCacheSize #-} +yesodCacheSize :: Histogram +yesodCacheSize = unsafeRegister $ histogram info buckets + where info = Info "yesod_ghs_cache_items" + "Number of items in Yesod's ghsCache and ghsCacheBy" + buckets = 0 : histogramBuckets 1 1e6 withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport @@ -124,29 +133,22 @@ observeHTTPRequestLatency classifyHandler app req respond' = do registerReadyMetric :: MonadIO m => m () registerReadyMetric = liftIO $ void . register . readyMetric =<< getPOSIXTime -withJobDuration :: (MonadIO m, MonadCatch m) => String -> m a -> m a -withJobDuration job doJob = do +withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Label4 -> m a -> m a +withJobWorkerStateLbls newLbls act = do + liftIO $ withLabel jobWorkerStateTransitions newLbls incCounter + start <- liftIO getPOSIXTime - res <- handleAll (return . Left) $ Right <$> doJob + res <- handleAll (return . Left) $ Right <$> act end <- liftIO getPOSIXTime - liftIO . withLabel jobDuration (pack job, bool "failure" "success" $ is _Right res) . flip observe . realToFrac $ end - start + liftIO . withLabel jobWorkerStateDuration newLbls . flip observe . realToFrac $ end - start - case res of - Left exc -> throwM exc - Right res' -> return res' - -withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Maybe Label4 -> Label4 -> m a -> m a -withJobWorkerStateLbls oldLbls newLbls - = bracket_ acquireState releaseState - where - acquireState = do - whenIsJust oldLbls $ \oldLbls' -> - liftIO $ withLabel jobWorkerState oldLbls' decGauge - liftIO $ withLabel jobWorkerState newLbls incGauge - releaseState = do - liftIO $ withLabel jobWorkerState newLbls decGauge - whenIsJust oldLbls $ \oldLbls' -> - liftIO $ withLabel jobWorkerState oldLbls' incGauge - + either throwM return res +observeYesodCacheSize :: MonadHandler m => m () +observeYesodCacheSize = do + HandlerData{handlerState} <- liftHandler ask + liftIO $ do + GHState{..} <- readIORef handlerState + let size = fromIntegral $ length ghsCache + length ghsCacheBy + observe yesodCacheSize size