From 20686f185b49b790481876b4efae74148dc579c3 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 24 Jul 2019 08:21:31 +0200 Subject: [PATCH 1/3] refactor(jobs): switch to linked asyncs --- src/Application.hs | 9 +- .../Concurrent/Async/Lifted/Safe/Utils.hs | 15 + src/Foundation.hs | 3 +- src/Import/NoModel.hs | 2 +- src/Jobs.hs | 392 +++++++++++------- src/Jobs/Queue.hs | 4 +- src/Jobs/Types.hs | 28 +- src/Utils.hs | 1 + 8 files changed, 281 insertions(+), 173 deletions(-) create mode 100644 src/Control/Concurrent/Async/Lifted/Safe/Utils.hs diff --git a/src/Application.hs b/src/Application.hs index 3e20e6613..6e2d45fd7 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -38,8 +38,6 @@ import System.Log.FastLogger ( defaultBufSize, newStderrLoggerSet import Handler.Utils (runAppLoggingT) -import qualified Data.Map.Strict as Map - import Foreign.Store import qualified Data.UUID as UUID @@ -158,8 +156,7 @@ makeFoundation appSettings'@AppSettings{..} = do appInstanceID <- liftIO $ maybe UUID.nextRandom (either readInstanceIDFile return) appInitialInstanceID - appJobCtl <- liftIO $ newTVarIO Map.empty - appCronThread <- liftIO newEmptyTMVarIO + appJobState <- liftIO newEmptyTMVarIO appHealthReport <- liftIO $ newTVarIO Set.empty -- We need a log function to create a connection pool. We need a connection @@ -371,7 +368,7 @@ develMain = runResourceT $ do wsettings <- liftIO . getDevSettings $ warpSettings foundation app <- makeApplication foundation - handleJobs foundation + runAppLoggingT foundation $ handleJobs foundation liftIO . develMainHelper $ return (wsettings, app) -- | The @main@ function for an executable running this site. @@ -471,7 +468,7 @@ getApplicationRepl :: (MonadResource m, MonadBaseControl IO m) => m (Int, UniWor getApplicationRepl = do settings <- getAppDevSettings foundation <- makeFoundation settings - handleJobs foundation + runAppLoggingT foundation $ handleJobs foundation wsettings <- liftIO . getDevSettings $ warpSettings foundation app1 <- makeApplication foundation diff --git a/src/Control/Concurrent/Async/Lifted/Safe/Utils.hs b/src/Control/Concurrent/Async/Lifted/Safe/Utils.hs new file mode 100644 index 000000000..f7f395b64 --- /dev/null +++ b/src/Control/Concurrent/Async/Lifted/Safe/Utils.hs @@ -0,0 +1,15 @@ +module Control.Concurrent.Async.Lifted.Safe.Utils + ( allocateLinkedAsync + ) where + +import ClassyPrelude hiding (cancel) + +import Control.Concurrent.Async.Lifted.Safe + +import Control.Monad.Trans.Resource + + +allocateLinkedAsync :: forall m a. + MonadResource m + => IO a -> m (Async a) +allocateLinkedAsync act = allocate (async act) cancel >>= (\(_k, a) -> a <$ link a) diff --git a/src/Foundation.hs b/src/Foundation.hs index 8103ebfda..855f5c4a0 100644 --- a/src/Foundation.hs +++ b/src/Foundation.hs @@ -110,8 +110,7 @@ data UniWorX = UniWorX , appCryptoIDKey :: CryptoIDKey , appClusterID :: ClusterId , appInstanceID :: InstanceId - , appJobCtl :: TVar (Map ThreadId (TMChan JobCtl)) - , appCronThread :: TMVar (ReleaseKey, ThreadId) + , appJobState :: TMVar JobState , appSessionKey :: ClientSession.Key , appSecretBoxKey :: SecretBox.Key , appJSONWebKeySet :: Jose.JwkSet diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index cd1bd66c2..0c437f9e9 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -44,7 +44,7 @@ import Data.Hashable as Import import Data.List.NonEmpty as Import (NonEmpty(..), nonEmpty) import Data.Text.Encoding.Error as Import(UnicodeException(..)) import Data.Semigroup as Import (Semigroup) -import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..)) +import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..), Endo(..)) import Data.Binary as Import (Binary) import Numeric.Natural as Import (Natural) diff --git a/src/Jobs.hs b/src/Jobs.hs index 867718bab..3da01ebee 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -7,14 +7,12 @@ module Jobs import Import import Utils.Lens -import Handler.Utils import Jobs.Types as Types hiding (JobCtl(JobCtlQueue)) import Jobs.Types (JobCtl(JobCtlQueue)) import Jobs.Queue import Jobs.Crontab -import Data.Conduit.TMChan import qualified Data.Conduit.List as C import qualified Data.Text.Lazy as LT @@ -28,7 +26,7 @@ import Data.Semigroup (Max(..)) import Utils.Sql -import Control.Monad.Random (evalRand, mkStdGen, getRandomR) +import Control.Monad.Random (evalRand, mkStdGen, getRandomR, uniformMay) import Cron import qualified Data.HashMap.Strict as HashMap @@ -38,20 +36,26 @@ import qualified Data.Set as Set import qualified Data.List.NonEmpty as NonEmpty import qualified Data.Map.Strict as Map +import Data.Map.Strict ((!)) import Data.Foldable (foldrM) import Control.Monad.Trans.Reader (mapReaderT) -import Control.Monad.Trans.State (evalStateT, mapStateT) +import Control.Monad.Trans.Writer (execWriterT) +import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST) import qualified Control.Monad.State.Class as State +import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Reader.Class (MonadReader(..)) -import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT, allocate, release) +import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT) import Control.Monad.Trans.Maybe (MaybeT(..)) +import Control.Monad.Trans.Cont (ContT(..), callCC) +import Control.Monad.Random.Lazy (evalRandTIO, mapRandT) import Control.Monad.Logger import Data.Time.Zones import Control.Concurrent.STM (retry) +import Control.Concurrent.STM.Delay import Jobs.Handler.SendNotification @@ -75,191 +79,259 @@ instance Exception JobQueueException handleJobs :: ( MonadResource m - , MonadIO m + , MonadLoggerIO m ) => UniWorX -> m () -- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in -- -- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders. -- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ... -handleJobs foundation@UniWorX{..} = do - let num = foundation ^. _appJobWorkers +handleJobs foundation@UniWorX{..} + | foundation ^. _appJobWorkers == 0 = return () + | otherwise = do + logger <- askLoggerIO + let runInIO = flip runLoggingT logger . runResourceT - jobCrontab <- liftIO $ newTMVarIO HashMap.empty - jobConfirm <- liftIO $ newTVarIO HashMap.empty + jobPoolManager <- allocateLinkedAsync . runInIO $ manageJobPool foundation - forM_ [1..num] $ \n -> do - (bChan, chan) <- atomically $ newBroadcastTMChan >>= (\c -> (c, ) <$> dupTMChan c) - let - logStart = $logDebugS ("Jobs #" <> tshow n) "Starting" - logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping" - removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId - doFork = flip forkFinally (\_ -> removeChan) . runAppLoggingT foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' foundation n - (_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan) - atomically . modifyTVar' appJobCtl $ Map.insert tId bChan + jobCron <- allocateLinkedAsync . runInIO $ manageCrontab foundation - -- Start cron operation - when (num > 0) $ do - registeredCron <- liftIO newEmptyTMVarIO - let execCrontab' = whenM (atomically $ readTMVar registeredCron) $ - runReaderT (execCrontab foundation) JobContext{..} - unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread - cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab) - registeredCron' <- atomically $ do - registeredCron' <- tryPutTMVar appCronThread cData - registeredCron' <$ putTMVar registeredCron registeredCron' - when registeredCron' $ - liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $ - writeJobCtlBlock JobCtlDetermineCrontab + let jobWorkers = Map.empty + jobWorkerName = const $ error "Unknown worker" + jobCrontab <- liftIO $ newTVarIO HashMap.empty + jobConfirm <- liftIO $ newTVarIO HashMap.empty + jobShutdown <- liftIO newEmptyTMVarIO + atomically $ putTMVar appJobState JobState + { jobContext = JobContext{..} + , .. + } + +manageJobPool, manageCrontab :: forall m. + ( MonadResource m + , MonadLogger m + ) + => UniWorX -> m () +manageCrontab foundation@UniWorX{..} = do + context <- atomically . fmap jobContext $ readTMVar appJobState + liftIO . unsafeHandler foundation . void $ do + runReaderT ?? context $ + writeJobCtlBlock JobCtlDetermineCrontab + evalRWST execCrontab' context HashMap.empty + where + execCrontab' = do + shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + if + | shouldTerminate -> return () + | otherwise -> execCrontab *> execCrontab' + + +manageJobPool foundation@UniWorX{..} + = flip runContT return . forever . join . atomically $ asum + [ spawnMissingWorkers + , reapDeadWorkers + , terminateGracefully + ] + where + num :: Int + num = fromIntegral $ foundation ^. _appJobWorkers + + spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ()) + spawnMissingWorkers = do + oldState <- takeTMVar appJobState + let missing = num - Map.size (jobWorkers oldState) + guard $ missing > 0 + return $ do + $logDebugS "manageJobPool" [st|Spawning #{missing} workers|] + endo <- execWriterT . replicateM_ missing $ do + workerId <- newWorkerId + let logIdent = mkLogIdent workerId + (bChan, chan) <- atomically $ newBroadcastTChan >>= (\c -> (c, ) <$> dupTChan c) + let + streamChan = join . atomically $ do + shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + if + | shouldTerminate -> + return $ return () + | otherwise -> do + nextVal <- readTChan chan + return $ yield nextVal >> streamChan + runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do + $logInfoS logIdent "Started" + runConduit $ streamChan .| handleJobs' workerId + worker <- allocateLinkedAsync runWorker + + tell . Endo $ \cSt -> cSt + { jobWorkers = Map.insert worker bChan $ jobWorkers cSt + , jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker + } + atomically . putTMVar appJobState $ endo `appEndo` oldState + + reapDeadWorkers = do + oldState <- takeTMVar appJobState + deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a + putTMVar appJobState oldState + { jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers + } + guard . not $ Map.null deadWorkers + return . forM_ (Map.toList deadWorkers) $ \(jobAsync, result) -> do + case result of + Right () -> $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|] + Left e -> $logErrorS "JobPoolManager" [st|Job-Executer #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|] + void . lift . allocateLinkedAsync $ + let go = do + next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do + nextVal <- MaybeT . lift . tryReadTChan $ jobWorkers oldState ! jobAsync + jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState + receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers' + return (nextVal, receiver) + whenIsJust next $ \(nextVal, receiver) -> do + atomically $ writeTChan receiver nextVal + go + in go + + terminateGracefully = do + shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + guard shouldTerminate + return . callCC $ \terminate -> do + $logInfoS "JobPoolManager" "Shutting down" + terminate () stopJobCtl :: MonadIO m => UniWorX -> m () -- ^ Stop all worker threads currently running -stopJobCtl UniWorX{appJobCtl, appCronThread} = do - mcData <- atomically $ tryReadTMVar appCronThread - whenIsJust mcData $ \(rKey, _) -> do - liftIO $ release rKey - atomically . guardM $ isEmptyTMVar appCronThread - - wMap <- liftIO $ readTVarIO appJobCtl - atomically $ forM_ wMap closeTMChan +stopJobCtl UniWorX{appJobState} = do atomically $ do - wMap' <- readTVar appJobCtl - guard . none (`Map.member` wMap') $ Map.keysSet wMap + JobState{..} <- readTMVar appJobState + putTMVar jobShutdown () + atomically $ do + JobState{..} <- takeTMVar appJobState + mapM_ (void . waitCatchSTM) $ + [ jobPoolManager + , jobCron + ] ++ Map.keys jobWorkers - -execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m () +execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWorX IO) () -- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have -- seen, wait for the time of the next job and fire it -execCrontab foundation = evalStateT go HashMap.empty - where - go = do - cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do - mapStateT (liftHandlerT . runDB . setSerializable) $ do - let - merge (Entity leId CronLastExec{..}) - | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob - = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) - | otherwise = lift $ delete leId - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge +execCrontab = do + mapRWST (liftHandlerT . runDB . setSerializable) $ do + let + merge (Entity leId CronLastExec{..}) + | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob + = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) + | otherwise = lift $ delete leId + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge - refT <- liftIO getCurrentTime - settings <- getsYesod appSettings' - currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do - crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab - case crontab' of - Nothing -> return Nothing - Just crontab -> Just <$> do - State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab - prevExec <- State.get - case earliestJob settings prevExec crontab refT of - Nothing -> liftBase retry - Just (_, MatchNone) -> liftBase retry - Just x -> return (crontab, x) + refT <- liftIO getCurrentTime + settings <- getsYesod appSettings' + (currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do + crontab <- liftBase . readTVar =<< asks jobCrontab - case currentState of - Nothing -> return False - Just (currentCrontab, (jobCtl, nextMatch)) -> do - let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do - newCrontab <- lift . lift . hoist lift $ determineCrontab' - if - | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab - -> do - now <- liftIO $ getCurrentTime - instanceID' <- getsYesod appInstanceID - State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl - case jobCtl of - JobCtlQueue job -> do - void . lift . lift $ upsertBy - (UniqueCronLastExec $ toJSON job) - CronLastExec - { cronLastExecJob = toJSON job - , cronLastExecTime = now - , cronLastExecInstance = instanceID' - } - [ CronLastExecTime =. now ] - lift . lift $ queueDBJob job - other -> writeJobCtl other - | otherwise - -> lift . mapReaderT (liftIO . atomically) $ - lift . void . flip swapTMVar newCrontab =<< asks jobCrontab + State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab + prevExec <- State.get + case earliestJob settings prevExec crontab refT of + Nothing -> liftBase retry + Just (_, MatchNone) -> liftBase retry + Just x -> return (crontab, x) - case nextMatch of - MatchAsap -> doJob - MatchNone -> return () - MatchAt nextTime -> do - JobContext{jobCrontab} <- ask - nextTime' <- applyJitter jobCtl nextTime - $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|] - logFunc <- askLoggerIO - whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime') - doJob - - return True - when cont go - where - acc :: NominalDiffTime - acc = 1e-3 - - debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime - debouncingAcc AppSettings{appNotificationRateLimit} = \case - JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit - _ -> acc - - applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime - applyJitter seed t = do - appInstance <- getsYesod appInstanceID - let - halfRange = truncate $ 0.5 / acc - diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed) - return $ addUTCTime diff t - - earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime) - earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab - where - go' (jobCtl, cron) mbPrev - | Just (_, t') <- mbPrev - , t' < t - = mbPrev - | otherwise - = Just (jobCtl, t) - where - t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron - - waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TMVar a -> a -> UTCTime -> m Bool - waitUntil crontabTV crontab nextTime = runResourceT $ do - diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime - let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc - waitTime' - | diffT < acc = "Done" - | otherwise = tshow (realToFrac waitTime :: NominalDiffTime) - $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|] + let doJob = mapRWST (liftHandlerT . runDBJobs . setSerializable) $ do + newCrontab <- lift . hoist lift $ determineCrontab' if - | diffT < acc -> return True - | otherwise -> do - retVar <- liftIO newEmptyTMVarIO - void . liftIO . forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar - let - awaitDelayThread = False <$ takeTMVar retVar - awaitCrontabChange = do - crontab' <- tryReadTMVar crontabTV - True <$ guard (Just crontab /= crontab') - crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread - bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged + | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab + -> do + now <- liftIO $ getCurrentTime + instanceID' <- getsYesod appInstanceID + State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl + case jobCtl of + JobCtlQueue job -> do + void . lift $ upsertBy + (UniqueCronLastExec $ toJSON job) + CronLastExec + { cronLastExecJob = toJSON job + , cronLastExecTime = now + , cronLastExecInstance = instanceID' + } + [ CronLastExecTime =. now ] + lift $ queueDBJob job + other -> writeJobCtl other + | otherwise + -> mapRWST (liftIO . atomically) $ + liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab + case nextMatch of + MatchAsap -> doJob + MatchNone -> return () + MatchAt nextTime -> do + JobContext{jobCrontab} <- ask + nextTime' <- applyJitter jobCtl nextTime + $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|] + logFunc <- askLoggerIO + whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime') + doJob + where + acc :: NominalDiffTime + acc = 1e-3 -handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) () -handleJobs' foundation wNum = C.mapM_ $ \jctl -> do + debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime + debouncingAcc AppSettings{appNotificationRateLimit} = \case + JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit + _ -> acc + + applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime + applyJitter seed t = do + appInstance <- getsYesod appInstanceID + let + halfRange = truncate $ 0.5 / acc + diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed) + return $ addUTCTime diff t + + earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime) + earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab + where + go' (jobCtl, cron) mbPrev + | Just (_, t') <- mbPrev + , t' < t + = mbPrev + | otherwise + = Just (jobCtl, t) + where + t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron + + waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool + waitUntil crontabTV crontab nextTime = runResourceT $ do + diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime + let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc + waitTime' + | diffT < acc = "Done" + | otherwise = tshow (realToFrac waitTime :: NominalDiffTime) + $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|] + if + | diffT < acc -> return True + | otherwise -> do + delay <- liftIO . newDelay . round $ waitTime * 1e6 + let + awaitDelayThread = False <$ waitDelay delay + awaitCrontabChange = do + crontab' <- readTVar crontabTV + True <$ guard (crontab /= crontab') + crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread + bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged + +mkLogIdent :: JobWorkerId -> Text +mkLogIdent wId = "Job-Executor " <> showWorkerId wId + +handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) () +handleJobs' wNum = C.mapM_ $ \jctl -> do $logDebugS logIdent $ tshow jctl resVars <- mapReaderT (liftIO . atomically) $ HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) - res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl + res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) case res of Just err | not sentRes -> $logErrorS logIdent $ tshow err _other -> return () where - logIdent = "Jobs #" <> tshow wNum + logIdent = mkLogIdent wNum handleQueueException :: MonadLogger m => JobQueueException -> m () handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j @@ -285,7 +357,7 @@ handleJobs' foundation wNum = C.mapM_ $ \jctl -> do newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab' -- logDebugS logIdent $ tshow newCTab mapReaderT (liftIO . atomically) $ - lift . void . flip swapTMVar newCTab =<< asks jobCrontab + lift . void . flip swapTVar newCTab =<< asks jobCrontab handleCmd (JobCtlGenerateHealthReport kind) = do hrStorage <- getsYesod appHealthReport newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index 8152ffbfb..f0ddede48 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -39,12 +39,12 @@ writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m () -- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others writeJobCtl cmd = do tid <- liftIO myThreadId - wMap <- getsYesod appJobCtl >>= liftIO . readTVarIO + wMap <- fmap jobWorkers $ getsYesod appJobState >>= atomically . readTMVar if | null wMap -> throwM JobQueuePoolEmpty | otherwise -> do let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap - liftIO . atomically $ writeTMChan chan cmd + liftIO . atomically $ writeTChan chan cmd writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m () -- | Pass an instruction to the `Job`-Workers and block until it was acted upon diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 3522ff802..9e4cbc56b 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -2,15 +2,20 @@ module Jobs.Types ( Job(..), Notification(..) , JobCtl(..) , JobContext(..) + , JobState(..) + , JobWorkerId + , showWorkerId, newWorkerId ) where -import Import.NoFoundation +import Import.NoFoundation hiding (Unique) import Data.Aeson (defaultOptions, Options(..), SumEncoding(..)) import Data.Aeson.TH (deriveJSON) import Data.List.NonEmpty (NonEmpty) +import Data.Unique + data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } | JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext } @@ -75,7 +80,26 @@ data JobCtl = JobCtlFlush instance Hashable JobCtl +newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique } + deriving (Eq, Ord) + +showWorkerId :: JobWorkerId -> Text +-- ^ Make a `JobWorkerId` somewhat human readable as a small-ish Number +showWorkerId = tshow . hashUnique . jobWorkerUnique + +newWorkerId :: MonadIO m => m JobWorkerId +newWorkerId = JobWorkerId <$> liftIO newUnique + data JobContext = JobContext - { jobCrontab :: TMVar (Crontab JobCtl) + { jobCrontab :: TVar (Crontab JobCtl) , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException)))) } + +data JobState = JobState + { jobWorkers :: Map (Async ()) (TChan JobCtl) + , jobWorkerName :: Async () -> JobWorkerId + , jobContext :: JobContext + , jobPoolManager :: Async () + , jobCron :: Async () + , jobShutdown :: TMVar () + } diff --git a/src/Utils.hs b/src/Utils.hs index 7fbe88857..1792d9af8 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -26,6 +26,7 @@ import Utils.Route as Utils import Utils.Message as Utils import Utils.Lang as Utils import Utils.Parameters as Utils +import Control.Concurrent.Async.Lifted.Safe.Utils as Utils import Text.Blaze (Markup, ToMarkup) From d1abe530b60939f69289b60216f52eab7e7ba6a4 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 24 Jul 2019 09:41:17 +0200 Subject: [PATCH 2/3] feat(health): check for active job workers --- config/settings.yml | 2 ++ messages/uniworx/de.msg | 1 + src/Handler/Health.hs | 3 +++ src/Jobs.hs | 16 ++++++------ src/Jobs/HealthReport.hs | 30 ++++++++++++++++++++++ src/Jobs/Queue.hs | 53 +++++++++++++++++++++++++++------------ src/Jobs/Types.hs | 8 ++++++ src/Model/Types/Health.hs | 6 +++++ src/Settings.hs | 2 ++ 9 files changed, 98 insertions(+), 23 deletions(-) diff --git a/config/settings.yml b/config/settings.yml index edd971e64..d35732623 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -36,8 +36,10 @@ health-check-interval: ldap-admins: "_env:HEALTHCHECK_INTERVAL_LDAP_ADMINS:600" smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600" widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600" + active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60" health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true" health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)? +health-check-active-job-executors-timeout: "_env:HEALTHCHECK_ACTIVE_JOB_EXECUTORS_TIMEOUT:5" log-settings: detailed: "_env:DETAILED_LOGGING:false" diff --git a/messages/uniworx/de.msg b/messages/uniworx/de.msg index 99ed87ddf..563aede8a 100644 --- a/messages/uniworx/de.msg +++ b/messages/uniworx/de.msg @@ -1061,6 +1061,7 @@ HealthHTTPReachable: Cluster kann an der erwarteten URL über HTTP erreicht werd HealthLDAPAdmins: Anteil der Administratoren, die im LDAP-Verzeichnis gefunden werden können HealthSMTPConnect: SMTP-Server kann erreicht werden HealthWidgetMemcached: Memcached-Server liefert Widgets korrekt aus +HealthActiveJobExecutors: Anteil der job-workers, die neue Befehle annehmen CourseParticipants n@Int: Derzeit #{n} angemeldete Kursteilnehmer CourseParticipantsInvited n@Int: #{n} #{pluralDE n "Einladung" "Einladungen"} per E-Mail verschickt diff --git a/src/Handler/Health.hs b/src/Handler/Health.hs index 7b29e2bbd..36649a436 100644 --- a/src/Handler/Health.hs +++ b/src/Handler/Health.hs @@ -70,6 +70,9 @@ getHealthR = do $of HealthWidgetMemcached (Just passed)
_{MsgHealthWidgetMemcached}
#{boolSymbol passed} + $of HealthActiveJobExecutors (Just active) +
_{MsgHealthActiveJobExecutors} +
#{textPercent active 1} $of _ |] provideJson healthReports diff --git a/src/Jobs.hs b/src/Jobs.hs index 3da01ebee..f8cdb2ee5 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -114,7 +114,7 @@ manageJobPool, manageCrontab :: forall m. manageCrontab foundation@UniWorX{..} = do context <- atomically . fmap jobContext $ readTMVar appJobState liftIO . unsafeHandler foundation . void $ do - runReaderT ?? context $ + runReaderT ?? foundation $ writeJobCtlBlock JobCtlDetermineCrontab evalRWST execCrontab' context HashMap.empty where @@ -239,7 +239,8 @@ execCrontab = do | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab -> do now <- liftIO $ getCurrentTime - instanceID' <- getsYesod appInstanceID + foundation <- getYesod + let instanceID' = foundation ^. _appInstanceID State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl case jobCtl of JobCtlQueue job -> do @@ -252,7 +253,7 @@ execCrontab = do } [ CronLastExecTime =. now ] lift $ queueDBJob job - other -> writeJobCtl other + other -> runReaderT ?? foundation $ writeJobCtl other | otherwise -> mapRWST (liftIO . atomically) $ liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab @@ -322,10 +323,10 @@ mkLogIdent wId = "Job-Executor " <> showWorkerId wId handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) () handleJobs' wNum = C.mapM_ $ \jctl -> do $logDebugS logIdent $ tshow jctl - resVars <- mapReaderT (liftIO . atomically) $ - HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl - sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) + sentRes <- mapReaderT (liftIO . atomically) $ do + resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) + lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) case res of Just err | not sentRes -> $logErrorS logIdent $ tshow err @@ -338,7 +339,8 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) - handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform) + handleCmd JobCtlNoOp = return () + handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform) handleCmd (JobCtlQueue job) = lift $ queueJob' job handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do content <- case fromJSON queuedJobContent of diff --git a/src/Jobs/HealthReport.hs b/src/Jobs/HealthReport.hs index bf65049f9..6aecd01f6 100644 --- a/src/Jobs/HealthReport.hs +++ b/src/Jobs/HealthReport.hs @@ -7,6 +7,7 @@ module Jobs.HealthReport import Import import Data.List (genericLength) +import qualified Data.Map.Strict as Map import qualified Data.Aeson as Aeson import Data.Proxy (Proxy(..)) @@ -27,6 +28,12 @@ import qualified Data.CaseInsensitive as CI import qualified Network.HaskellNet.SMTP as SMTP import Data.Pool (withResource) +import System.Timeout + +import Jobs.Queue + +import Control.Concurrent.Async.Lifted.Safe (forConcurrently) + generateHealthReport :: HealthCheck -> Handler HealthReport generateHealthReport = $(dispatchTH ''HealthCheck) @@ -135,3 +142,26 @@ dispatchHealthCheckWidgetMemcached = HealthWidgetMemcached <$> do (== content) . responseBody <$> httpLBS httpRequest _other -> return False + +dispatchHealthCheckActiveJobExecutors :: Handler HealthReport +dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do + app <- getYesod + jState <- atomically . tryReadTMVar $ appJobState app + let configuredNumber = app ^. _appJobWorkers + timeoutLength = app ^. _appHealthCheckActiveJobExecutorsTimeout + case jState of + Nothing + | configuredNumber == 0 -> return Nothing + Nothing -> return $ Just 0 + Just JobState{jobWorkers, jobWorkerName} -> do + tid <- liftIO myThreadId + let workers' = Map.fromSet jobWorkerName (Map.keysSet jobWorkers) + workers = Map.filterWithKey (\a _ -> asyncThreadId a /= tid) workers' + timeoutMicro = let (MkFixed micro :: Micro) = realToFrac timeoutLength + in fromInteger micro + $logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems workers' + responders <- fmap (getSum . fold) . liftIO . forConcurrently (Map.toList workers) $ \(_, wName) + -> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp) + if + | Map.null workers -> return Nothing + | otherwise -> return . Just $ responders % fromIntegral (Map.size workers) diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index f0ddede48..8b71c2960 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -1,5 +1,6 @@ module Jobs.Queue ( writeJobCtl, writeJobCtlBlock + , writeJobCtl', writeJobCtlBlock' , queueJob, queueJob' , YesodJobDB , runDBJobs, queueDBJob, sinkDBJobs @@ -9,12 +10,14 @@ module Jobs.Queue import Import hiding ((<>)) import Utils.Sql +import Utils.Lens import Jobs.Types import Control.Monad.Trans.Writer (WriterT, runWriterT) import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Trans.Reader (ReaderT, mapReaderT) +import qualified Data.Map.Strict as Map import qualified Data.Set as Set import qualified Data.List.NonEmpty as NonEmpty import qualified Data.HashMap.Strict as HashMap @@ -27,39 +30,54 @@ import Data.Semigroup ((<>)) data JobQueueException = JobQueuePoolEmpty + | JobQueueWorkerNotFound deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic) instance Exception JobQueueException -writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m () +writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m () +-- | Pass an instruction to the given `Job`-Worker +writeJobCtl' target cmd = do + JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar + if + | null jobWorkers + -> throwM JobQueuePoolEmpty + | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers + -> atomically $ writeTChan chan cmd + | otherwise + -> throwM JobQueueWorkerNotFound + +writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () -- | Pass an instruction to the `Job`-Workers -- -- Instructions are assigned deterministically and pseudo-randomly to one specific worker. -- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others writeJobCtl cmd = do + names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar tid <- liftIO myThreadId - wMap <- fmap jobWorkers $ getsYesod appJobState >>= atomically . readTMVar - if - | null wMap -> throwM JobQueuePoolEmpty - | otherwise -> do - let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap - liftIO . atomically $ writeTChan chan cmd + let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names + writeJobCtl' target cmd -writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m () --- | Pass an instruction to the `Job`-Workers and block until it was acted upon -writeJobCtlBlock cmd = do - getResVar <- asks jobConfirm - resVar <- liftIO . atomically $ do + +writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m () +-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon +writeJobCtlBlock' writeCtl cmd = do + getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar + resVar <- atomically $ do var <- newEmptyTMVar modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) return var - lift $ writeJobCtl cmd + writeCtl cmd let removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd - mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar + mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar maybe (return ()) throwM mExc +writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () +-- | Pass an instruction to the `Job`-Workers and block until it was acted upon +writeJobCtlBlock = writeJobCtlBlock' writeJobCtl + queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId queueJobUnsafe job = do now <- liftIO getCurrentTime @@ -83,7 +101,9 @@ queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m () -- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap -queueJob' job = queueJob job >>= writeJobCtl . JobCtlPerform +queueJob' job = do + app <- getYesod + queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform -- | Slightly modified Version of `YesodDB` for `runDBJobs` type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO)) @@ -102,5 +122,6 @@ runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a - -- Jobs get immediately executed if the transaction succeeds runDBJobs act = do (ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act - forM_ jIds $ writeJobCtl . JobCtlPerform + app <- getYesod + forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform return ret diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 9e4cbc56b..74fd7afe3 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -3,6 +3,7 @@ module Jobs.Types , JobCtl(..) , JobContext(..) , JobState(..) + , jobWorkerNames , JobWorkerId , showWorkerId, newWorkerId ) where @@ -16,6 +17,9 @@ import Data.List.NonEmpty (NonEmpty) import Data.Unique +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set + data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } | JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext } @@ -75,6 +79,7 @@ data JobCtl = JobCtlFlush | JobCtlDetermineCrontab | JobCtlQueue Job | JobCtlGenerateHealthReport HealthCheck + | JobCtlNoOp deriving (Eq, Ord, Read, Show, Generic, Typeable) instance Hashable JobCtl @@ -103,3 +108,6 @@ data JobState = JobState , jobCron :: Async () , jobShutdown :: TMVar () } + +jobWorkerNames :: JobState -> Set JobWorkerId +jobWorkerNames JobState{..} = Set.map jobWorkerName $ Map.keysSet jobWorkers diff --git a/src/Model/Types/Health.hs b/src/Model/Types/Health.hs index aea99d735..ce0f53e23 100644 --- a/src/Model/Types/Health.hs +++ b/src/Model/Types/Health.hs @@ -15,6 +15,7 @@ data HealthCheck | HealthCheckLDAPAdmins | HealthCheckSMTPConnect | HealthCheckWidgetMemcached + | HealthCheckActiveJobExecutors deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) instance Universe HealthCheck instance Finite HealthCheck @@ -39,6 +40,8 @@ data HealthReport -- ^ Can we connect to the SMTP server and say @NOOP@? | HealthWidgetMemcached { healthWidgetMemcached :: Maybe Bool } -- ^ Can we store values in memcached and retrieve them via HTTP? + | HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational } + -- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout deriving (Eq, Ord, Read, Show, Data, Generic, Typeable) instance NFData HealthReport @@ -57,6 +60,7 @@ classifyHealthReport HealthLDAPAdmins{} = HealthCheckLDAPAdmins classifyHealthReport HealthHTTPReachable{} = HealthCheckHTTPReachable classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached +classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors -- | `HealthReport` classified (`classifyHealthReport`) by badness -- @@ -84,4 +88,6 @@ healthReportStatus = \case | prop <= 0 -> HealthFailure HealthSMTPConnect (Just False) -> HealthFailure HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully? + HealthActiveJobExecutors (Just prop ) + | prop < 1 -> HealthFailure _other -> maxBound -- Minimum badness diff --git a/src/Settings.hs b/src/Settings.hs index c53e90269..191e1ca1d 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -118,6 +118,7 @@ data AppSettings = AppSettings , appHealthCheckInterval :: HealthCheck -> Maybe NominalDiffTime , appHealthCheckDelayNotify :: Bool , appHealthCheckHTTP :: Bool + , appHealthCheckActiveJobExecutorsTimeout :: NominalDiffTime , appInitialLogSettings :: LogSettings @@ -389,6 +390,7 @@ instance FromJSON AppSettings where appHealthCheckInterval <- (assertM' (> 0) . ) <$> o .: "health-check-interval" appHealthCheckDelayNotify <- o .: "health-check-delay-notify" appHealthCheckHTTP <- o .: "health-check-http" + appHealthCheckActiveJobExecutorsTimeout <- o .: "health-check-active-job-executors-timeout" appSessionTimeout <- o .: "session-timeout" From da59a2f9da0c3500ded597e9aa801cd97add945a Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 24 Jul 2019 11:12:47 +0200 Subject: [PATCH 3/3] chore(tests): fix tests --- src/Application.hs | 2 +- src/Jobs.hs | 33 ++++++++++++++++----------------- test/TestImport.hs | 16 +++++++--------- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/Application.hs b/src/Application.hs index 6e2d45fd7..7291fda1c 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -478,7 +478,7 @@ getApplicationRepl = do return (getPort wsettings, foundation, app1) -shutdownApp :: MonadIO m => UniWorX -> m () +shutdownApp :: (MonadIO m, MonadBaseControl IO m) => UniWorX -> m () shutdownApp app = do stopJobCtl app liftIO $ do diff --git a/src/Jobs.hs b/src/Jobs.hs index f8cdb2ee5..4769178ff 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -113,16 +113,14 @@ manageJobPool, manageCrontab :: forall m. => UniWorX -> m () manageCrontab foundation@UniWorX{..} = do context <- atomically . fmap jobContext $ readTMVar appJobState - liftIO . unsafeHandler foundation . void $ do + let awaitTermination = atomically $ do + shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + guard shouldTerminate + liftIO . race_ awaitTermination . unsafeHandler foundation . void $ do + atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState runReaderT ?? foundation $ writeJobCtlBlock JobCtlDetermineCrontab - evalRWST execCrontab' context HashMap.empty - where - execCrontab' = do - shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown - if - | shouldTerminate -> return () - | otherwise -> execCrontab *> execCrontab' + evalRWST (forever execCrontab) context HashMap.empty manageJobPool foundation@UniWorX{..} @@ -158,6 +156,7 @@ manageJobPool foundation@UniWorX{..} runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do $logInfoS logIdent "Started" runConduit $ streamChan .| handleJobs' workerId + $logInfoS logIdent "Stopped" worker <- allocateLinkedAsync runWorker tell . Endo $ \cSt -> cSt @@ -196,18 +195,18 @@ manageJobPool foundation@UniWorX{..} $logInfoS "JobPoolManager" "Shutting down" terminate () -stopJobCtl :: MonadIO m => UniWorX -> m () +stopJobCtl :: (MonadIO m, MonadBaseControl IO m) => UniWorX -> m () -- ^ Stop all worker threads currently running stopJobCtl UniWorX{appJobState} = do - atomically $ do - JobState{..} <- readTMVar appJobState - putTMVar jobShutdown () - atomically $ do - JobState{..} <- takeTMVar appJobState + didStop <- atomically $ do + jState <- tryReadTMVar appJobState + for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown () + whenIsJust didStop $ \jSt' -> void . fork . atomically $ do + workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState mapM_ (void . waitCatchSTM) $ - [ jobPoolManager - , jobCron - ] ++ Map.keys jobWorkers + [ jobPoolManager jSt' + , jobCron jSt' + ] ++ workers execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWorX IO) () -- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have diff --git a/test/TestImport.hs b/test/TestImport.hs index 9164c3144..48e0b5d27 100644 --- a/test/TestImport.hs +++ b/test/TestImport.hs @@ -3,7 +3,7 @@ module TestImport , module X ) where -import Application (makeFoundation, makeLogWare) +import Application (makeFoundation, makeLogWare, shutdownApp) import ClassyPrelude as X hiding (delete, deleteBy, Handler, Index, (<.>), (<|), index, uncons, unsnoc, cons, snoc) import Database.Persist as X hiding (get) import Database.Persist.Sql as X (SqlPersistM) @@ -31,7 +31,7 @@ import Test.QuickCheck.Classes.Binary as X import Data.Proxy as X import Data.UUID as X (UUID) import System.IO as X (hPrint, hPutStrLn, stderr) -import Jobs (handleJobs, stopJobCtl) +import Jobs (handleJobs) import Numeric.Natural as X import Control.Lens as X hiding ((<.), elements) @@ -42,7 +42,6 @@ import Database (truncateDb) import Database as X (fillDb) import Control.Monad.Trans.Resource (runResourceT, MonadResourceBase) -import Data.Pool (destroyAllResources) import Settings @@ -51,6 +50,8 @@ import qualified Data.CaseInsensitive as CI import Data.Typeable +import Handler.Utils (runAppLoggingT) + runDB :: SqlPersistM a -> YesodExample UniWorX a runDB query = do @@ -74,13 +75,10 @@ withApp = around $ \act -> runResourceT $ do [] useEnv foundation <- makeFoundation settings - let - stopDBAccess = do - stopJobCtl foundation - liftIO . destroyAllResources $ appConnPool foundation - bracket_ stopDBAccess (handleJobs foundation) $ wipeDB foundation + wipeDB foundation + runAppLoggingT foundation $ handleJobs foundation logWare <- makeLogWare foundation - lift $ act (foundation, logWare) + lift $ act (foundation, logWare) `finally` shutdownApp foundation -- This function will truncate all of the tables in your database. -- 'withApp' calls it before each test, creating a clean environment for each