From 19b8b0616f306b57390b7a6e26023e8d59aa1239 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 21 Jul 2020 15:52:45 +0200 Subject: [PATCH] fix: shutdown behaviour & tests --- src/Application.hs | 6 +- src/Import/NoModel.hs | 2 +- src/Jobs.hs | 115 +++++++++++++++++++------------- src/Utils/Failover.hs | 11 +++ test/Handler/CorrectionsSpec.hs | 4 +- test/TestImport.hs | 18 ++--- 6 files changed, 94 insertions(+), 62 deletions(-) diff --git a/src/Application.hs b/src/Application.hs index 205c56a07..d6f72c080 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -587,9 +587,11 @@ shutdownApp :: (MonadIO m, MonadUnliftIO m) => UniWorX -> m () shutdownApp app = do stopJobCtl app liftIO $ do - for_ (appWidgetMemcached app) Memcached.close - for_ (appSmtpPool app) destroyAllResources destroyAllResources $ appConnPool app + for_ (appSmtpPool app) destroyAllResources + for_ (appLdapPool app) . mapFailover $ views _2 destroyAllResources + for_ (appWidgetMemcached app) Memcached.close + for_ (appMemcached app) $ views _2 Memcached.close release . fst $ appLogger app liftIO $ threadDelay 1e6 diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index 9f26ad2cf..bbb67eb6b 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -73,7 +73,7 @@ import Data.Ratio as Import ((%)) import Net.IP as Import (IP) -import Database.Persist.Sql as Import (SqlReadT, SqlWriteT, IsSqlBackend, fromSqlKey, toSqlKey) +import Database.Persist.Sql as Import (SqlReadBackend, SqlReadT, SqlWriteT, IsSqlBackend, fromSqlKey, toSqlKey) import Ldap.Client.Pool as Import diff --git a/src/Jobs.hs b/src/Jobs.hs index 2833f6091..5faf681e0 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -44,7 +44,7 @@ import Data.Time.Zones import Control.Concurrent.STM (retry) import Control.Concurrent.STM.Delay -import UnliftIO.Concurrent (forkIO) +import UnliftIO.Concurrent (forkIO, myThreadId) import Jobs.Handler.SendNotification @@ -90,9 +90,9 @@ handleJobs :: ( MonadResource m handleJobs foundation@UniWorX{..} | foundation ^. _appJobWorkers == 0 = return () | otherwise = do - jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> manageJobPool foundation unmask + jobPoolManager <- allocateLinkedAsyncWithUnmask $ manageJobPool foundation - jobCron <- allocateLinkedAsync $ manageCrontab foundation + jobCron <- allocateLinkedAsyncWithUnmask $ manageCrontab foundation let jobWorkers = Map.empty jobWorkerName = const $ error "Unknown worker" @@ -105,18 +105,29 @@ handleJobs foundation@UniWorX{..} } manageCrontab :: forall m. - MonadResource m - => UniWorX -> m () -manageCrontab foundation@UniWorX{..} = do - context <- atomically . fmap jobContext $ readTMVar appJobState - let awaitTermination = atomically $ do - shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown - guard shouldTerminate - liftIO . race_ awaitTermination . unsafeHandler foundation . void $ do - atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState - runReaderT ?? foundation $ - writeJobCtlBlock JobCtlDetermineCrontab - evalRWST (forever execCrontab) context HashMap.empty + ( MonadResource m + , MonadUnliftIO m + ) + => UniWorX -> (forall a. m a -> m a) -> m () +manageCrontab foundation@UniWorX{..} unmask = do + ch <- allocateLinkedAsync $ do + context <- atomically . fmap jobContext $ readTMVar appJobState + liftIO . unsafeHandler foundation . void $ do + atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState + runReaderT ?? foundation $ + writeJobCtlBlock JobCtlDetermineCrontab + void $ evalRWST (forever execCrontab) context HashMap.empty + + let awaitTermination = guardM $ + readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + + handleAny (throwTo $ asyncThreadId ch) . unmask $ do + doCancel <- atomically $ asum + [ False <$ waitSTM ch + , True <$ awaitTermination + ] + when doCancel $ + cancel ch manageJobPool :: forall m. ( MonadResource m @@ -125,18 +136,25 @@ manageJobPool :: forall m. , MonadMask m ) => UniWorX -> (forall a. m a -> m a) -> m () -manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ - flip runContT return . forever . join . atomically $ asum - [ spawnMissingWorkers - , reapDeadWorkers - , terminateGracefully - ] +manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> + flip runContT return . callCC $ \terminate' -> + forever . join . lift . routeExc . atomically $ asum + [ spawnMissingWorkers + , reapDeadWorkers + , terminateGracefully terminate' + ] where - shutdownOnException :: m a -> m a + shutdownOnException :: ((forall m'. Monad m' => m (m' ()) -> m (m' ())) -> m a) -> m a shutdownOnException act = do - actAsync <- allocateLinkedAsyncMasked act + me <- myThreadId + let + routeExc :: forall m'. Monad m' => (forall b. m b -> m b) -> m (m' ()) -> m (m' ()) + routeExc unmask' = handleAll (\exc -> return () <$ throwTo me exc) . unmask' + + actAsync <- allocateLinkedAsyncWithUnmask $ \unmask' -> act (routeExc unmask') let handleExc e = do + $logWarnS "manageJobPool" [st|Received exception: #{displayException e}|] atomically $ do jState <- tryReadTMVar appJobState for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown () @@ -149,7 +167,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ num :: Int num = fromIntegral $ foundation ^. _appJobWorkers - spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ()) + spawnMissingWorkers, reapDeadWorkers :: STM (ContT () m ()) spawnMissingWorkers = do shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown guard $ not shouldTerminate' @@ -178,8 +196,8 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do $logInfoS logIdent "Started" runConduit $ streamChan .| handleJobs' workerId - $logInfoS logIdent "Stopped" - worker <- lift . lift . allocateLinkedAsync $ liftIO runWorker + $logInfoS logIdent "Stopping" + worker <- lift . lift . allocateAsync $ liftIO runWorker tell . Endo $ \cSt -> cSt { jobWorkers = Map.insert worker chan $ jobWorkers cSt @@ -212,14 +230,14 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ go in go - terminateGracefully = do + terminateGracefully terminate = do shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown guard shouldTerminate oldState <- takeTMVar appJobState guard $ 0 == Map.size (jobWorkers oldState) - return . callCC $ \terminate -> do + return $ do $logInfoS "JobPoolManager" "Shutting down" terminate () @@ -255,7 +273,7 @@ execCrontab = do | otherwise = return () runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued - mapRWST (liftHandler . runDB . setSerializable) $ mergeState + mapRWST (liftHandler . runDB . setSerializable) mergeState refT <- liftIO getCurrentTime settings <- getsYesod appSettings' @@ -275,26 +293,26 @@ execCrontab = do $logDebugS "Crontab" . intercalate "\n" . map tshow . sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do - newCrontab <- lift . hoist lift $ determineCrontab' - when (newCrontab /= currentCrontab) $ - mapRWST (liftIO . atomically) $ - liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab + newCrontab <- lift $ hoist lift determineCrontab' + when (newCrontab /= currentCrontab) $ + mapRWST (liftIO . atomically) $ + liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab - mergeState - newState <- State.get - - let upToDate = and - [ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab - , ((==) `on` HashMap.lookup jobCtl) newState currentState - ] - when upToDate $ do - now <- liftIO $ getCurrentTime - foundation <- getYesod - State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl - case jobCtl of - JobCtlQueue job -> lift $ queueDBJobCron job - other -> runReaderT ?? foundation $ writeJobCtl other + mergeState + newState <- State.get + let upToDate = and + [ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab + , ((==) `on` HashMap.lookup jobCtl) newState currentState + ] + when upToDate $ do + now <- liftIO getCurrentTime + foundation <- getYesod + State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl + case jobCtl of + JobCtlQueue job -> lift $ queueDBJobCron job + other -> runReaderT ?? foundation $ writeJobCtl other + case nextMatch of MatchAsap -> doJob MatchNone -> return () @@ -305,6 +323,7 @@ execCrontab = do logFunc <- askLoggerIO whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime') doJob + where acc :: NominalDiffTime acc = 1e-3 @@ -335,7 +354,7 @@ execCrontab = do t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool - waitUntil crontabTV crontab nextTime = runResourceT $ do + waitUntil crontabTV crontab nextTime = do diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc waitTime' diff --git a/src/Utils/Failover.hs b/src/Utils/Failover.hs index db666a5d1..6eed49d4b 100644 --- a/src/Utils/Failover.hs +++ b/src/Utils/Failover.hs @@ -5,6 +5,7 @@ module Utils.Failover , FailoverException(..) , withFailover, withFailoverReTest , FailoverMetrics, registerFailoverMetrics + , mapFailover ) where import ClassyPrelude hiding (try, Vector, finally, onException) @@ -39,6 +40,8 @@ import qualified Data.Foldable as F import Data.Unique +import Utils (foldMapM) + data FailoverItem a = FailoverItem { failoverValue :: a @@ -81,6 +84,14 @@ mkFailoverLabeled opts = fmap Failover . liftIO $ newTVarIO opts' where opts' = flip map opts $ \(failoverLabel, failoverValue) -> FailoverItem{ failoverLastTest = Nothing, failoverReferences = Set.empty, .. } +mapFailover :: ( MonadIO m, Monoid b) + => (a -> m b) + -> Failover a + -> m b +mapFailover f Failover{..} = do + as <- toListOf (folded . _failoverValue) <$> readTVarIO failover + foldMapM f as + withFailoverReference :: (MonadIO m, MonadMask m) => Failover a -> (Unique -> m b) diff --git a/test/Handler/CorrectionsSpec.hs b/test/Handler/CorrectionsSpec.hs index a26d1c1bd..f78a1fa2f 100644 --- a/test/Handler/CorrectionsSpec.hs +++ b/test/Handler/CorrectionsSpec.hs @@ -6,6 +6,6 @@ import ModelSpec () spec :: Spec -spec = withApp $ do - describe "CorrectionsR" $ do +spec = withApp $ + xdescribe "CorrectionsR" $ return () diff --git a/test/TestImport.hs b/test/TestImport.hs index 2e897248f..f2832a9ac 100644 --- a/test/TestImport.hs +++ b/test/TestImport.hs @@ -83,15 +83,15 @@ runHandler handler = do withApp :: YSpec UniWorX -> Spec withApp = around $ \act -> runResourceT $ do - settings <- liftIO $ loadYamlSettings - ["config/test-settings.yml", "config/settings.yml"] - [] - useEnv - foundation <- makeFoundation settings - wipeDB foundation - runAppLoggingT foundation $ handleJobs foundation - logWare <- makeMiddleware foundation - lift $ act (foundation, logWare) `finally` shutdownApp foundation + settings <- liftIO $ loadYamlSettings + ["config/test-settings.yml", "config/settings.yml"] + [] + useEnv + foundation <- makeFoundation settings + wipeDB foundation + runAppLoggingT foundation $ handleJobs foundation + logWare <- makeMiddleware foundation + lift $ act (foundation, logWare) `finally` shutdownApp foundation -- This function will truncate all of the tables in your database. -- 'withApp' calls it before each test, creating a clean environment for each