feat(health): check for active job workers

This commit is contained in:
Gregor Kleen 2019-07-24 09:41:17 +02:00
parent 20686f185b
commit d1abe530b6
9 changed files with 98 additions and 23 deletions

View File

@ -36,8 +36,10 @@ health-check-interval:
ldap-admins: "_env:HEALTHCHECK_INTERVAL_LDAP_ADMINS:600"
smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600"
widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600"
active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60"
health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true"
health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)?
health-check-active-job-executors-timeout: "_env:HEALTHCHECK_ACTIVE_JOB_EXECUTORS_TIMEOUT:5"
log-settings:
detailed: "_env:DETAILED_LOGGING:false"

View File

@ -1061,6 +1061,7 @@ HealthHTTPReachable: Cluster kann an der erwarteten URL über HTTP erreicht werd
HealthLDAPAdmins: Anteil der Administratoren, die im LDAP-Verzeichnis gefunden werden können
HealthSMTPConnect: SMTP-Server kann erreicht werden
HealthWidgetMemcached: Memcached-Server liefert Widgets korrekt aus
HealthActiveJobExecutors: Anteil der job-workers, die neue Befehle annehmen
CourseParticipants n@Int: Derzeit #{n} angemeldete Kursteilnehmer
CourseParticipantsInvited n@Int: #{n} #{pluralDE n "Einladung" "Einladungen"} per E-Mail verschickt

View File

@ -70,6 +70,9 @@ getHealthR = do
$of HealthWidgetMemcached (Just passed)
<dt .deflist__dt>_{MsgHealthWidgetMemcached}
<dd .deflist__dd>#{boolSymbol passed}
$of HealthActiveJobExecutors (Just active)
<dt .deflist__dt>_{MsgHealthActiveJobExecutors}
<dd .deflist__dd>#{textPercent active 1}
$of _
|]
provideJson healthReports

View File

@ -114,7 +114,7 @@ manageJobPool, manageCrontab :: forall m.
manageCrontab foundation@UniWorX{..} = do
context <- atomically . fmap jobContext $ readTMVar appJobState
liftIO . unsafeHandler foundation . void $ do
runReaderT ?? context $
runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
evalRWST execCrontab' context HashMap.empty
where
@ -239,7 +239,8 @@ execCrontab = do
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
instanceID' <- getsYesod appInstanceID
foundation <- getYesod
let instanceID' = foundation ^. _appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
@ -252,7 +253,7 @@ execCrontab = do
}
[ CronLastExecTime =. now ]
lift $ queueDBJob job
other -> writeJobCtl other
other -> runReaderT ?? foundation $ writeJobCtl other
| otherwise
-> mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
@ -322,10 +323,10 @@ mkLogIdent wId = "Job-Executor " <> showWorkerId wId
handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
resVars <- mapReaderT (liftIO . atomically) $
HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
sentRes <- mapReaderT (liftIO . atomically) $ do
resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> $logErrorS logIdent $ tshow err
@ -338,7 +339,8 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
handleCmd JobCtlNoOp = return ()
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
content <- case fromJSON queuedJobContent of

View File

@ -7,6 +7,7 @@ module Jobs.HealthReport
import Import
import Data.List (genericLength)
import qualified Data.Map.Strict as Map
import qualified Data.Aeson as Aeson
import Data.Proxy (Proxy(..))
@ -27,6 +28,12 @@ import qualified Data.CaseInsensitive as CI
import qualified Network.HaskellNet.SMTP as SMTP
import Data.Pool (withResource)
import System.Timeout
import Jobs.Queue
import Control.Concurrent.Async.Lifted.Safe (forConcurrently)
-- | Produce the `HealthReport` belonging to the requested `HealthCheck`.
--
-- The implementation is generated at compile time by the @dispatchTH@ splice.
-- NOTE(review): presumably every constructor @HealthCheckFoo@ is routed to the
-- matching @dispatchHealthCheckFoo@ action of this module; confirm against @dispatchTH@.
generateHealthReport :: HealthCheck -> Handler HealthReport
generateHealthReport = $(dispatchTH ''HealthCheck)
@ -135,3 +142,26 @@ dispatchHealthCheckWidgetMemcached = HealthWidgetMemcached <$> do
(== content) . responseBody <$> httpLBS httpRequest
_other -> return False
-- | Send a `JobCtlNoOp` to every job executor other than the thread running
-- this check and report the proportion that confirm it within the configured timeout.
--
-- The result is @Nothing@ if there is nothing to measure (no job state while
-- zero workers are configured, or no executor besides the current thread) and
-- @Just 0@ if workers are configured but no job state is available.
dispatchHealthCheckActiveJobExecutors :: Handler HealthReport
dispatchHealthCheckActiveJobExecutors = fmap HealthActiveJobExecutors $ do
  foundation <- getYesod
  mJobState <- atomically . tryReadTMVar $ appJobState foundation
  case mJobState of
    Nothing
      | foundation ^. _appJobWorkers == 0 -> return Nothing
      | otherwise -> return $ Just 0
    Just JobState{jobWorkers, jobWorkerName} -> do
      ownThread <- liftIO myThreadId
      let
        -- Names of all executors in the pool, keyed by their `Async` handle
        allWorkers = Map.fromSet jobWorkerName $ Map.keysSet jobWorkers
        -- Leave out the executor (if any) whose thread is running this check
        otherWorkers = Map.filterWithKey (\worker _ -> asyncThreadId worker /= ownThread) allWorkers
        -- `timeout` expects microseconds, so go through the fixed-point representation
        MkFixed timeoutMicro = realToFrac (foundation ^. _appHealthCheckActiveJobExecutorsTimeout) :: Micro
        -- Counts as one responder if the no-op is confirmed in time, as none otherwise
        probe wName = fmap (fromMaybe $ Sum 0) . timeout (fromInteger timeoutMicro) . flip runReaderT foundation $
          Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp
      $logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems allWorkers
      responders <- liftIO $ getSum . fold <$> forConcurrently (Map.elems otherWorkers) probe
      return $ if
        | Map.null otherWorkers -> Nothing
        | otherwise -> Just $ responders % fromIntegral (Map.size otherWorkers)

View File

@ -1,5 +1,6 @@
module Jobs.Queue
( writeJobCtl, writeJobCtlBlock
, writeJobCtl', writeJobCtlBlock'
, queueJob, queueJob'
, YesodJobDB
, runDBJobs, queueDBJob, sinkDBJobs
@ -9,12 +10,14 @@ module Jobs.Queue
import Import hiding ((<>))
import Utils.Sql
import Utils.Lens
import Jobs.Types
import Control.Monad.Trans.Writer (WriterT, runWriterT)
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Trans.Reader (ReaderT, mapReaderT)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.HashMap.Strict as HashMap
@ -27,39 +30,54 @@ import Data.Semigroup ((<>))
-- | Failures that can occur while handing an instruction to the job workers
data JobQueueException = JobQueuePoolEmpty
-- ^ There are no job workers at all that could receive the instruction
| JobQueueWorkerNotFound
-- ^ The instruction was addressed to a specific worker, but the pool does not contain exactly one worker with that id
deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic)
-- Needed so that the values above can be raised via `throwM`
instance Exception JobQueueException
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m ()
-- | Hand an instruction to the one `Job`-Worker carrying the given id
--
-- Throws `JobQueuePoolEmpty` if there are no workers at all and
-- `JobQueueWorkerNotFound` unless exactly one worker carries the given id.
writeJobCtl' target cmd = do
  JobState{jobWorkers, jobWorkerName} <- atomically . readTMVar =<< asks appJobState
  -- Channels of all workers whose name equals the requested id
  let candidates = [ chan | (worker, chan) <- Map.toList jobWorkers, jobWorkerName worker == target ]
  case candidates of
    _ | null jobWorkers -> throwM JobQueuePoolEmpty
    [chan] -> atomically $ writeTChan chan cmd
    _ -> throwM JobQueueWorkerNotFound
writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers
--
-- Instructions are assigned deterministically and pseudo-randomly to one specific worker.
-- While this means that they might be executed later than desirable, rogue threads that queue the same instruction many times do not deny service to others
writeJobCtl cmd = do
names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar
tid <- liftIO myThreadId
wMap <- fmap jobWorkers $ getsYesod appJobState >>= atomically . readTMVar
if
| null wMap -> throwM JobQueuePoolEmpty
| otherwise -> do
let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap
liftIO . atomically $ writeTChan chan cmd
let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names
writeJobCtl' target cmd
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock cmd = do
getResVar <- asks jobConfirm
resVar <- liftIO . atomically $ do
writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon
writeJobCtlBlock' writeCtl cmd = do
getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar
resVar <- atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
lift $ writeJobCtl cmd
writeCtl cmd
let
removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Hand an instruction to the `Job`-Workers by way of `writeJobCtl` and wait until it has been acted upon
writeJobCtlBlock cmd = writeJobCtlBlock' writeJobCtl cmd
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
now <- liftIO getCurrentTime
@ -83,7 +101,9 @@ queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure that it is executed as soon as possible
queueJob' job = queueJob job >>= writeJobCtl . JobCtlPerform
queueJob' job = do
app <- getYesod
queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
-- | Slightly modified version of `YesodDB` for `runDBJobs`
--
-- Compared to a plain reader over the persistence backend, the stack carries an
-- additional `WriterT` layer accumulating a `Set` of `QueuedJobId`s on top of the handler monad.
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO))
@ -102,5 +122,6 @@ runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -
-- Jobs get immediately executed if the transaction succeeds
runDBJobs act = do
(ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act
forM_ jIds $ writeJobCtl . JobCtlPerform
app <- getYesod
forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform
return ret

View File

@ -3,6 +3,7 @@ module Jobs.Types
, JobCtl(..)
, JobContext(..)
, JobState(..)
, jobWorkerNames
, JobWorkerId
, showWorkerId, newWorkerId
) where
@ -16,6 +17,9 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Unique
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@ -75,6 +79,7 @@ data JobCtl = JobCtlFlush
| JobCtlDetermineCrontab
| JobCtlQueue Job
| JobCtlGenerateHealthReport HealthCheck
| JobCtlNoOp
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl
@ -103,3 +108,6 @@ data JobState = JobState
, jobCron :: Async ()
, jobShutdown :: TMVar ()
}
-- | Ids of all workers currently registered in the given `JobState`
jobWorkerNames :: JobState -> Set JobWorkerId
jobWorkerNames jState = Set.map (jobWorkerName jState) . Map.keysSet $ jobWorkers jState

View File

@ -15,6 +15,7 @@ data HealthCheck
| HealthCheckLDAPAdmins
| HealthCheckSMTPConnect
| HealthCheckWidgetMemcached
| HealthCheckActiveJobExecutors
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
instance Universe HealthCheck
instance Finite HealthCheck
@ -39,6 +40,8 @@ data HealthReport
-- ^ Can we connect to the SMTP server and say @NOOP@?
| HealthWidgetMemcached { healthWidgetMemcached :: Maybe Bool }
-- ^ Can we store values in memcached and retrieve them via HTTP?
| HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational }
-- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
instance NFData HealthReport
@ -57,6 +60,7 @@ classifyHealthReport HealthLDAPAdmins{} = HealthCheckLDAPAdmins
classifyHealthReport HealthHTTPReachable{} = HealthCheckHTTPReachable
classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect
classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached
classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors
-- | `HealthReport` classified (`classifyHealthReport`) by badness
--
@ -84,4 +88,6 @@ healthReportStatus = \case
| prop <= 0 -> HealthFailure
HealthSMTPConnect (Just False) -> HealthFailure
HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully?
HealthActiveJobExecutors (Just prop )
| prop < 1 -> HealthFailure
_other -> maxBound -- Minimum badness

View File

@ -118,6 +118,7 @@ data AppSettings = AppSettings
, appHealthCheckInterval :: HealthCheck -> Maybe NominalDiffTime
, appHealthCheckDelayNotify :: Bool
, appHealthCheckHTTP :: Bool
, appHealthCheckActiveJobExecutorsTimeout :: NominalDiffTime
, appInitialLogSettings :: LogSettings
@ -389,6 +390,7 @@ instance FromJSON AppSettings where
appHealthCheckInterval <- (assertM' (> 0) . ) <$> o .: "health-check-interval"
appHealthCheckDelayNotify <- o .: "health-check-delay-notify"
appHealthCheckHTTP <- o .: "health-check-http"
appHealthCheckActiveJobExecutorsTimeout <- o .: "health-check-active-job-executors-timeout"
appSessionTimeout <- o .: "session-timeout"