diff --git a/config/settings.yml b/config/settings.yml
index edd971e64..d35732623 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -36,8 +36,10 @@ health-check-interval:
ldap-admins: "_env:HEALTHCHECK_INTERVAL_LDAP_ADMINS:600"
smtp-connect: "_env:HEALTHCHECK_INTERVAL_SMTP_CONNECT:600"
widget-memcached: "_env:HEALTHCHECK_INTERVAL_WIDGET_MEMCACHED:600"
+ active-job-executors: "_env:HEALTHCHECK_INTERVAL_ACTIVE_JOB_EXECUTORS:60"
health-check-delay-notify: "_env:HEALTHCHECK_DELAY_NOTIFY:true"
health-check-http: "_env:HEALTHCHECK_HTTP:true" # Can we assume, that we can reach ourselves under APPROOT via HTTP (reverse proxies or firewalls might prevent this)?
+health-check-active-job-executors-timeout: "_env:HEALTHCHECK_ACTIVE_JOB_EXECUTORS_TIMEOUT:5"
log-settings:
detailed: "_env:DETAILED_LOGGING:false"
diff --git a/messages/uniworx/de.msg b/messages/uniworx/de.msg
index 99ed87ddf..563aede8a 100644
--- a/messages/uniworx/de.msg
+++ b/messages/uniworx/de.msg
@@ -1061,6 +1061,7 @@ HealthHTTPReachable: Cluster kann an der erwarteten URL über HTTP erreicht werd
HealthLDAPAdmins: Anteil der Administratoren, die im LDAP-Verzeichnis gefunden werden können
HealthSMTPConnect: SMTP-Server kann erreicht werden
HealthWidgetMemcached: Memcached-Server liefert Widgets korrekt aus
+HealthActiveJobExecutors: Anteil der job-workers, die neue Befehle annehmen
CourseParticipants n@Int: Derzeit #{n} angemeldete Kursteilnehmer
CourseParticipantsInvited n@Int: #{n} #{pluralDE n "Einladung" "Einladungen"} per E-Mail verschickt
diff --git a/src/Application.hs b/src/Application.hs
index 3e20e6613..7291fda1c 100644
--- a/src/Application.hs
+++ b/src/Application.hs
@@ -38,8 +38,6 @@ import System.Log.FastLogger ( defaultBufSize, newStderrLoggerSet
import Handler.Utils (runAppLoggingT)
-import qualified Data.Map.Strict as Map
-
import Foreign.Store
import qualified Data.UUID as UUID
@@ -158,8 +156,7 @@ makeFoundation appSettings'@AppSettings{..} = do
appInstanceID <- liftIO $ maybe UUID.nextRandom (either readInstanceIDFile return) appInitialInstanceID
- appJobCtl <- liftIO $ newTVarIO Map.empty
- appCronThread <- liftIO newEmptyTMVarIO
+ appJobState <- liftIO newEmptyTMVarIO
appHealthReport <- liftIO $ newTVarIO Set.empty
-- We need a log function to create a connection pool. We need a connection
@@ -371,7 +368,7 @@ develMain = runResourceT $ do
wsettings <- liftIO . getDevSettings $ warpSettings foundation
app <- makeApplication foundation
- handleJobs foundation
+ runAppLoggingT foundation $ handleJobs foundation
liftIO . develMainHelper $ return (wsettings, app)
-- | The @main@ function for an executable running this site.
@@ -471,7 +468,7 @@ getApplicationRepl :: (MonadResource m, MonadBaseControl IO m) => m (Int, UniWor
getApplicationRepl = do
settings <- getAppDevSettings
foundation <- makeFoundation settings
- handleJobs foundation
+ runAppLoggingT foundation $ handleJobs foundation
wsettings <- liftIO . getDevSettings $ warpSettings foundation
app1 <- makeApplication foundation
@@ -481,7 +478,7 @@ getApplicationRepl = do
return (getPort wsettings, foundation, app1)
-shutdownApp :: MonadIO m => UniWorX -> m ()
+shutdownApp :: (MonadIO m, MonadBaseControl IO m) => UniWorX -> m ()
shutdownApp app = do
stopJobCtl app
liftIO $ do
diff --git a/src/Control/Concurrent/Async/Lifted/Safe/Utils.hs b/src/Control/Concurrent/Async/Lifted/Safe/Utils.hs
new file mode 100644
index 000000000..f7f395b64
--- /dev/null
+++ b/src/Control/Concurrent/Async/Lifted/Safe/Utils.hs
@@ -0,0 +1,15 @@
+module Control.Concurrent.Async.Lifted.Safe.Utils
+ ( allocateLinkedAsync
+ ) where
+
+import ClassyPrelude hiding (cancel)
+
+import Control.Concurrent.Async.Lifted.Safe
+
+import Control.Monad.Trans.Resource
+
+
+allocateLinkedAsync :: forall m a.
+ MonadResource m
+ => IO a -> m (Async a)
+allocateLinkedAsync act = allocate (async act) cancel >>= (\(_k, a) -> a <$ link a)
diff --git a/src/Foundation.hs b/src/Foundation.hs
index aa4cafd6a..3233cda52 100644
--- a/src/Foundation.hs
+++ b/src/Foundation.hs
@@ -110,8 +110,7 @@ data UniWorX = UniWorX
, appCryptoIDKey :: CryptoIDKey
, appClusterID :: ClusterId
, appInstanceID :: InstanceId
- , appJobCtl :: TVar (Map ThreadId (TMChan JobCtl))
- , appCronThread :: TMVar (ReleaseKey, ThreadId)
+ , appJobState :: TMVar JobState
, appSessionKey :: ClientSession.Key
, appSecretBoxKey :: SecretBox.Key
, appJSONWebKeySet :: Jose.JwkSet
diff --git a/src/Handler/Health.hs b/src/Handler/Health.hs
index 7b29e2bbd..36649a436 100644
--- a/src/Handler/Health.hs
+++ b/src/Handler/Health.hs
@@ -70,6 +70,9 @@ getHealthR = do
$of HealthWidgetMemcached (Just passed)
_{MsgHealthWidgetMemcached}
#{boolSymbol passed}
+ $of HealthActiveJobExecutors (Just active)
+ _{MsgHealthActiveJobExecutors}
+ #{textPercent active 1}
$of _
|]
provideJson healthReports
diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs
index 41d12dcff..e2805fc6e 100644
--- a/src/Import/NoModel.hs
+++ b/src/Import/NoModel.hs
@@ -45,7 +45,7 @@ import Data.Hashable as Import
import Data.List.NonEmpty as Import (NonEmpty(..), nonEmpty)
import Data.Text.Encoding.Error as Import(UnicodeException(..))
import Data.Semigroup as Import (Semigroup)
-import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..))
+import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..), Endo(..))
import Data.Binary as Import (Binary)
import Numeric.Natural as Import (Natural)
diff --git a/src/Jobs.hs b/src/Jobs.hs
index 867718bab..4769178ff 100644
--- a/src/Jobs.hs
+++ b/src/Jobs.hs
@@ -7,14 +7,12 @@ module Jobs
import Import
import Utils.Lens
-import Handler.Utils
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab
-import Data.Conduit.TMChan
import qualified Data.Conduit.List as C
import qualified Data.Text.Lazy as LT
@@ -28,7 +26,7 @@ import Data.Semigroup (Max(..))
import Utils.Sql
-import Control.Monad.Random (evalRand, mkStdGen, getRandomR)
+import Control.Monad.Random (evalRand, mkStdGen, getRandomR, uniformMay)
import Cron
import qualified Data.HashMap.Strict as HashMap
@@ -38,20 +36,26 @@ import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Map.Strict as Map
+import Data.Map.Strict ((!))
import Data.Foldable (foldrM)
import Control.Monad.Trans.Reader (mapReaderT)
-import Control.Monad.Trans.State (evalStateT, mapStateT)
+import Control.Monad.Trans.Writer (execWriterT)
+import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import qualified Control.Monad.State.Class as State
+import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Reader.Class (MonadReader(..))
-import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT, allocate, release)
+import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT)
import Control.Monad.Trans.Maybe (MaybeT(..))
+import Control.Monad.Trans.Cont (ContT(..), callCC)
+import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
import Control.Monad.Logger
import Data.Time.Zones
import Control.Concurrent.STM (retry)
+import Control.Concurrent.STM.Delay
import Jobs.Handler.SendNotification
@@ -75,198 +79,267 @@ instance Exception JobQueueException
handleJobs :: ( MonadResource m
- , MonadIO m
+ , MonadLoggerIO m
)
=> UniWorX -> m ()
-- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
-handleJobs foundation@UniWorX{..} = do
- let num = foundation ^. _appJobWorkers
+handleJobs foundation@UniWorX{..}
+ | foundation ^. _appJobWorkers == 0 = return ()
+ | otherwise = do
+ logger <- askLoggerIO
+ let runInIO = flip runLoggingT logger . runResourceT
- jobCrontab <- liftIO $ newTMVarIO HashMap.empty
- jobConfirm <- liftIO $ newTVarIO HashMap.empty
+ jobPoolManager <- allocateLinkedAsync . runInIO $ manageJobPool foundation
- forM_ [1..num] $ \n -> do
- (bChan, chan) <- atomically $ newBroadcastTMChan >>= (\c -> (c, ) <$> dupTMChan c)
- let
- logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
- logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
- removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId
- doFork = flip forkFinally (\_ -> removeChan) . runAppLoggingT foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' foundation n
- (_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan)
- atomically . modifyTVar' appJobCtl $ Map.insert tId bChan
+ jobCron <- allocateLinkedAsync . runInIO $ manageCrontab foundation
- -- Start cron operation
- when (num > 0) $ do
- registeredCron <- liftIO newEmptyTMVarIO
- let execCrontab' = whenM (atomically $ readTMVar registeredCron) $
- runReaderT (execCrontab foundation) JobContext{..}
- unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread
- cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab)
- registeredCron' <- atomically $ do
- registeredCron' <- tryPutTMVar appCronThread cData
- registeredCron' <$ putTMVar registeredCron registeredCron'
- when registeredCron' $
- liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $
- writeJobCtlBlock JobCtlDetermineCrontab
+ let jobWorkers = Map.empty
+ jobWorkerName = const $ error "Unknown worker"
+ jobCrontab <- liftIO $ newTVarIO HashMap.empty
+ jobConfirm <- liftIO $ newTVarIO HashMap.empty
+ jobShutdown <- liftIO newEmptyTMVarIO
+ atomically $ putTMVar appJobState JobState
+ { jobContext = JobContext{..}
+ , ..
+ }
-stopJobCtl :: MonadIO m => UniWorX -> m ()
+manageJobPool, manageCrontab :: forall m.
+ ( MonadResource m
+ , MonadLogger m
+ )
+ => UniWorX -> m ()
+manageCrontab foundation@UniWorX{..} = do
+ context <- atomically . fmap jobContext $ readTMVar appJobState
+ let awaitTermination = atomically $ do
+ shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
+ guard shouldTerminate
+ liftIO . race_ awaitTermination . unsafeHandler foundation . void $ do
+ atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
+ runReaderT ?? foundation $
+ writeJobCtlBlock JobCtlDetermineCrontab
+ evalRWST (forever execCrontab) context HashMap.empty
+
+
+manageJobPool foundation@UniWorX{..}
+ = flip runContT return . forever . join . atomically $ asum
+ [ spawnMissingWorkers
+ , reapDeadWorkers
+ , terminateGracefully
+ ]
+ where
+ num :: Int
+ num = fromIntegral $ foundation ^. _appJobWorkers
+
+ spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ())
+ spawnMissingWorkers = do
+ oldState <- takeTMVar appJobState
+ let missing = num - Map.size (jobWorkers oldState)
+ guard $ missing > 0
+ return $ do
+ $logDebugS "manageJobPool" [st|Spawning #{missing} workers|]
+ endo <- execWriterT . replicateM_ missing $ do
+ workerId <- newWorkerId
+ let logIdent = mkLogIdent workerId
+ (bChan, chan) <- atomically $ newBroadcastTChan >>= (\c -> (c, ) <$> dupTChan c)
+ let
+ streamChan = join . atomically $ do
+ shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
+ if
+ | shouldTerminate ->
+ return $ return ()
+ | otherwise -> do
+ nextVal <- readTChan chan
+ return $ yield nextVal >> streamChan
+ runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
+ $logInfoS logIdent "Started"
+ runConduit $ streamChan .| handleJobs' workerId
+ $logInfoS logIdent "Stopped"
+ worker <- allocateLinkedAsync runWorker
+
+ tell . Endo $ \cSt -> cSt
+ { jobWorkers = Map.insert worker bChan $ jobWorkers cSt
+ , jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
+ }
+ atomically . putTMVar appJobState $ endo `appEndo` oldState
+
+ reapDeadWorkers = do
+ oldState <- takeTMVar appJobState
+ deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a
+ putTMVar appJobState oldState
+ { jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers
+ }
+ guard . not $ Map.null deadWorkers
+ return . forM_ (Map.toList deadWorkers) $ \(jobAsync, result) -> do
+ case result of
+ Right () -> $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|]
+ Left e -> $logErrorS "JobPoolManager" [st|Job-Executer #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|]
+ void . lift . allocateLinkedAsync $
+ let go = do
+ next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
+ nextVal <- MaybeT . lift . tryReadTChan $ jobWorkers oldState ! jobAsync
+ jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
+ receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
+ return (nextVal, receiver)
+ whenIsJust next $ \(nextVal, receiver) -> do
+ atomically $ writeTChan receiver nextVal
+ go
+ in go
+
+ terminateGracefully = do
+ shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
+ guard shouldTerminate
+ return . callCC $ \terminate -> do
+ $logInfoS "JobPoolManager" "Shutting down"
+ terminate ()
+
+stopJobCtl :: (MonadIO m, MonadBaseControl IO m) => UniWorX -> m ()
-- ^ Stop all worker threads currently running
-stopJobCtl UniWorX{appJobCtl, appCronThread} = do
- mcData <- atomically $ tryReadTMVar appCronThread
- whenIsJust mcData $ \(rKey, _) -> do
- liftIO $ release rKey
- atomically . guardM $ isEmptyTMVar appCronThread
-
- wMap <- liftIO $ readTVarIO appJobCtl
- atomically $ forM_ wMap closeTMChan
- atomically $ do
- wMap' <- readTVar appJobCtl
- guard . none (`Map.member` wMap') $ Map.keysSet wMap
+stopJobCtl UniWorX{appJobState} = do
+ didStop <- atomically $ do
+ jState <- tryReadTMVar appJobState
+ for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
+ whenIsJust didStop $ \jSt' -> void . fork . atomically $ do
+ workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
+ mapM_ (void . waitCatchSTM) $
+ [ jobPoolManager jSt'
+ , jobCron jSt'
+ ] ++ workers
-
-execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m ()
+execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWorX IO) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
-execCrontab foundation = evalStateT go HashMap.empty
- where
- go = do
- cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do
- mapStateT (liftHandlerT . runDB . setSerializable) $ do
- let
- merge (Entity leId CronLastExec{..})
- | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
- = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
- | otherwise = lift $ delete leId
- runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
+execCrontab = do
+ mapRWST (liftHandlerT . runDB . setSerializable) $ do
+ let
+ merge (Entity leId CronLastExec{..})
+ | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
+ = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
+ | otherwise = lift $ delete leId
+ runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
- refT <- liftIO getCurrentTime
- settings <- getsYesod appSettings'
- currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do
- crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab
- case crontab' of
- Nothing -> return Nothing
- Just crontab -> Just <$> do
- State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
- prevExec <- State.get
- case earliestJob settings prevExec crontab refT of
- Nothing -> liftBase retry
- Just (_, MatchNone) -> liftBase retry
- Just x -> return (crontab, x)
+ refT <- liftIO getCurrentTime
+ settings <- getsYesod appSettings'
+ (currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do
+ crontab <- liftBase . readTVar =<< asks jobCrontab
- case currentState of
- Nothing -> return False
- Just (currentCrontab, (jobCtl, nextMatch)) -> do
- let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
- newCrontab <- lift . lift . hoist lift $ determineCrontab'
- if
- | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
- -> do
- now <- liftIO $ getCurrentTime
- instanceID' <- getsYesod appInstanceID
- State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
- case jobCtl of
- JobCtlQueue job -> do
- void . lift . lift $ upsertBy
- (UniqueCronLastExec $ toJSON job)
- CronLastExec
- { cronLastExecJob = toJSON job
- , cronLastExecTime = now
- , cronLastExecInstance = instanceID'
- }
- [ CronLastExecTime =. now ]
- lift . lift $ queueDBJob job
- other -> writeJobCtl other
- | otherwise
- -> lift . mapReaderT (liftIO . atomically) $
- lift . void . flip swapTMVar newCrontab =<< asks jobCrontab
+ State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
+ prevExec <- State.get
+ case earliestJob settings prevExec crontab refT of
+ Nothing -> liftBase retry
+ Just (_, MatchNone) -> liftBase retry
+ Just x -> return (crontab, x)
- case nextMatch of
- MatchAsap -> doJob
- MatchNone -> return ()
- MatchAt nextTime -> do
- JobContext{jobCrontab} <- ask
- nextTime' <- applyJitter jobCtl nextTime
- $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
- logFunc <- askLoggerIO
- whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
- doJob
-
- return True
- when cont go
- where
- acc :: NominalDiffTime
- acc = 1e-3
-
- debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
- debouncingAcc AppSettings{appNotificationRateLimit} = \case
- JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
- _ -> acc
-
- applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
- applyJitter seed t = do
- appInstance <- getsYesod appInstanceID
- let
- halfRange = truncate $ 0.5 / acc
- diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
- return $ addUTCTime diff t
-
- earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
- earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
- where
- go' (jobCtl, cron) mbPrev
- | Just (_, t') <- mbPrev
- , t' < t
- = mbPrev
- | otherwise
- = Just (jobCtl, t)
- where
- t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
-
- waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TMVar a -> a -> UTCTime -> m Bool
- waitUntil crontabTV crontab nextTime = runResourceT $ do
- diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
- let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
- waitTime'
- | diffT < acc = "Done"
- | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
- $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
+ let doJob = mapRWST (liftHandlerT . runDBJobs . setSerializable) $ do
+ newCrontab <- lift . hoist lift $ determineCrontab'
if
- | diffT < acc -> return True
- | otherwise -> do
- retVar <- liftIO newEmptyTMVarIO
- void . liftIO . forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar
- let
- awaitDelayThread = False <$ takeTMVar retVar
- awaitCrontabChange = do
- crontab' <- tryReadTMVar crontabTV
- True <$ guard (Just crontab /= crontab')
- crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
- bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
+ | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
+ -> do
+ now <- liftIO $ getCurrentTime
+ foundation <- getYesod
+ let instanceID' = foundation ^. _appInstanceID
+ State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
+ case jobCtl of
+ JobCtlQueue job -> do
+ void . lift $ upsertBy
+ (UniqueCronLastExec $ toJSON job)
+ CronLastExec
+ { cronLastExecJob = toJSON job
+ , cronLastExecTime = now
+ , cronLastExecInstance = instanceID'
+ }
+ [ CronLastExecTime =. now ]
+ lift $ queueDBJob job
+ other -> runReaderT ?? foundation $ writeJobCtl other
+ | otherwise
+ -> mapRWST (liftIO . atomically) $
+ liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
+ case nextMatch of
+ MatchAsap -> doJob
+ MatchNone -> return ()
+ MatchAt nextTime -> do
+ JobContext{jobCrontab} <- ask
+ nextTime' <- applyJitter jobCtl nextTime
+ $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
+ logFunc <- askLoggerIO
+ whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
+ doJob
+ where
+ acc :: NominalDiffTime
+ acc = 1e-3
-handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) ()
-handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
+ debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
+ debouncingAcc AppSettings{appNotificationRateLimit} = \case
+ JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
+ _ -> acc
+
+ applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
+ applyJitter seed t = do
+ appInstance <- getsYesod appInstanceID
+ let
+ halfRange = truncate $ 0.5 / acc
+ diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
+ return $ addUTCTime diff t
+
+ earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
+ earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
+ where
+ go' (jobCtl, cron) mbPrev
+ | Just (_, t') <- mbPrev
+ , t' < t
+ = mbPrev
+ | otherwise
+ = Just (jobCtl, t)
+ where
+ t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
+
+ waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
+ waitUntil crontabTV crontab nextTime = runResourceT $ do
+ diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
+ let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
+ waitTime'
+ | diffT < acc = "Done"
+ | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
+ $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
+ if
+ | diffT < acc -> return True
+ | otherwise -> do
+ delay <- liftIO . newDelay . round $ waitTime * 1e6
+ let
+ awaitDelayThread = False <$ waitDelay delay
+ awaitCrontabChange = do
+ crontab' <- readTVar crontabTV
+ True <$ guard (crontab /= crontab')
+ crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
+ bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
+
+mkLogIdent :: JobWorkerId -> Text
+mkLogIdent wId = "Job-Executor " <> showWorkerId wId
+
+handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) ()
+handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
- resVars <- mapReaderT (liftIO . atomically) $
- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
- res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl
- sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
+ res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
+ sentRes <- mapReaderT (liftIO . atomically) $ do
+ resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
+ lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> $logErrorS logIdent $ tshow err
_other -> return ()
where
- logIdent = "Jobs #" <> tshow wNum
+ logIdent = mkLogIdent wNum
handleQueueException :: MonadLogger m => JobQueueException -> m ()
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
- handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
+ handleCmd JobCtlNoOp = return ()
+ handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
content <- case fromJSON queuedJobContent of
@@ -285,7 +358,7 @@ handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab'
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
- lift . void . flip swapTMVar newCTab =<< asks jobCrontab
+ lift . void . flip swapTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind
diff --git a/src/Jobs/HealthReport.hs b/src/Jobs/HealthReport.hs
index bf65049f9..6aecd01f6 100644
--- a/src/Jobs/HealthReport.hs
+++ b/src/Jobs/HealthReport.hs
@@ -7,6 +7,7 @@ module Jobs.HealthReport
import Import
import Data.List (genericLength)
+import qualified Data.Map.Strict as Map
import qualified Data.Aeson as Aeson
import Data.Proxy (Proxy(..))
@@ -27,6 +28,12 @@ import qualified Data.CaseInsensitive as CI
import qualified Network.HaskellNet.SMTP as SMTP
import Data.Pool (withResource)
+import System.Timeout
+
+import Jobs.Queue
+
+import Control.Concurrent.Async.Lifted.Safe (forConcurrently)
+
generateHealthReport :: HealthCheck -> Handler HealthReport
generateHealthReport = $(dispatchTH ''HealthCheck)
@@ -135,3 +142,26 @@ dispatchHealthCheckWidgetMemcached = HealthWidgetMemcached <$> do
(== content) . responseBody <$> httpLBS httpRequest
_other -> return False
+
+dispatchHealthCheckActiveJobExecutors :: Handler HealthReport
+dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do
+ app <- getYesod
+ jState <- atomically . tryReadTMVar $ appJobState app
+ let configuredNumber = app ^. _appJobWorkers
+ timeoutLength = app ^. _appHealthCheckActiveJobExecutorsTimeout
+ case jState of
+ Nothing
+ | configuredNumber == 0 -> return Nothing
+ Nothing -> return $ Just 0
+ Just JobState{jobWorkers, jobWorkerName} -> do
+ tid <- liftIO myThreadId
+ let workers' = Map.fromSet jobWorkerName (Map.keysSet jobWorkers)
+ workers = Map.filterWithKey (\a _ -> asyncThreadId a /= tid) workers'
+ timeoutMicro = let (MkFixed micro :: Micro) = realToFrac timeoutLength
+ in fromInteger micro
+ $logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems workers'
+ responders <- fmap (getSum . fold) . liftIO . forConcurrently (Map.toList workers) $ \(_, wName)
+ -> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp)
+ if
+ | Map.null workers -> return Nothing
+ | otherwise -> return . Just $ responders % fromIntegral (Map.size workers)
diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs
index 8152ffbfb..8b71c2960 100644
--- a/src/Jobs/Queue.hs
+++ b/src/Jobs/Queue.hs
@@ -1,5 +1,6 @@
module Jobs.Queue
( writeJobCtl, writeJobCtlBlock
+ , writeJobCtl', writeJobCtlBlock'
, queueJob, queueJob'
, YesodJobDB
, runDBJobs, queueDBJob, sinkDBJobs
@@ -9,12 +10,14 @@ module Jobs.Queue
import Import hiding ((<>))
import Utils.Sql
+import Utils.Lens
import Jobs.Types
import Control.Monad.Trans.Writer (WriterT, runWriterT)
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Trans.Reader (ReaderT, mapReaderT)
+import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.HashMap.Strict as HashMap
@@ -27,39 +30,54 @@ import Data.Semigroup ((<>))
data JobQueueException = JobQueuePoolEmpty
+ | JobQueueWorkerNotFound
deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic)
instance Exception JobQueueException
-writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
+writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m ()
+-- | Pass an instruction to the given `Job`-Worker
+writeJobCtl' target cmd = do
+ JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar
+ if
+ | null jobWorkers
+ -> throwM JobQueuePoolEmpty
+ | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers
+ -> atomically $ writeTChan chan cmd
+ | otherwise
+ -> throwM JobQueueWorkerNotFound
+
+writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers
--
-- Instructions are assigned deterministically and pseudo-randomly to one specific worker.
-- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others
writeJobCtl cmd = do
+ names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar
tid <- liftIO myThreadId
- wMap <- getsYesod appJobCtl >>= liftIO . readTVarIO
- if
- | null wMap -> throwM JobQueuePoolEmpty
- | otherwise -> do
- let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap
- liftIO . atomically $ writeTMChan chan cmd
+ let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names
+ writeJobCtl' target cmd
-writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
--- | Pass an instruction to the `Job`-Workers and block until it was acted upon
-writeJobCtlBlock cmd = do
- getResVar <- asks jobConfirm
- resVar <- liftIO . atomically $ do
+
+writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
+-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon
+writeJobCtlBlock' writeCtl cmd = do
+ getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar
+ resVar <- atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
- lift $ writeJobCtl cmd
+ writeCtl cmd
let
removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
- mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
+ mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
+writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
+-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
+writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
+
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
now <- liftIO getCurrentTime
@@ -83,7 +101,9 @@ queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
-queueJob' job = queueJob job >>= writeJobCtl . JobCtlPerform
+queueJob' job = do
+ app <- getYesod
+ queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
-- | Slightly modified Version of `YesodDB` for `runDBJobs`
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO))
@@ -102,5 +122,6 @@ runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -
-- Jobs get immediately executed if the transaction succeeds
runDBJobs act = do
(ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act
- forM_ jIds $ writeJobCtl . JobCtlPerform
+ app <- getYesod
+ forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform
return ret
diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs
index 3522ff802..74fd7afe3 100644
--- a/src/Jobs/Types.hs
+++ b/src/Jobs/Types.hs
@@ -2,15 +2,24 @@ module Jobs.Types
( Job(..), Notification(..)
, JobCtl(..)
, JobContext(..)
+ , JobState(..)
+ , jobWorkerNames
+ , JobWorkerId
+ , showWorkerId, newWorkerId
) where
-import Import.NoFoundation
+import Import.NoFoundation hiding (Unique)
import Data.Aeson (defaultOptions, Options(..), SumEncoding(..))
import Data.Aeson.TH (deriveJSON)
import Data.List.NonEmpty (NonEmpty)
+import Data.Unique
+
+import qualified Data.Map.Strict as Map
+import qualified Data.Set as Set
+
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@@ -70,12 +79,35 @@ data JobCtl = JobCtlFlush
| JobCtlDetermineCrontab
| JobCtlQueue Job
| JobCtlGenerateHealthReport HealthCheck
+ | JobCtlNoOp
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl
+newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
+ deriving (Eq, Ord)
+
+showWorkerId :: JobWorkerId -> Text
+-- ^ Make a `JobWorkerId` somewhat human readable as a small-ish Number
+showWorkerId = tshow . hashUnique . jobWorkerUnique
+
+newWorkerId :: MonadIO m => m JobWorkerId
+newWorkerId = JobWorkerId <$> liftIO newUnique
+
data JobContext = JobContext
- { jobCrontab :: TMVar (Crontab JobCtl)
+ { jobCrontab :: TVar (Crontab JobCtl)
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
}
+
+data JobState = JobState
+ { jobWorkers :: Map (Async ()) (TChan JobCtl)
+ , jobWorkerName :: Async () -> JobWorkerId
+ , jobContext :: JobContext
+ , jobPoolManager :: Async ()
+ , jobCron :: Async ()
+ , jobShutdown :: TMVar ()
+ }
+
+jobWorkerNames :: JobState -> Set JobWorkerId
+jobWorkerNames JobState{..} = Set.map jobWorkerName $ Map.keysSet jobWorkers
diff --git a/src/Model/Types/Health.hs b/src/Model/Types/Health.hs
index aea99d735..ce0f53e23 100644
--- a/src/Model/Types/Health.hs
+++ b/src/Model/Types/Health.hs
@@ -15,6 +15,7 @@ data HealthCheck
| HealthCheckLDAPAdmins
| HealthCheckSMTPConnect
| HealthCheckWidgetMemcached
+ | HealthCheckActiveJobExecutors
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
instance Universe HealthCheck
instance Finite HealthCheck
@@ -39,6 +40,8 @@ data HealthReport
-- ^ Can we connect to the SMTP server and say @NOOP@?
| HealthWidgetMemcached { healthWidgetMemcached :: Maybe Bool }
-- ^ Can we store values in memcached and retrieve them via HTTP?
+ | HealthActiveJobExecutors { healthActiveJobExecutors :: Maybe Rational }
+ -- ^ Proportion of job executors (excluding the one running the healthcheck) responding within a timeout
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
instance NFData HealthReport
@@ -57,6 +60,7 @@ classifyHealthReport HealthLDAPAdmins{} = HealthCheckLDAPAdmins
classifyHealthReport HealthHTTPReachable{} = HealthCheckHTTPReachable
classifyHealthReport HealthSMTPConnect{} = HealthCheckSMTPConnect
classifyHealthReport HealthWidgetMemcached{} = HealthCheckWidgetMemcached
+classifyHealthReport HealthActiveJobExecutors{} = HealthCheckActiveJobExecutors
-- | `HealthReport` classified (`classifyHealthReport`) by badness
--
@@ -84,4 +88,6 @@ healthReportStatus = \case
| prop <= 0 -> HealthFailure
HealthSMTPConnect (Just False) -> HealthFailure
HealthWidgetMemcached (Just False) -> HealthFailure -- TODO: investigate this failure mode; do we just handle it gracefully?
+ HealthActiveJobExecutors (Just prop )
+ | prop < 1 -> HealthFailure
_other -> maxBound -- Minimum badness
diff --git a/src/Settings.hs b/src/Settings.hs
index c53e90269..191e1ca1d 100644
--- a/src/Settings.hs
+++ b/src/Settings.hs
@@ -118,6 +118,7 @@ data AppSettings = AppSettings
, appHealthCheckInterval :: HealthCheck -> Maybe NominalDiffTime
, appHealthCheckDelayNotify :: Bool
, appHealthCheckHTTP :: Bool
+ , appHealthCheckActiveJobExecutorsTimeout :: NominalDiffTime
, appInitialLogSettings :: LogSettings
@@ -389,6 +390,7 @@ instance FromJSON AppSettings where
appHealthCheckInterval <- (assertM' (> 0) . ) <$> o .: "health-check-interval"
appHealthCheckDelayNotify <- o .: "health-check-delay-notify"
appHealthCheckHTTP <- o .: "health-check-http"
+ appHealthCheckActiveJobExecutorsTimeout <- o .: "health-check-active-job-executors-timeout"
appSessionTimeout <- o .: "session-timeout"
diff --git a/src/Utils.hs b/src/Utils.hs
index 7fbe88857..1792d9af8 100644
--- a/src/Utils.hs
+++ b/src/Utils.hs
@@ -26,6 +26,7 @@ import Utils.Route as Utils
import Utils.Message as Utils
import Utils.Lang as Utils
import Utils.Parameters as Utils
+import Control.Concurrent.Async.Lifted.Safe.Utils as Utils
import Text.Blaze (Markup, ToMarkup)
diff --git a/test/TestImport.hs b/test/TestImport.hs
index 9164c3144..48e0b5d27 100644
--- a/test/TestImport.hs
+++ b/test/TestImport.hs
@@ -3,7 +3,7 @@ module TestImport
, module X
) where
-import Application (makeFoundation, makeLogWare)
+import Application (makeFoundation, makeLogWare, shutdownApp)
import ClassyPrelude as X hiding (delete, deleteBy, Handler, Index, (<.>), (<|), index, uncons, unsnoc, cons, snoc)
import Database.Persist as X hiding (get)
import Database.Persist.Sql as X (SqlPersistM)
@@ -31,7 +31,7 @@ import Test.QuickCheck.Classes.Binary as X
import Data.Proxy as X
import Data.UUID as X (UUID)
import System.IO as X (hPrint, hPutStrLn, stderr)
-import Jobs (handleJobs, stopJobCtl)
+import Jobs (handleJobs)
import Numeric.Natural as X
import Control.Lens as X hiding ((<.), elements)
@@ -42,7 +42,6 @@ import Database (truncateDb)
import Database as X (fillDb)
import Control.Monad.Trans.Resource (runResourceT, MonadResourceBase)
-import Data.Pool (destroyAllResources)
import Settings
@@ -51,6 +50,8 @@ import qualified Data.CaseInsensitive as CI
import Data.Typeable
+import Handler.Utils (runAppLoggingT)
+
runDB :: SqlPersistM a -> YesodExample UniWorX a
runDB query = do
@@ -74,13 +75,10 @@ withApp = around $ \act -> runResourceT $ do
[]
useEnv
foundation <- makeFoundation settings
- let
- stopDBAccess = do
- stopJobCtl foundation
- liftIO . destroyAllResources $ appConnPool foundation
- bracket_ stopDBAccess (handleJobs foundation) $ wipeDB foundation
+ wipeDB foundation
+ runAppLoggingT foundation $ handleJobs foundation
logWare <- makeLogWare foundation
- lift $ act (foundation, logWare)
+ lift $ act (foundation, logWare) `finally` shutdownApp foundation
-- This function will truncate all of the tables in your database.
-- 'withApp' calls it before each test, creating a clean environment for each