fix: shutdown behaviour & tests
This commit is contained in:
parent
975ebc65c1
commit
19b8b0616f
@ -587,9 +587,11 @@ shutdownApp :: (MonadIO m, MonadUnliftIO m) => UniWorX -> m ()
|
||||
shutdownApp app = do
|
||||
stopJobCtl app
|
||||
liftIO $ do
|
||||
for_ (appWidgetMemcached app) Memcached.close
|
||||
for_ (appSmtpPool app) destroyAllResources
|
||||
destroyAllResources $ appConnPool app
|
||||
for_ (appSmtpPool app) destroyAllResources
|
||||
for_ (appLdapPool app) . mapFailover $ views _2 destroyAllResources
|
||||
for_ (appWidgetMemcached app) Memcached.close
|
||||
for_ (appMemcached app) $ views _2 Memcached.close
|
||||
release . fst $ appLogger app
|
||||
|
||||
liftIO $ threadDelay 1e6
|
||||
|
||||
@ -73,7 +73,7 @@ import Data.Ratio as Import ((%))
|
||||
|
||||
import Net.IP as Import (IP)
|
||||
|
||||
import Database.Persist.Sql as Import (SqlReadT, SqlWriteT, IsSqlBackend, fromSqlKey, toSqlKey)
|
||||
import Database.Persist.Sql as Import (SqlReadBackend, SqlReadT, SqlWriteT, IsSqlBackend, fromSqlKey, toSqlKey)
|
||||
|
||||
import Ldap.Client.Pool as Import
|
||||
|
||||
|
||||
115
src/Jobs.hs
115
src/Jobs.hs
@ -44,7 +44,7 @@ import Data.Time.Zones
|
||||
import Control.Concurrent.STM (retry)
|
||||
import Control.Concurrent.STM.Delay
|
||||
|
||||
import UnliftIO.Concurrent (forkIO)
|
||||
import UnliftIO.Concurrent (forkIO, myThreadId)
|
||||
|
||||
|
||||
import Jobs.Handler.SendNotification
|
||||
@ -90,9 +90,9 @@ handleJobs :: ( MonadResource m
|
||||
handleJobs foundation@UniWorX{..}
|
||||
| foundation ^. _appJobWorkers == 0 = return ()
|
||||
| otherwise = do
|
||||
jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> manageJobPool foundation unmask
|
||||
jobPoolManager <- allocateLinkedAsyncWithUnmask $ manageJobPool foundation
|
||||
|
||||
jobCron <- allocateLinkedAsync $ manageCrontab foundation
|
||||
jobCron <- allocateLinkedAsyncWithUnmask $ manageCrontab foundation
|
||||
|
||||
let jobWorkers = Map.empty
|
||||
jobWorkerName = const $ error "Unknown worker"
|
||||
@ -105,18 +105,29 @@ handleJobs foundation@UniWorX{..}
|
||||
}
|
||||
|
||||
manageCrontab :: forall m.
|
||||
MonadResource m
|
||||
=> UniWorX -> m ()
|
||||
manageCrontab foundation@UniWorX{..} = do
|
||||
context <- atomically . fmap jobContext $ readTMVar appJobState
|
||||
let awaitTermination = atomically $ do
|
||||
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
guard shouldTerminate
|
||||
liftIO . race_ awaitTermination . unsafeHandler foundation . void $ do
|
||||
atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
|
||||
runReaderT ?? foundation $
|
||||
writeJobCtlBlock JobCtlDetermineCrontab
|
||||
evalRWST (forever execCrontab) context HashMap.empty
|
||||
( MonadResource m
|
||||
, MonadUnliftIO m
|
||||
)
|
||||
=> UniWorX -> (forall a. m a -> m a) -> m ()
|
||||
manageCrontab foundation@UniWorX{..} unmask = do
|
||||
ch <- allocateLinkedAsync $ do
|
||||
context <- atomically . fmap jobContext $ readTMVar appJobState
|
||||
liftIO . unsafeHandler foundation . void $ do
|
||||
atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
|
||||
runReaderT ?? foundation $
|
||||
writeJobCtlBlock JobCtlDetermineCrontab
|
||||
void $ evalRWST (forever execCrontab) context HashMap.empty
|
||||
|
||||
let awaitTermination = guardM $
|
||||
readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
|
||||
handleAny (throwTo $ asyncThreadId ch) . unmask $ do
|
||||
doCancel <- atomically $ asum
|
||||
[ False <$ waitSTM ch
|
||||
, True <$ awaitTermination
|
||||
]
|
||||
when doCancel $
|
||||
cancel ch
|
||||
|
||||
manageJobPool :: forall m.
|
||||
( MonadResource m
|
||||
@ -125,18 +136,25 @@ manageJobPool :: forall m.
|
||||
, MonadMask m
|
||||
)
|
||||
=> UniWorX -> (forall a. m a -> m a) -> m ()
|
||||
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
flip runContT return . forever . join . atomically $ asum
|
||||
[ spawnMissingWorkers
|
||||
, reapDeadWorkers
|
||||
, terminateGracefully
|
||||
]
|
||||
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
|
||||
flip runContT return . callCC $ \terminate' ->
|
||||
forever . join . lift . routeExc . atomically $ asum
|
||||
[ spawnMissingWorkers
|
||||
, reapDeadWorkers
|
||||
, terminateGracefully terminate'
|
||||
]
|
||||
where
|
||||
shutdownOnException :: m a -> m a
|
||||
shutdownOnException :: ((forall m'. Monad m' => m (m' ()) -> m (m' ())) -> m a) -> m a
|
||||
shutdownOnException act = do
|
||||
actAsync <- allocateLinkedAsyncMasked act
|
||||
me <- myThreadId
|
||||
let
|
||||
routeExc :: forall m'. Monad m' => (forall b. m b -> m b) -> m (m' ()) -> m (m' ())
|
||||
routeExc unmask' = handleAll (\exc -> return () <$ throwTo me exc) . unmask'
|
||||
|
||||
actAsync <- allocateLinkedAsyncWithUnmask $ \unmask' -> act (routeExc unmask')
|
||||
|
||||
let handleExc e = do
|
||||
$logWarnS "manageJobPool" [st|Received exception: #{displayException e}|]
|
||||
atomically $ do
|
||||
jState <- tryReadTMVar appJobState
|
||||
for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown ()
|
||||
@ -149,7 +167,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
num :: Int
|
||||
num = fromIntegral $ foundation ^. _appJobWorkers
|
||||
|
||||
spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ())
|
||||
spawnMissingWorkers, reapDeadWorkers :: STM (ContT () m ())
|
||||
spawnMissingWorkers = do
|
||||
shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
guard $ not shouldTerminate'
|
||||
@ -178,8 +196,8 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
|
||||
$logInfoS logIdent "Started"
|
||||
runConduit $ streamChan .| handleJobs' workerId
|
||||
$logInfoS logIdent "Stopped"
|
||||
worker <- lift . lift . allocateLinkedAsync $ liftIO runWorker
|
||||
$logInfoS logIdent "Stopping"
|
||||
worker <- lift . lift . allocateAsync $ liftIO runWorker
|
||||
|
||||
tell . Endo $ \cSt -> cSt
|
||||
{ jobWorkers = Map.insert worker chan $ jobWorkers cSt
|
||||
@ -212,14 +230,14 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
go
|
||||
in go
|
||||
|
||||
terminateGracefully = do
|
||||
terminateGracefully terminate = do
|
||||
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
guard shouldTerminate
|
||||
|
||||
oldState <- takeTMVar appJobState
|
||||
guard $ 0 == Map.size (jobWorkers oldState)
|
||||
|
||||
return . callCC $ \terminate -> do
|
||||
return $ do
|
||||
$logInfoS "JobPoolManager" "Shutting down"
|
||||
terminate ()
|
||||
|
||||
@ -255,7 +273,7 @@ execCrontab = do
|
||||
| otherwise = return ()
|
||||
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
|
||||
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
|
||||
mapRWST (liftHandler . runDB . setSerializable) $ mergeState
|
||||
mapRWST (liftHandler . runDB . setSerializable) mergeState
|
||||
|
||||
refT <- liftIO getCurrentTime
|
||||
settings <- getsYesod appSettings'
|
||||
@ -275,26 +293,26 @@ execCrontab = do
|
||||
$logDebugS "Crontab" . intercalate "\n" . map tshow . sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
|
||||
|
||||
let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do
|
||||
newCrontab <- lift . hoist lift $ determineCrontab'
|
||||
when (newCrontab /= currentCrontab) $
|
||||
mapRWST (liftIO . atomically) $
|
||||
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
|
||||
newCrontab <- lift $ hoist lift determineCrontab'
|
||||
when (newCrontab /= currentCrontab) $
|
||||
mapRWST (liftIO . atomically) $
|
||||
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
|
||||
|
||||
mergeState
|
||||
newState <- State.get
|
||||
|
||||
let upToDate = and
|
||||
[ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
|
||||
, ((==) `on` HashMap.lookup jobCtl) newState currentState
|
||||
]
|
||||
when upToDate $ do
|
||||
now <- liftIO $ getCurrentTime
|
||||
foundation <- getYesod
|
||||
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
|
||||
case jobCtl of
|
||||
JobCtlQueue job -> lift $ queueDBJobCron job
|
||||
other -> runReaderT ?? foundation $ writeJobCtl other
|
||||
mergeState
|
||||
newState <- State.get
|
||||
|
||||
let upToDate = and
|
||||
[ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
|
||||
, ((==) `on` HashMap.lookup jobCtl) newState currentState
|
||||
]
|
||||
when upToDate $ do
|
||||
now <- liftIO getCurrentTime
|
||||
foundation <- getYesod
|
||||
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
|
||||
case jobCtl of
|
||||
JobCtlQueue job -> lift $ queueDBJobCron job
|
||||
other -> runReaderT ?? foundation $ writeJobCtl other
|
||||
|
||||
case nextMatch of
|
||||
MatchAsap -> doJob
|
||||
MatchNone -> return ()
|
||||
@ -305,6 +323,7 @@ execCrontab = do
|
||||
logFunc <- askLoggerIO
|
||||
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
|
||||
doJob
|
||||
|
||||
where
|
||||
acc :: NominalDiffTime
|
||||
acc = 1e-3
|
||||
@ -335,7 +354,7 @@ execCrontab = do
|
||||
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
|
||||
|
||||
waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
|
||||
waitUntil crontabTV crontab nextTime = runResourceT $ do
|
||||
waitUntil crontabTV crontab nextTime = do
|
||||
diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
|
||||
let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
|
||||
waitTime'
|
||||
|
||||
@ -5,6 +5,7 @@ module Utils.Failover
|
||||
, FailoverException(..)
|
||||
, withFailover, withFailoverReTest
|
||||
, FailoverMetrics, registerFailoverMetrics
|
||||
, mapFailover
|
||||
) where
|
||||
|
||||
import ClassyPrelude hiding (try, Vector, finally, onException)
|
||||
@ -39,6 +40,8 @@ import qualified Data.Foldable as F
|
||||
|
||||
import Data.Unique
|
||||
|
||||
import Utils (foldMapM)
|
||||
|
||||
|
||||
data FailoverItem a = FailoverItem
|
||||
{ failoverValue :: a
|
||||
@ -81,6 +84,14 @@ mkFailoverLabeled opts = fmap Failover . liftIO $ newTVarIO opts'
|
||||
where opts' = flip map opts $ \(failoverLabel, failoverValue) -> FailoverItem{ failoverLastTest = Nothing, failoverReferences = Set.empty, .. }
|
||||
|
||||
|
||||
mapFailover :: ( MonadIO m, Monoid b)
|
||||
=> (a -> m b)
|
||||
-> Failover a
|
||||
-> m b
|
||||
mapFailover f Failover{..} = do
|
||||
as <- toListOf (folded . _failoverValue) <$> readTVarIO failover
|
||||
foldMapM f as
|
||||
|
||||
withFailoverReference :: (MonadIO m, MonadMask m)
|
||||
=> Failover a
|
||||
-> (Unique -> m b)
|
||||
|
||||
@ -6,6 +6,6 @@ import ModelSpec ()
|
||||
|
||||
|
||||
spec :: Spec
|
||||
spec = withApp $ do
|
||||
describe "CorrectionsR" $ do
|
||||
spec = withApp $
|
||||
xdescribe "CorrectionsR" $
|
||||
return ()
|
||||
|
||||
@ -83,15 +83,15 @@ runHandler handler = do
|
||||
|
||||
withApp :: YSpec UniWorX -> Spec
|
||||
withApp = around $ \act -> runResourceT $ do
|
||||
settings <- liftIO $ loadYamlSettings
|
||||
["config/test-settings.yml", "config/settings.yml"]
|
||||
[]
|
||||
useEnv
|
||||
foundation <- makeFoundation settings
|
||||
wipeDB foundation
|
||||
runAppLoggingT foundation $ handleJobs foundation
|
||||
logWare <- makeMiddleware foundation
|
||||
lift $ act (foundation, logWare) `finally` shutdownApp foundation
|
||||
settings <- liftIO $ loadYamlSettings
|
||||
["config/test-settings.yml", "config/settings.yml"]
|
||||
[]
|
||||
useEnv
|
||||
foundation <- makeFoundation settings
|
||||
wipeDB foundation
|
||||
runAppLoggingT foundation $ handleJobs foundation
|
||||
logWare <- makeMiddleware foundation
|
||||
lift $ act (foundation, logWare) `finally` shutdownApp foundation
|
||||
|
||||
-- This function will truncate all of the tables in your database.
|
||||
-- 'withApp' calls it before each test, creating a clean environment for each
|
||||
|
||||
Loading…
Reference in New Issue
Block a user