refactor(jobs): switch to linked asyncs

This commit is contained in:
Gregor Kleen 2019-07-24 08:21:31 +02:00
parent 30e22bae74
commit 20686f185b
8 changed files with 281 additions and 173 deletions

View File

@ -38,8 +38,6 @@ import System.Log.FastLogger ( defaultBufSize, newStderrLoggerSet
import Handler.Utils (runAppLoggingT)
import qualified Data.Map.Strict as Map
import Foreign.Store
import qualified Data.UUID as UUID
@ -158,8 +156,7 @@ makeFoundation appSettings'@AppSettings{..} = do
appInstanceID <- liftIO $ maybe UUID.nextRandom (either readInstanceIDFile return) appInitialInstanceID
appJobCtl <- liftIO $ newTVarIO Map.empty
appCronThread <- liftIO newEmptyTMVarIO
appJobState <- liftIO newEmptyTMVarIO
appHealthReport <- liftIO $ newTVarIO Set.empty
-- We need a log function to create a connection pool. We need a connection
@ -371,7 +368,7 @@ develMain = runResourceT $ do
wsettings <- liftIO . getDevSettings $ warpSettings foundation
app <- makeApplication foundation
handleJobs foundation
runAppLoggingT foundation $ handleJobs foundation
liftIO . develMainHelper $ return (wsettings, app)
-- | The @main@ function for an executable running this site.
@ -471,7 +468,7 @@ getApplicationRepl :: (MonadResource m, MonadBaseControl IO m) => m (Int, UniWor
getApplicationRepl = do
settings <- getAppDevSettings
foundation <- makeFoundation settings
handleJobs foundation
runAppLoggingT foundation $ handleJobs foundation
wsettings <- liftIO . getDevSettings $ warpSettings foundation
app1 <- makeApplication foundation

View File

@ -0,0 +1,15 @@
module Control.Concurrent.Async.Lifted.Safe.Utils
( allocateLinkedAsync
) where
import ClassyPrelude hiding (cancel)
import Control.Concurrent.Async.Lifted.Safe
import Control.Monad.Trans.Resource
allocateLinkedAsync :: forall m a.
MonadResource m
=> IO a -> m (Async a)
allocateLinkedAsync act = allocate (async act) cancel >>= (\(_k, a) -> a <$ link a)

View File

@ -110,8 +110,7 @@ data UniWorX = UniWorX
, appCryptoIDKey :: CryptoIDKey
, appClusterID :: ClusterId
, appInstanceID :: InstanceId
, appJobCtl :: TVar (Map ThreadId (TMChan JobCtl))
, appCronThread :: TMVar (ReleaseKey, ThreadId)
, appJobState :: TMVar JobState
, appSessionKey :: ClientSession.Key
, appSecretBoxKey :: SecretBox.Key
, appJSONWebKeySet :: Jose.JwkSet

View File

@ -44,7 +44,7 @@ import Data.Hashable as Import
import Data.List.NonEmpty as Import (NonEmpty(..), nonEmpty)
import Data.Text.Encoding.Error as Import(UnicodeException(..))
import Data.Semigroup as Import (Semigroup)
import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..))
import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), Sum(..), Endo(..))
import Data.Binary as Import (Binary)
import Numeric.Natural as Import (Natural)

View File

@ -7,14 +7,12 @@ module Jobs
import Import
import Utils.Lens
import Handler.Utils
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab
import Data.Conduit.TMChan
import qualified Data.Conduit.List as C
import qualified Data.Text.Lazy as LT
@ -28,7 +26,7 @@ import Data.Semigroup (Max(..))
import Utils.Sql
import Control.Monad.Random (evalRand, mkStdGen, getRandomR)
import Control.Monad.Random (evalRand, mkStdGen, getRandomR, uniformMay)
import Cron
import qualified Data.HashMap.Strict as HashMap
@ -38,20 +36,26 @@ import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!))
import Data.Foldable (foldrM)
import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.State (evalStateT, mapStateT)
import Control.Monad.Trans.Writer (execWriterT)
import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT, allocate, release)
import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Control.Monad.Trans.Cont (ContT(..), callCC)
import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
import Control.Monad.Logger
import Data.Time.Zones
import Control.Concurrent.STM (retry)
import Control.Concurrent.STM.Delay
import Jobs.Handler.SendNotification
@ -75,191 +79,259 @@ instance Exception JobQueueException
handleJobs :: ( MonadResource m
, MonadIO m
, MonadLoggerIO m
)
=> UniWorX -> m ()
-- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs foundation@UniWorX{..} = do
let num = foundation ^. _appJobWorkers
handleJobs foundation@UniWorX{..}
| foundation ^. _appJobWorkers == 0 = return ()
| otherwise = do
logger <- askLoggerIO
let runInIO = flip runLoggingT logger . runResourceT
jobCrontab <- liftIO $ newTMVarIO HashMap.empty
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobPoolManager <- allocateLinkedAsync . runInIO $ manageJobPool foundation
forM_ [1..num] $ \n -> do
(bChan, chan) <- atomically $ newBroadcastTMChan >>= (\c -> (c, ) <$> dupTMChan c)
let
logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId
doFork = flip forkFinally (\_ -> removeChan) . runAppLoggingT foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' foundation n
(_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan)
atomically . modifyTVar' appJobCtl $ Map.insert tId bChan
jobCron <- allocateLinkedAsync . runInIO $ manageCrontab foundation
-- Start cron operation
when (num > 0) $ do
registeredCron <- liftIO newEmptyTMVarIO
let execCrontab' = whenM (atomically $ readTMVar registeredCron) $
runReaderT (execCrontab foundation) JobContext{..}
unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread
cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab)
registeredCron' <- atomically $ do
registeredCron' <- tryPutTMVar appCronThread cData
registeredCron' <$ putTMVar registeredCron registeredCron'
when registeredCron' $
liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $
writeJobCtlBlock JobCtlDetermineCrontab
let jobWorkers = Map.empty
jobWorkerName = const $ error "Unknown worker"
jobCrontab <- liftIO $ newTVarIO HashMap.empty
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobShutdown <- liftIO newEmptyTMVarIO
atomically $ putTMVar appJobState JobState
{ jobContext = JobContext{..}
, ..
}
manageJobPool, manageCrontab :: forall m.
( MonadResource m
, MonadLogger m
)
=> UniWorX -> m ()
manageCrontab foundation@UniWorX{..} = do
context <- atomically . fmap jobContext $ readTMVar appJobState
liftIO . unsafeHandler foundation . void $ do
runReaderT ?? context $
writeJobCtlBlock JobCtlDetermineCrontab
evalRWST execCrontab' context HashMap.empty
where
execCrontab' = do
shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
if
| shouldTerminate -> return ()
| otherwise -> execCrontab *> execCrontab'
manageJobPool foundation@UniWorX{..}
= flip runContT return . forever . join . atomically $ asum
[ spawnMissingWorkers
, reapDeadWorkers
, terminateGracefully
]
where
num :: Int
num = fromIntegral $ foundation ^. _appJobWorkers
spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ())
spawnMissingWorkers = do
oldState <- takeTMVar appJobState
let missing = num - Map.size (jobWorkers oldState)
guard $ missing > 0
return $ do
$logDebugS "manageJobPool" [st|Spawning #{missing} workers|]
endo <- execWriterT . replicateM_ missing $ do
workerId <- newWorkerId
let logIdent = mkLogIdent workerId
(bChan, chan) <- atomically $ newBroadcastTChan >>= (\c -> (c, ) <$> dupTChan c)
let
streamChan = join . atomically $ do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
if
| shouldTerminate ->
return $ return ()
| otherwise -> do
nextVal <- readTChan chan
return $ yield nextVal >> streamChan
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
$logInfoS logIdent "Started"
runConduit $ streamChan .| handleJobs' workerId
worker <- allocateLinkedAsync runWorker
tell . Endo $ \cSt -> cSt
{ jobWorkers = Map.insert worker bChan $ jobWorkers cSt
, jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
}
atomically . putTMVar appJobState $ endo `appEndo` oldState
reapDeadWorkers = do
oldState <- takeTMVar appJobState
deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a
putTMVar appJobState oldState
{ jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers
}
guard . not $ Map.null deadWorkers
return . forM_ (Map.toList deadWorkers) $ \(jobAsync, result) -> do
case result of
Right () -> $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|]
Left e -> $logErrorS "JobPoolManager" [st|Job-Executer #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|]
void . lift . allocateLinkedAsync $
let go = do
next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
nextVal <- MaybeT . lift . tryReadTChan $ jobWorkers oldState ! jobAsync
jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
return (nextVal, receiver)
whenIsJust next $ \(nextVal, receiver) -> do
atomically $ writeTChan receiver nextVal
go
in go
terminateGracefully = do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard shouldTerminate
return . callCC $ \terminate -> do
$logInfoS "JobPoolManager" "Shutting down"
terminate ()
stopJobCtl :: MonadIO m => UniWorX -> m ()
-- ^ Stop all worker threads currently running
stopJobCtl UniWorX{appJobCtl, appCronThread} = do
mcData <- atomically $ tryReadTMVar appCronThread
whenIsJust mcData $ \(rKey, _) -> do
liftIO $ release rKey
atomically . guardM $ isEmptyTMVar appCronThread
wMap <- liftIO $ readTVarIO appJobCtl
atomically $ forM_ wMap closeTMChan
stopJobCtl UniWorX{appJobState} = do
atomically $ do
wMap' <- readTVar appJobCtl
guard . none (`Map.member` wMap') $ Map.keysSet wMap
JobState{..} <- readTMVar appJobState
putTMVar jobShutdown ()
atomically $ do
JobState{..} <- takeTMVar appJobState
mapM_ (void . waitCatchSTM) $
[ jobPoolManager
, jobCron
] ++ Map.keys jobWorkers
execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m ()
execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWorX IO) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab foundation = evalStateT go HashMap.empty
where
go = do
cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do
mapStateT (liftHandlerT . runDB . setSerializable) $ do
let
merge (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
execCrontab = do
mapRWST (liftHandlerT . runDB . setSerializable) $ do
let
merge (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do
crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab
case crontab' of
Nothing -> return Nothing
Just crontab -> Just <$> do
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
(currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do
crontab <- liftBase . readTVar =<< asks jobCrontab
case currentState of
Nothing -> return False
Just (currentCrontab, (jobCtl, nextMatch)) -> do
let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
newCrontab <- lift . lift . hoist lift $ determineCrontab'
if
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
instanceID' <- getsYesod appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
void . lift . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift . lift $ queueDBJob job
other -> writeJobCtl other
| otherwise
-> lift . mapReaderT (liftIO . atomically) $
lift . void . flip swapTMVar newCrontab =<< asks jobCrontab
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
doJob
return True
when cont go
where
acc :: NominalDiffTime
acc = 1e-3
debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
debouncingAcc AppSettings{appNotificationRateLimit} = \case
JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
_ -> acc
applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
applyJitter seed t = do
appInstance <- getsYesod appInstanceID
let
halfRange = truncate $ 0.5 / acc
diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
return $ addUTCTime diff t
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
go' (jobCtl, cron) mbPrev
| Just (_, t') <- mbPrev
, t' < t
= mbPrev
| otherwise
= Just (jobCtl, t)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TMVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = runResourceT $ do
diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
waitTime'
| diffT < acc = "Done"
| otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
$logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
let doJob = mapRWST (liftHandlerT . runDBJobs . setSerializable) $ do
newCrontab <- lift . hoist lift $ determineCrontab'
if
| diffT < acc -> return True
| otherwise -> do
retVar <- liftIO newEmptyTMVarIO
void . liftIO . forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar
let
awaitDelayThread = False <$ takeTMVar retVar
awaitCrontabChange = do
crontab' <- tryReadTMVar crontabTV
True <$ guard (Just crontab /= crontab')
crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
instanceID' <- getsYesod appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
void . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift $ queueDBJob job
other -> writeJobCtl other
| otherwise
-> mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
doJob
where
acc :: NominalDiffTime
acc = 1e-3
handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) ()
handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
debouncingAcc AppSettings{appNotificationRateLimit} = \case
JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
_ -> acc
applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
applyJitter seed t = do
appInstance <- getsYesod appInstanceID
let
halfRange = truncate $ 0.5 / acc
diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
return $ addUTCTime diff t
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
go' (jobCtl, cron) mbPrev
| Just (_, t') <- mbPrev
, t' < t
= mbPrev
| otherwise
= Just (jobCtl, t)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = runResourceT $ do
diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
waitTime'
| diffT < acc = "Done"
| otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
$logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
if
| diffT < acc -> return True
| otherwise -> do
delay <- liftIO . newDelay . round $ waitTime * 1e6
let
awaitDelayThread = False <$ waitDelay delay
awaitCrontabChange = do
crontab' <- readTVar crontabTV
True <$ guard (crontab /= crontab')
crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
mkLogIdent :: JobWorkerId -> Text
mkLogIdent wId = "Job-Executor " <> showWorkerId wId
handleJobs' :: JobWorkerId -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
resVars <- mapReaderT (liftIO . atomically) $
HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> $logErrorS logIdent $ tshow err
_other -> return ()
where
logIdent = "Jobs #" <> tshow wNum
logIdent = mkLogIdent wNum
handleQueueException :: MonadLogger m => JobQueueException -> m ()
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
@ -285,7 +357,7 @@ handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab'
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . void . flip swapTMVar newCTab =<< asks jobCrontab
lift . void . flip swapTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind

View File

@ -39,12 +39,12 @@ writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
-- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others
writeJobCtl cmd = do
tid <- liftIO myThreadId
wMap <- getsYesod appJobCtl >>= liftIO . readTVarIO
wMap <- fmap jobWorkers $ getsYesod appJobState >>= atomically . readTMVar
if
| null wMap -> throwM JobQueuePoolEmpty
| otherwise -> do
let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap
liftIO . atomically $ writeTMChan chan cmd
liftIO . atomically $ writeTChan chan cmd
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon

View File

@ -2,15 +2,20 @@ module Jobs.Types
( Job(..), Notification(..)
, JobCtl(..)
, JobContext(..)
, JobState(..)
, JobWorkerId
, showWorkerId, newWorkerId
) where
import Import.NoFoundation
import Import.NoFoundation hiding (Unique)
import Data.Aeson (defaultOptions, Options(..), SumEncoding(..))
import Data.Aeson.TH (deriveJSON)
import Data.List.NonEmpty (NonEmpty)
import Data.Unique
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@ -75,7 +80,26 @@ data JobCtl = JobCtlFlush
instance Hashable JobCtl
newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
deriving (Eq, Ord)
showWorkerId :: JobWorkerId -> Text
-- ^ Make a `JobWorkerId` somewhat human readable as a small-ish Number
showWorkerId = tshow . hashUnique . jobWorkerUnique
newWorkerId :: MonadIO m => m JobWorkerId
newWorkerId = JobWorkerId <$> liftIO newUnique
data JobContext = JobContext
{ jobCrontab :: TMVar (Crontab JobCtl)
{ jobCrontab :: TVar (Crontab JobCtl)
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
}
data JobState = JobState
{ jobWorkers :: Map (Async ()) (TChan JobCtl)
, jobWorkerName :: Async () -> JobWorkerId
, jobContext :: JobContext
, jobPoolManager :: Async ()
, jobCron :: Async ()
, jobShutdown :: TMVar ()
}

View File

@ -26,6 +26,7 @@ import Utils.Route as Utils
import Utils.Message as Utils
import Utils.Lang as Utils
import Utils.Parameters as Utils
import Control.Concurrent.Async.Lifted.Safe.Utils as Utils
import Text.Blaze (Markup, ToMarkup)