diff --git a/config/settings.yml b/config/settings.yml
index edd971e64..d35732623 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -36,8 +36,10 @@ health-check-interval:
ldap-admins: "_env:HEALTHCHECK_INTERVAL_LDAP_ADMINS:600"
smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600"
widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600"
+ active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60"
health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true"
health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)?
+health-check-active-job-executors-timeout: "_env:HEALTHCHECK_ACTIVE_JOB_EXECUTORS_TIMEOUT:5"
log-settings:
detailed: "_env:DETAILED_LOGGING:false"
diff --git a/messages/uniworx/de.msg b/messages/uniworx/de.msg
index 99ed87ddf..563aede8a 100644
--- a/messages/uniworx/de.msg
+++ b/messages/uniworx/de.msg
@@ -1061,6 +1061,7 @@ HealthHTTPReachable: Cluster kann an der erwarteten URL über HTTP erreicht werd
HealthLDAPAdmins: Anteil der Administratoren, die im LDAP-Verzeichnis gefunden werden können
HealthSMTPConnect: SMTP-Server kann erreicht werden
HealthWidgetMemcached: Memcached-Server liefert Widgets korrekt aus
+HealthActiveJobExecutors: Anteil der job-workers, die neue Befehle annehmen
CourseParticipants n@Int: Derzeit #{n} angemeldete Kursteilnehmer
CourseParticipantsInvited n@Int: #{n} #{pluralDE n "Einladung" "Einladungen"} per E-Mail verschickt
diff --git a/src/Handler/Health.hs b/src/Handler/Health.hs
index 7b29e2bbd..36649a436 100644
--- a/src/Handler/Health.hs
+++ b/src/Handler/Health.hs
@@ -70,6 +70,9 @@ getHealthR = do
$of HealthWidgetMemcached (Just passed)
_{MsgHealthWidgetMemcached}
#{boolSymbol passed}
+ $of HealthActiveJobExecutors (Just active)
+ _{MsgHealthActiveJobExecutors}
+ #{textPercent active 1}
$of _
|]
provideJson healthReports
diff --git a/src/Jobs.hs b/src/Jobs.hs
index 3da01ebee..f8cdb2ee5 100644
--- a/src/Jobs.hs
+++ b/src/Jobs.hs
@@ -114,7 +114,7 @@ manageJobPool, manageCrontab :: forall m.
manageCrontab foundation@UniWorX{..} = do
context <- atomically . fmap jobContext $ readTMVar appJobState
liftIO . unsafeHandler foundation . void $ do
- runReaderT ?? context $
+ runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
evalRWST execCrontab' context HashMap.empty
where
@@ -239,7 +239,8 @@ execCrontab = do
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
- instanceID' <- getsYesod appInstanceID
+ foundation <- getYesod
+ let instanceID' = foundation ^. _appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
@@ -252,7 +253,7 @@ execCrontab = do
}
[ CronLastExecTime =. now ]
lift $ queueDBJob job
- other -> writeJobCtl other
+ other -> runReaderT ?? foundation $ writeJobCtl other
| otherwise
-> mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
@@ -322,10 +323,10 @@ mkLogIdent wId = "Job-Executor " <> showWorkerId wId
handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
- resVars <- mapReaderT (liftIO . atomically) $
- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
- sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
+ sentRes <- mapReaderT (liftIO . atomically) $ do
+ resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
+ lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> $logErrorS logIdent $ tshow err
@@ -338,7 +339,8 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
- handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
+ handleCmd JobCtlNoOp = return ()
+ handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
content <- case fromJSON queuedJobContent of
diff --git a/src/Jobs/HealthReport.hs b/src/Jobs/HealthReport.hs
index bf65049f9..6aecd01f6 100644
--- a/src/Jobs/HealthReport.hs
+++ b/src/Jobs/HealthReport.hs
@@ -7,6 +7,7 @@ module Jobs.HealthReport
import Import
import Data.List (genericLength)
+import qualified Data.Map.Strict as Map
import qualified Data.Aeson as Aeson
import Data.Proxy (Proxy(..))
@@ -27,6 +28,12 @@ import qualified Data.CaseInsensitive as CI
import qualified Network.HaskellNet.SMTP as SMTP
import Data.Pool (withResource)
+import System.Timeout
+
+import Jobs.Queue
+
+import Control.Concurrent.Async.Lifted.Safe (forConcurrently)
+
generateHealthReport :: HealthCheck -> Handler HealthReport
generateHealthReport = $(dispatchTH ''HealthCheck)
@@ -135,3 +142,26 @@ dispatchHealthCheckWidgetMemcached = HealthWidgetMemcached <$> do
(== content) . responseBody <$> httpLBS httpRequest
_other -> return False
+
+dispatchHealthCheckActiveJobExecutors :: Handler HealthReport
+dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do
+ app <- getYesod
+ jState <- atomically . tryReadTMVar $ appJobState app
+ let configuredNumber = app ^. _appJobWorkers
+ timeoutLength = app ^. _appHealthCheckActiveJobExecutorsTimeout
+ case jState of
+ Nothing
+ | configuredNumber == 0 -> return Nothing
+ Nothing -> return $ Just 0
+ Just JobState{jobWorkers, jobWorkerName} -> do
+ tid <- liftIO myThreadId
+ let workers' = Map.fromSet jobWorkerName (Map.keysSet jobWorkers)
+ workers = Map.filterWithKey (\a _ -> asyncThreadId a /= tid) workers'
+ timeoutMicro = let (MkFixed micro :: Micro) = realToFrac timeoutLength
+ in fromInteger micro
+ $logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems workers'
+ responders <- fmap (getSum . fold) . liftIO . forConcurrently (Map.toList workers) $ \(_, wName)
+ -> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp)
+ if
+ | Map.null workers -> return Nothing
+ | otherwise -> return . Just $ responders % fromIntegral (Map.size workers)
diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs
index f0ddede48..8b71c2960 100644
--- a/src/Jobs/Queue.hs
+++ b/src/Jobs/Queue.hs
@@ -1,5 +1,6 @@
module Jobs.Queue
( writeJobCtl, writeJobCtlBlock
+ , writeJobCtl', writeJobCtlBlock'
, queueJob, queueJob'
, YesodJobDB
, runDBJobs, queueDBJob, sinkDBJobs
@@ -9,12 +10,14 @@ module Jobs.Queue
import Import hiding ((<>))
import Utils.Sql
+import Utils.Lens
import Jobs.Types
import Control.Monad.Trans.Writer (WriterT, runWriterT)
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Trans.Reader (ReaderT, mapReaderT)
+import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.HashMap.Strict as HashMap
@@ -27,39 +30,54 @@ import Data.Semigroup ((<>))
data JobQueueException = JobQueuePoolEmpty
+ | JobQueueWorkerNotFound
deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic)
instance Exception JobQueueException
-writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
+writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m ()
+-- | Pass an instruction to the given `Job`-Worker
+writeJobCtl' target cmd = do
+ JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar
+ if
+ | null jobWorkers
+ -> throwM JobQueuePoolEmpty
+ | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers
+ -> atomically $ writeTChan chan cmd
+ | otherwise
+ -> throwM JobQueueWorkerNotFound
+
+writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers
--
-- Instructions are assigned deterministically and pseudo-randomly to one specific worker.
-- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others
writeJobCtl cmd = do
+ names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar
tid <- liftIO myThreadId
- wMap <- fmap jobWorkers $ getsYesod appJobState >>= atomically . readTMVar
- if
- | null wMap -> throwM JobQueuePoolEmpty
- | otherwise -> do
- let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap
- liftIO . atomically $ writeTChan chan cmd
+ let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names
+ writeJobCtl' target cmd
-writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
--- | Pass an instruction to the `Job`-Workers and block until it was acted upon
-writeJobCtlBlock cmd = do
- getResVar <- asks jobConfirm
- resVar <- liftIO . atomically $ do
+
+writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
+-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon
+writeJobCtlBlock' writeCtl cmd = do
+ getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar
+ resVar <- atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
- lift $ writeJobCtl cmd
+ writeCtl cmd
let
removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
- mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
+ mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
+writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
+-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
+writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
+
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
now <- liftIO getCurrentTime
@@ -83,7 +101,9 @@ queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
-queueJob' job = queueJob job >>= writeJobCtl . JobCtlPerform
+queueJob' job = do
+ app <- getYesod
+ queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
-- | Slightly modified Version of `YesodDB` for `runDBJobs`
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO))
@@ -102,5 +122,6 @@ runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -
-- Jobs get immediately executed if the transaction succeeds
runDBJobs act = do
(ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act
- forM_ jIds $ writeJobCtl . JobCtlPerform
+ app <- getYesod
+ forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform
return ret
diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs
index 9e4cbc56b..74fd7afe3 100644
--- a/src/Jobs/Types.hs
+++ b/src/Jobs/Types.hs
@@ -3,6 +3,7 @@ module Jobs.Types
, JobCtl(..)
, JobContext(..)
, JobState(..)
+ , jobWorkerNames
, JobWorkerId
, showWorkerId, newWorkerId
) where
@@ -16,6 +17,9 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Unique
+import qualified Data.Map.Strict as Map
+import qualified Data.Set as Set
+
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@@ -75,6 +79,7 @@ data JobCtl = JobCtlFlush
| JobCtlDetermineCrontab
| JobCtlQueue Job
| JobCtlGenerateHealthReport HealthCheck
+ | JobCtlNoOp
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl
@@ -103,3 +108,6 @@ data JobState = JobState
, jobCron :: Async ()
, jobShutdown :: TMVar ()
}
+
+jobWorkerNames :: JobState -> Set JobWorkerId
+jobWorkerNames JobState{..} = Set.map jobWorkerName $ Map.keysSet jobWorkers
diff --git a/src/Model/Types/Health.hs b/src/Model/Types/Health.hs
index aea99d735..ce0f53e23 100644
--- a/src/Model/Types/Health.hs
+++ b/src/Model/Types/Health.hs
@@ -15,6 +15,7 @@ data HealthCheck
| HealthCheckLDAPAdmins
| HealthCheckSMTPConnect
| HealthCheckWidgetMemcached
+ | HealthCheckActiveJobExecutors
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
instance Universe HealthCheck
instance Finite HealthCheck
@@ -39,6 +40,8 @@ data HealthReport
-- ^ Can we connect to the SMTP server and say @NOOP@?
| HealthWidgetMemcached { healthWidgetMemcached :: Maybe Bool }
-- ^ Can we store values in memcached and retrieve them via HTTP?
+ | HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational }
+ -- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
instance NFData HealthReport
@@ -57,6 +60,7 @@ classifyHealthReport HealthLDAPAdmins{} = HealthCheckLDAPAdmins
classifyHealthReport HealthHTTPReachable{} = HealthCheckHTTPReachable
classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect
classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached
+classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors
-- | `HealthReport` classified (`classifyHealthReport`) by badness
--
@@ -84,4 +88,6 @@ healthReportStatus = \case
| prop <= 0 -> HealthFailure
HealthSMTPConnect (Just False) -> HealthFailure
HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully?
+ HealthActiveJobExecutors (Just prop )
+ | prop < 1 -> HealthFailure
_other -> maxBound -- Minimum badness
diff --git a/src/Settings.hs b/src/Settings.hs
index c53e90269..191e1ca1d 100644
--- a/src/Settings.hs
+++ b/src/Settings.hs
@@ -118,6 +118,7 @@ data AppSettings = AppSettings
, appHealthCheckInterval :: HealthCheck -> Maybe NominalDiffTime
, appHealthCheckDelayNotify :: Bool
, appHealthCheckHTTP :: Bool
+ , appHealthCheckActiveJobExecutorsTimeout :: NominalDiffTime
, appInitialLogSettings :: LogSettings
@@ -389,6 +390,7 @@ instance FromJSON AppSettings where
appHealthCheckInterval <- (assertM' (> 0) . ) <$> o .: "health-check-interval"
appHealthCheckDelayNotify <- o .: "health-check-delay-notify"
appHealthCheckHTTP <- o .: "health-check-http"
+ appHealthCheckActiveJobExecutorsTimeout <- o .: "health-check-active-job-executors-timeout"
appSessionTimeout <- o .: "session-timeout"