Merge branch 'fix/jobs'

This commit is contained in:
Gregor Kleen 2019-07-24 11:14:48 +02:00
commit 17e1b98582
15 changed files with 383 additions and 203 deletions

View File

@ -36,8 +36,10 @@ health-check-interval:
ldap-admins: "_env:HEALTHCHECK_INTERVAL_LDAP_ADMINS:600"
smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600"
widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600"
active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60"
health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true"
health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)?
health-check-active-job-executors-timeout: "_env:HEALTHCHECK_ACTIVE_JOB_EXECUTORS_TIMEOUT:5"
log-settings:
detailed: "_env:DETAILED_LOGGING:false"

View File

@ -1061,6 +1061,7 @@ HealthHTTPReachable: Cluster kann an der erwarteten URL über HTTP erreicht werd
HealthLDAPAdmins: Anteil der Administratoren, die im LDAP-Verzeichnis gefunden werden können
HealthSMTPConnect: SMTP-Server kann erreicht werden
HealthWidgetMemcached: Memcached-Server liefert Widgets korrekt aus
HealthActiveJobExecutors: Anteil der job-workers, die neue Befehle annehmen
CourseParticipants n@Int: Derzeit #{n} angemeldete Kursteilnehmer
CourseParticipantsInvited n@Int: #{n} #{pluralDE n "Einladung" "Einladungen"} per E-Mail verschickt

View File

@ -38,8 +38,6 @@ import System.Log.FastLogger ( defaultBufSize, newStderrLoggerSet
import Handler.Utils (runAppLoggingT)
import qualified Data.Map.Strict as Map
import Foreign.Store
import qualified Data.UUID as UUID
@ -158,8 +156,7 @@ makeFoundation appSettings'@AppSettings{..} = do
appInstanceID <- liftIO $ maybe UUID.nextRandom (either readInstanceIDFile return) appInitialInstanceID
appJobCtl <- liftIO $ newTVarIO Map.empty
appCronThread <- liftIO newEmptyTMVarIO
appJobState <- liftIO newEmptyTMVarIO
appHealthReport <- liftIO $ newTVarIO Set.empty
-- We need a log function to create a connection pool. We need a connection
@ -371,7 +368,7 @@ develMain = runResourceT $ do
wsettings <- liftIO . getDevSettings $ warpSettings foundation
app <- makeApplication foundation
handleJobs foundation
runAppLoggingT foundation $ handleJobs foundation
liftIO . develMainHelper $ return (wsettings, app)
-- | The @main@ function for an executable running this site.
@ -471,7 +468,7 @@ getApplicationRepl :: (MonadResource m, MonadBaseControl IO m) => m (Int, UniWor
getApplicationRepl = do
settings <- getAppDevSettings
foundation <- makeFoundation settings
handleJobs foundation
runAppLoggingT foundation $ handleJobs foundation
wsettings <- liftIO . getDevSettings $ warpSettings foundation
app1 <- makeApplication foundation
@ -481,7 +478,7 @@ getApplicationRepl = do
return (getPort wsettings, foundation, app1)
shutdownApp :: MonadIO m => UniWorX -> m ()
shutdownApp :: (MonadIO m, MonadBaseControl IO m) => UniWorX -> m ()
shutdownApp app = do
stopJobCtl app
liftIO $ do

View File

@ -0,0 +1,15 @@
module Control.Concurrent.Async.Lifted.Safe.Utils
( allocateLinkedAsync
) where
import ClassyPrelude hiding (cancel)
import Control.Concurrent.Async.Lifted.Safe
import Control.Monad.Trans.Resource
allocateLinkedAsync :: forall m a.
MonadResource m
=> IO a -> m (Async a)
allocateLinkedAsync act = allocate (async act) cancel >>= (\(_k, a) -> a <$ link a)

View File

@ -110,8 +110,7 @@ data UniWorX = UniWorX
, appCryptoIDKey :: CryptoIDKey
, appClusterID :: ClusterId
, appInstanceID :: InstanceId
, appJobCtl :: TVar (Map ThreadId (TMChan JobCtl))
, appCronThread :: TMVar (ReleaseKey, ThreadId)
, appJobState :: TMVar JobState
, appSessionKey :: ClientSession.Key
, appSecretBoxKey :: SecretBox.Key
, appJSONWebKeySet :: Jose.JwkSet

View File

@ -70,6 +70,9 @@ getHealthR = do
$of HealthWidgetMemcached (Just passed)
<dt .deflist__dt>_{MsgHealthWidgetMemcached}
<dd .deflist__dd>#{boolSymbol passed}
$of HealthActiveJobExecutors (Just active)
<dt .deflist__dt>_{MsgHealthActiveJobExecutors}
<dd .deflist__dd>#{textPercent active 1}
$of _
|]
provideJson healthReports

View File

@ -45,7 +45,7 @@ import Data.Hashable as Import
import Data.List.NonEmpty as Import (NonEmpty(..), nonEmpty)
import Data.Text.Encoding.Error as Import(UnicodeException(..))
import Data.Semigroup as Import (Semigroup)
import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..))
import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..), Endo(..))
import Data.Binary as Import (Binary)
import Numeric.Natural as Import (Natural)

View File

@ -7,14 +7,12 @@ module Jobs
import Import
import Utils.Lens
import Handler.Utils
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab
import Data.Conduit.TMChan
import qualified Data.Conduit.List as C
import qualified Data.Text.Lazy as LT
@ -28,7 +26,7 @@ import Data.Semigroup (Max(..))
import Utils.Sql
import Control.Monad.Random (evalRand, mkStdGen, getRandomR)
import Control.Monad.Random (evalRand, mkStdGen, getRandomR, uniformMay)
import Cron
import qualified Data.HashMap.Strict as HashMap
@ -38,20 +36,26 @@ import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!))
import Data.Foldable (foldrM)
import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.State (evalStateT, mapStateT)
import Control.Monad.Trans.Writer (execWriterT)
import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT, allocate, release)
import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Control.Monad.Trans.Cont (ContT(..), callCC)
import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
import Control.Monad.Logger
import Data.Time.Zones
import Control.Concurrent.STM (retry)
import Control.Concurrent.STM.Delay
import Jobs.Handler.SendNotification
@ -75,198 +79,267 @@ instance Exception JobQueueException
handleJobs :: ( MonadResource m
, MonadIO m
, MonadLoggerIO m
)
=> UniWorX -> m ()
-- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs foundation@UniWorX{..} = do
let num = foundation ^. _appJobWorkers
handleJobs foundation@UniWorX{..}
| foundation ^. _appJobWorkers == 0 = return ()
| otherwise = do
logger <- askLoggerIO
let runInIO = flip runLoggingT logger . runResourceT
jobCrontab <- liftIO $ newTMVarIO HashMap.empty
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobPoolManager <- allocateLinkedAsync . runInIO $ manageJobPool foundation
forM_ [1..num] $ \n -> do
(bChan, chan) <- atomically $ newBroadcastTMChan >>= (\c -> (c, ) <$> dupTMChan c)
let
logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId
doFork = flip forkFinally (\_ -> removeChan) . runAppLoggingT foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' foundation n
(_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan)
atomically . modifyTVar' appJobCtl $ Map.insert tId bChan
jobCron <- allocateLinkedAsync . runInIO $ manageCrontab foundation
-- Start cron operation
when (num > 0) $ do
registeredCron <- liftIO newEmptyTMVarIO
let execCrontab' = whenM (atomically $ readTMVar registeredCron) $
runReaderT (execCrontab foundation) JobContext{..}
unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread
cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab)
registeredCron' <- atomically $ do
registeredCron' <- tryPutTMVar appCronThread cData
registeredCron' <$ putTMVar registeredCron registeredCron'
when registeredCron' $
liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $
writeJobCtlBlock JobCtlDetermineCrontab
let jobWorkers = Map.empty
jobWorkerName = const $ error "Unknown worker"
jobCrontab <- liftIO $ newTVarIO HashMap.empty
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobShutdown <- liftIO newEmptyTMVarIO
atomically $ putTMVar appJobState JobState
{ jobContext = JobContext{..}
, ..
}
stopJobCtl :: MonadIO m => UniWorX -> m ()
manageJobPool, manageCrontab :: forall m.
( MonadResource m
, MonadLogger m
)
=> UniWorX -> m ()
manageCrontab foundation@UniWorX{..} = do
context <- atomically . fmap jobContext $ readTMVar appJobState
let awaitTermination = atomically $ do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard shouldTerminate
liftIO . race_ awaitTermination . unsafeHandler foundation . void $ do
atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
evalRWST (forever execCrontab) context HashMap.empty
manageJobPool foundation@UniWorX{..}
= flip runContT return . forever . join . atomically $ asum
[ spawnMissingWorkers
, reapDeadWorkers
, terminateGracefully
]
where
num :: Int
num = fromIntegral $ foundation ^. _appJobWorkers
spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ())
spawnMissingWorkers = do
oldState <- takeTMVar appJobState
let missing = num - Map.size (jobWorkers oldState)
guard $ missing > 0
return $ do
$logDebugS "manageJobPool" [st|Spawning #{missing} workers|]
endo <- execWriterT . replicateM_ missing $ do
workerId <- newWorkerId
let logIdent = mkLogIdent workerId
(bChan, chan) <- atomically $ newBroadcastTChan >>= (\c -> (c, ) <$> dupTChan c)
let
streamChan = join . atomically $ do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
if
| shouldTerminate ->
return $ return ()
| otherwise -> do
nextVal <- readTChan chan
return $ yield nextVal >> streamChan
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
$logInfoS logIdent "Started"
runConduit $ streamChan .| handleJobs' workerId
$logInfoS logIdent "Stopped"
worker <- allocateLinkedAsync runWorker
tell . Endo $ \cSt -> cSt
{ jobWorkers = Map.insert worker bChan $ jobWorkers cSt
, jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
}
atomically . putTMVar appJobState $ endo `appEndo` oldState
reapDeadWorkers = do
oldState <- takeTMVar appJobState
deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a
putTMVar appJobState oldState
{ jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers
}
guard . not $ Map.null deadWorkers
return . forM_ (Map.toList deadWorkers) $ \(jobAsync, result) -> do
case result of
Right () -> $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|]
Left e -> $logErrorS "JobPoolManager" [st|Job-Executer #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|]
void . lift . allocateLinkedAsync $
let go = do
next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
nextVal <- MaybeT . lift . tryReadTChan $ jobWorkers oldState ! jobAsync
jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
return (nextVal, receiver)
whenIsJust next $ \(nextVal, receiver) -> do
atomically $ writeTChan receiver nextVal
go
in go
terminateGracefully = do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard shouldTerminate
return . callCC $ \terminate -> do
$logInfoS "JobPoolManager" "Shutting down"
terminate ()
stopJobCtl :: (MonadIO m, MonadBaseControl IO m) => UniWorX -> m ()
-- ^ Stop all worker threads currently running
stopJobCtl UniWorX{appJobCtl, appCronThread} = do
mcData <- atomically $ tryReadTMVar appCronThread
whenIsJust mcData $ \(rKey, _) -> do
liftIO $ release rKey
atomically . guardM $ isEmptyTMVar appCronThread
wMap <- liftIO $ readTVarIO appJobCtl
atomically $ forM_ wMap closeTMChan
atomically $ do
wMap' <- readTVar appJobCtl
guard . none (`Map.member` wMap') $ Map.keysSet wMap
stopJobCtl UniWorX{appJobState} = do
didStop <- atomically $ do
jState <- tryReadTMVar appJobState
for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
whenIsJust didStop $ \jSt' -> void . fork . atomically $ do
workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
mapM_ (void . waitCatchSTM) $
[ jobPoolManager jSt'
, jobCron jSt'
] ++ workers
execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m ()
execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWorX IO) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab foundation = evalStateT go HashMap.empty
where
go = do
cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do
mapStateT (liftHandlerT . runDB . setSerializable) $ do
let
merge (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
execCrontab = do
mapRWST (liftHandlerT . runDB . setSerializable) $ do
let
merge (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do
crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab
case crontab' of
Nothing -> return Nothing
Just crontab -> Just <$> do
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
(currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do
crontab <- liftBase . readTVar =<< asks jobCrontab
case currentState of
Nothing -> return False
Just (currentCrontab, (jobCtl, nextMatch)) -> do
let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
newCrontab <- lift . lift . hoist lift $ determineCrontab'
if
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
instanceID' <- getsYesod appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
void . lift . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift . lift $ queueDBJob job
other -> writeJobCtl other
| otherwise
-> lift . mapReaderT (liftIO . atomically) $
lift . void . flip swapTMVar newCrontab =<< asks jobCrontab
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
doJob
return True
when cont go
where
acc :: NominalDiffTime
acc = 1e-3
debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
debouncingAcc AppSettings{appNotificationRateLimit} = \case
JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
_ -> acc
applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
applyJitter seed t = do
appInstance <- getsYesod appInstanceID
let
halfRange = truncate $ 0.5 / acc
diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
return $ addUTCTime diff t
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
go' (jobCtl, cron) mbPrev
| Just (_, t') <- mbPrev
, t' < t
= mbPrev
| otherwise
= Just (jobCtl, t)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TMVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = runResourceT $ do
diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
waitTime'
| diffT < acc = "Done"
| otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
$logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
let doJob = mapRWST (liftHandlerT . runDBJobs . setSerializable) $ do
newCrontab <- lift . hoist lift $ determineCrontab'
if
| diffT < acc -> return True
| otherwise -> do
retVar <- liftIO newEmptyTMVarIO
void . liftIO . forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar
let
awaitDelayThread = False <$ takeTMVar retVar
awaitCrontabChange = do
crontab' <- tryReadTMVar crontabTV
True <$ guard (Just crontab /= crontab')
crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
foundation <- getYesod
let instanceID' = foundation ^. _appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
void . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift $ queueDBJob job
other -> runReaderT ?? foundation $ writeJobCtl other
| otherwise
-> mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
doJob
where
acc :: NominalDiffTime
acc = 1e-3
handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) ()
handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
debouncingAcc AppSettings{appNotificationRateLimit} = \case
JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
_ -> acc
applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
applyJitter seed t = do
appInstance <- getsYesod appInstanceID
let
halfRange = truncate $ 0.5 / acc
diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
return $ addUTCTime diff t
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
go' (jobCtl, cron) mbPrev
| Just (_, t') <- mbPrev
, t' < t
= mbPrev
| otherwise
= Just (jobCtl, t)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = runResourceT $ do
diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
waitTime'
| diffT < acc = "Done"
| otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
$logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
if
| diffT < acc -> return True
| otherwise -> do
delay <- liftIO . newDelay . round $ waitTime * 1e6
let
awaitDelayThread = False <$ waitDelay delay
awaitCrontabChange = do
crontab' <- readTVar crontabTV
True <$ guard (crontab /= crontab')
crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
mkLogIdent :: JobWorkerId -> Text
mkLogIdent wId = "Job-Executor " <> showWorkerId wId
handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
resVars <- mapReaderT (liftIO . atomically) $
HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl
sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
sentRes <- mapReaderT (liftIO . atomically) $ do
resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> $logErrorS logIdent $ tshow err
_other -> return ()
where
logIdent = "Jobs #" <> tshow wNum
logIdent = mkLogIdent wNum
handleQueueException :: MonadLogger m => JobQueueException -> m ()
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
handleCmd JobCtlNoOp = return ()
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
content <- case fromJSON queuedJobContent of
@ -285,7 +358,7 @@ handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab'
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . void . flip swapTMVar newCTab =<< asks jobCrontab
lift . void . flip swapTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind

View File

@ -7,6 +7,7 @@ module Jobs.HealthReport
import Import
import Data.List (genericLength)
import qualified Data.Map.Strict as Map
import qualified Data.Aeson as Aeson
import Data.Proxy (Proxy(..))
@ -27,6 +28,12 @@ import qualified Data.CaseInsensitive as CI
import qualified Network.HaskellNet.SMTP as SMTP
import Data.Pool (withResource)
import System.Timeout
import Jobs.Queue
import Control.Concurrent.Async.Lifted.Safe (forConcurrently)
generateHealthReport :: HealthCheck -> Handler HealthReport
generateHealthReport = $(dispatchTH ''HealthCheck)
@ -135,3 +142,26 @@ dispatchHealthCheckWidgetMemcached = HealthWidgetMemcached <$> do
(== content) . responseBody <$> httpLBS httpRequest
_other -> return False
dispatchHealthCheckActiveJobExecutors :: Handler HealthReport
dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do
app <- getYesod
jState <- atomically . tryReadTMVar $ appJobState app
let configuredNumber = app ^. _appJobWorkers
timeoutLength = app ^. _appHealthCheckActiveJobExecutorsTimeout
case jState of
Nothing
| configuredNumber == 0 -> return Nothing
Nothing -> return $ Just 0
Just JobState{jobWorkers, jobWorkerName} -> do
tid <- liftIO myThreadId
let workers' = Map.fromSet jobWorkerName (Map.keysSet jobWorkers)
workers = Map.filterWithKey (\a _ -> asyncThreadId a /= tid) workers'
timeoutMicro = let (MkFixed micro :: Micro) = realToFrac timeoutLength
in fromInteger micro
$logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems workers'
responders <- fmap (getSum . fold) . liftIO . forConcurrently (Map.toList workers) $ \(_, wName)
-> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp)
if
| Map.null workers -> return Nothing
| otherwise -> return . Just $ responders % fromIntegral (Map.size workers)

View File

@ -1,5 +1,6 @@
module Jobs.Queue
( writeJobCtl, writeJobCtlBlock
, writeJobCtl', writeJobCtlBlock'
, queueJob, queueJob'
, YesodJobDB
, runDBJobs, queueDBJob, sinkDBJobs
@ -9,12 +10,14 @@ module Jobs.Queue
import Import hiding ((<>))
import Utils.Sql
import Utils.Lens
import Jobs.Types
import Control.Monad.Trans.Writer (WriterT, runWriterT)
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Trans.Reader (ReaderT, mapReaderT)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.HashMap.Strict as HashMap
@ -27,39 +30,54 @@ import Data.Semigroup ((<>))
data JobQueueException = JobQueuePoolEmpty
| JobQueueWorkerNotFound
deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic)
instance Exception JobQueueException
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m ()
-- | Pass an instruction to the given `Job`-Worker
writeJobCtl' target cmd = do
JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar
if
| null jobWorkers
-> throwM JobQueuePoolEmpty
| [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers
-> atomically $ writeTChan chan cmd
| otherwise
-> throwM JobQueueWorkerNotFound
writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers
--
-- Instructions are assigned deterministically and pseudo-randomly to one specific worker.
-- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others
writeJobCtl cmd = do
names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar
tid <- liftIO myThreadId
wMap <- getsYesod appJobCtl >>= liftIO . readTVarIO
if
| null wMap -> throwM JobQueuePoolEmpty
| otherwise -> do
let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap
liftIO . atomically $ writeTMChan chan cmd
let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names
writeJobCtl' target cmd
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock cmd = do
getResVar <- asks jobConfirm
resVar <- liftIO . atomically $ do
writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon
writeJobCtlBlock' writeCtl cmd = do
getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar
resVar <- atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
lift $ writeJobCtl cmd
writeCtl cmd
let
removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
now <- liftIO getCurrentTime
@ -83,7 +101,9 @@ queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
queueJob' job = queueJob job >>= writeJobCtl . JobCtlPerform
queueJob' job = do
app <- getYesod
queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
-- | Slightly modified Version of `YesodDB` for `runDBJobs`
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO))
@ -102,5 +122,6 @@ runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -
-- Jobs get immediately executed if the transaction succeeds
runDBJobs act = do
(ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act
forM_ jIds $ writeJobCtl . JobCtlPerform
app <- getYesod
forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform
return ret

View File

@ -2,15 +2,24 @@ module Jobs.Types
( Job(..), Notification(..)
, JobCtl(..)
, JobContext(..)
, JobState(..)
, jobWorkerNames
, JobWorkerId
, showWorkerId, newWorkerId
) where
import Import.NoFoundation
import Import.NoFoundation hiding (Unique)
import Data.Aeson (defaultOptions, Options(..), SumEncoding(..))
import Data.Aeson.TH (deriveJSON)
import Data.List.NonEmpty (NonEmpty)
import Data.Unique
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@ -70,12 +79,35 @@ data JobCtl = JobCtlFlush
| JobCtlDetermineCrontab
| JobCtlQueue Job
| JobCtlGenerateHealthReport HealthCheck
| JobCtlNoOp
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl
newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
deriving (Eq, Ord)
showWorkerId :: JobWorkerId -> Text
-- ^ Make a `JobWorkerId` somewhat human readable as a small-ish Number
showWorkerId = tshow . hashUnique . jobWorkerUnique
newWorkerId :: MonadIO m => m JobWorkerId
newWorkerId = JobWorkerId <$> liftIO newUnique
data JobContext = JobContext
{ jobCrontab :: TMVar (Crontab JobCtl)
{ jobCrontab :: TVar (Crontab JobCtl)
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
}
data JobState = JobState
{ jobWorkers :: Map (Async ()) (TChan JobCtl)
, jobWorkerName :: Async () -> JobWorkerId
, jobContext :: JobContext
, jobPoolManager :: Async ()
, jobCron :: Async ()
, jobShutdown :: TMVar ()
}
jobWorkerNames :: JobState -> Set JobWorkerId
jobWorkerNames JobState{..} = Set.map jobWorkerName $ Map.keysSet jobWorkers

View File

@ -15,6 +15,7 @@ data HealthCheck
| HealthCheckLDAPAdmins
| HealthCheckSMTPConnect
| HealthCheckWidgetMemcached
| HealthCheckActiveJobExecutors
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
instance Universe HealthCheck
instance Finite HealthCheck
@ -39,6 +40,8 @@ data HealthReport
-- ^ Can we connect to the SMTP server and say @NOOP@?
| HealthWidgetMemcached { healthWidgetMemcached :: Maybe Bool }
-- ^ Can we store values in memcached and retrieve them via HTTP?
| HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational }
-- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
instance NFData HealthReport
@ -57,6 +60,7 @@ classifyHealthReport HealthLDAPAdmins{} = HealthCheckLDAPAdmins
classifyHealthReport HealthHTTPReachable{} = HealthCheckHTTPReachable
classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect
classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached
classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors
-- | `HealthReport` classified (`classifyHealthReport`) by badness
--
@ -84,4 +88,6 @@ healthReportStatus = \case
| prop <= 0 -> HealthFailure
HealthSMTPConnect (Just False) -> HealthFailure
HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully?
HealthActiveJobExecutors (Just prop )
| prop < 1 -> HealthFailure
_other -> maxBound -- Minimum badness

View File

@ -118,6 +118,7 @@ data AppSettings = AppSettings
, appHealthCheckInterval :: HealthCheck -> Maybe NominalDiffTime
, appHealthCheckDelayNotify :: Bool
, appHealthCheckHTTP :: Bool
, appHealthCheckActiveJobExecutorsTimeout :: NominalDiffTime
, appInitialLogSettings :: LogSettings
@ -389,6 +390,7 @@ instance FromJSON AppSettings where
appHealthCheckInterval <- (assertM' (> 0) . ) <$> o .: "health-check-interval"
appHealthCheckDelayNotify <- o .: "health-check-delay-notify"
appHealthCheckHTTP <- o .: "health-check-http"
appHealthCheckActiveJobExecutorsTimeout <- o .: "health-check-active-job-executors-timeout"
appSessionTimeout <- o .: "session-timeout"

View File

@ -26,6 +26,7 @@ import Utils.Route as Utils
import Utils.Message as Utils
import Utils.Lang as Utils
import Utils.Parameters as Utils
import Control.Concurrent.Async.Lifted.Safe.Utils as Utils
import Text.Blaze (Markup, ToMarkup)

View File

@ -3,7 +3,7 @@ module TestImport
, module X
) where
import Application (makeFoundation, makeLogWare)
import Application (makeFoundation, makeLogWare, shutdownApp)
import ClassyPrelude as X hiding (delete, deleteBy, Handler, Index, (<.>), (<|), index, uncons, unsnoc, cons, snoc)
import Database.Persist as X hiding (get)
import Database.Persist.Sql as X (SqlPersistM)
@ -31,7 +31,7 @@ import Test.QuickCheck.Classes.Binary as X
import Data.Proxy as X
import Data.UUID as X (UUID)
import System.IO as X (hPrint, hPutStrLn, stderr)
import Jobs (handleJobs, stopJobCtl)
import Jobs (handleJobs)
import Numeric.Natural as X
import Control.Lens as X hiding ((<.), elements)
@ -42,7 +42,6 @@ import Database (truncateDb)
import Database as X (fillDb)
import Control.Monad.Trans.Resource (runResourceT, MonadResourceBase)
import Data.Pool (destroyAllResources)
import Settings
@ -51,6 +50,8 @@ import qualified Data.CaseInsensitive as CI
import Data.Typeable
import Handler.Utils (runAppLoggingT)
runDB :: SqlPersistM a -> YesodExample UniWorX a
runDB query = do
@ -74,13 +75,10 @@ withApp = around $ \act -> runResourceT $ do
[]
useEnv
foundation <- makeFoundation settings
let
stopDBAccess = do
stopJobCtl foundation
liftIO . destroyAllResources $ appConnPool foundation
bracket_ stopDBAccess (handleJobs foundation) $ wipeDB foundation
wipeDB foundation
runAppLoggingT foundation $ handleJobs foundation
logWare <- makeLogWare foundation
lift $ act (foundation, logWare)
lift $ act (foundation, logWare) `finally` shutdownApp foundation
-- This function will truncate all of the tables in your database.
-- 'withApp' calls it before each test, creating a clean environment for each