chore(health): monitor job flushes
This commit is contained in:
parent
6c12737ad9
commit
aa1c0c8a3e
@ -50,6 +50,7 @@ health-check-interval:
|
|||||||
smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600"
|
smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600"
|
||||||
widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600"
|
widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600"
|
||||||
active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60"
|
active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60"
|
||||||
|
does-flush: "_env:HEALTHCHECK_INTERVAL_DOES_FLUSH:15"
|
||||||
health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true"
|
health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true"
|
||||||
health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)?
|
health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)?
|
||||||
|
|
||||||
|
|||||||
@ -117,6 +117,7 @@ handleJobs foundation@UniWorX{..}
|
|||||||
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
|
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
|
||||||
jobHeldLocks <- liftIO $ newTVarIO Set.empty
|
jobHeldLocks <- liftIO $ newTVarIO Set.empty
|
||||||
jobOffload <- liftIO newEmptyTMVarIO
|
jobOffload <- liftIO newEmptyTMVarIO
|
||||||
|
jobLastFlush <- liftIO $ newTVarIO Nothing
|
||||||
registerJobHeldLocksCount jobHeldLocks
|
registerJobHeldLocksCount jobHeldLocks
|
||||||
registerJobWorkerQueueDepth appJobState
|
registerJobWorkerQueueDepth appJobState
|
||||||
atomically $ putTMVar appJobState JobState
|
atomically $ putTMVar appJobState JobState
|
||||||
@ -501,6 +502,8 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
|
|||||||
void . lift . runDB . runConduit
|
void . lift . runDB . runConduit
|
||||||
$ selectKeys [ QueuedJobId /<-. Set.toList heldLocks ] [ Asc QueuedJobCreationTime ]
|
$ selectKeys [ QueuedJobId /<-. Set.toList heldLocks ] [ Asc QueuedJobCreationTime ]
|
||||||
.| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
.| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||||
|
lFlushTVar <- asks jobLastFlush
|
||||||
|
atomically . modifyTVar' lFlushTVar . max . Just =<< liftIO getCurrentTime
|
||||||
$logInfoS logIdent "JobCtlFlush"
|
$logInfoS logIdent "JobCtlFlush"
|
||||||
handleCmd (JobCtlQueue job) = do
|
handleCmd (JobCtlQueue job) = do
|
||||||
$logDebugS logIdent "JobCtlQueue..."
|
$logDebugS logIdent "JobCtlQueue..."
|
||||||
|
|||||||
@ -173,3 +173,16 @@ dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do
|
|||||||
if
|
if
|
||||||
| Map.null workers -> return Nothing
|
| Map.null workers -> return Nothing
|
||||||
| otherwise -> return . Just $ responders % fromIntegral (Map.size workers)
|
| otherwise -> return . Just $ responders % fromIntegral (Map.size workers)
|
||||||
|
|
||||||
|
|
||||||
|
dispatchHealthCheckDoesFlush :: Handler HealthReport
|
||||||
|
dispatchHealthCheckDoesFlush = fmap HealthDoesFlush . runMaybeT $ do
|
||||||
|
UniWorX{ appSettings' = AppSettings{..}, appJobState } <- getYesod
|
||||||
|
|
||||||
|
interval <- hoistMaybe $ guardOnM (isn't _JobsOffload appJobMode) appJobFlushInterval
|
||||||
|
lFlush <- MaybeT . atomically $ do
|
||||||
|
jState <- tryReadTMVar appJobState
|
||||||
|
fmap join . for jState $ \JobState{jobContext} -> readTVar $ jobLastFlush jobContext
|
||||||
|
|
||||||
|
now <- liftIO getCurrentTime
|
||||||
|
return $ toRational (now `diffUTCTime` lFlush) / toRational interval
|
||||||
|
|||||||
@ -284,6 +284,7 @@ data JobContext = JobContext
|
|||||||
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
|
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
|
||||||
, jobHeldLocks :: TVar (Set QueuedJobId)
|
, jobHeldLocks :: TVar (Set QueuedJobId)
|
||||||
, jobOffload :: TMVar JobOffloadHandler
|
, jobOffload :: TMVar JobOffloadHandler
|
||||||
|
, jobLastFlush :: TVar (Maybe UTCTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -16,6 +16,7 @@ data HealthCheck
|
|||||||
| HealthCheckSMTPConnect
|
| HealthCheckSMTPConnect
|
||||||
| HealthCheckWidgetMemcached
|
| HealthCheckWidgetMemcached
|
||||||
| HealthCheckActiveJobExecutors
|
| HealthCheckActiveJobExecutors
|
||||||
|
| HealthCheckDoesFlush
|
||||||
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||||
instance Universe HealthCheck
|
instance Universe HealthCheck
|
||||||
instance Finite HealthCheck
|
instance Finite HealthCheck
|
||||||
@ -43,6 +44,7 @@ data HealthReport
|
|||||||
-- ^ Can we store values in memcached and retrieve them via HTTP?
|
-- ^ Can we store values in memcached and retrieve them via HTTP?
|
||||||
| HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational }
|
| HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational }
|
||||||
-- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout
|
-- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout
|
||||||
|
| HealthDoesFlush { healthFlushOverdue :: Maybe Rational }
|
||||||
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
|
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
|
||||||
|
|
||||||
instance NFData HealthReport
|
instance NFData HealthReport
|
||||||
@ -62,6 +64,7 @@ classifyHealthReport HealthHTTPReachable{} = HealthCheckHTTPReachable
|
|||||||
classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect
|
classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect
|
||||||
classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached
|
classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached
|
||||||
classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors
|
classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors
|
||||||
|
classifyHealthReport HealthDoesFlush{} = HealthCheckDoesFlush
|
||||||
|
|
||||||
-- | `HealthReport` classified (`classifyHealthReport`) by badness
|
-- | `HealthReport` classified (`classifyHealthReport`) by badness
|
||||||
--
|
--
|
||||||
@ -91,4 +94,6 @@ healthReportStatus = \case
|
|||||||
HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully?
|
HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully?
|
||||||
HealthActiveJobExecutors (Just prop )
|
HealthActiveJobExecutors (Just prop )
|
||||||
| prop <= 0 -> HealthFailure
|
| prop <= 0 -> HealthFailure
|
||||||
|
HealthDoesFlush (Just prop )
|
||||||
|
| prop >= 2 -> HealthFailure
|
||||||
_other -> maxBound -- Minimum badness
|
_other -> maxBound -- Minimum badness
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user