fix(cron-exec): consider lastExec before executing job

This commit is contained in:
Gregor Kleen 2019-11-19 17:09:49 +01:00
parent 6b10299345
commit 43833db3e1

View File

@ -251,23 +251,26 @@ execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerFor Uni
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab = do
mapRWST (liftHandler . runDB . setSerializable) $ do
let
mergeLastExec (Entity _leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = return ()
let
mergeState :: MonadResource m => RWST _ () _ (ReaderT SqlBackend m) ()
mergeState = do
let
mergeLastExec (Entity _leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = return ()
mergeQueued (Entity _qjId QueuedJob{..})
| Just job <- Aeson.parseMaybe parseJSON queuedJobContent
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
| otherwise = return ()
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
mergeQueued (Entity _qjId QueuedJob{..})
| Just job <- Aeson.parseMaybe parseJSON queuedJobContent
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
| otherwise = return ()
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
mapRWST (liftHandler . runDB . setSerializable) $ mergeState
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
(currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do
(currentCrontab, (jobCtl, nextMatch), currentState) <- mapRWST (liftIO . atomically) $ do
crontab <- liftBase . readTVar =<< asks jobCrontab
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
@ -275,7 +278,7 @@ execCrontab = do
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
Just x -> return (crontab, x, prevExec)
do
lastTimes <- State.get
@ -284,18 +287,24 @@ execCrontab = do
let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do
newCrontab <- lift . hoist lift $ determineCrontab'
if
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
foundation <- getYesod
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> lift $ queueDBJobCron job
other -> runReaderT ?? foundation $ writeJobCtl other
| otherwise
-> mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
when (newCrontab /= currentCrontab) $
mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
mergeState
newState <- State.get
let upToDate = and
[ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
, ((==) `on` HashMap.lookup jobCtl) newState currentState
]
when upToDate $ do
now <- liftIO $ getCurrentTime
foundation <- getYesod
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> lift $ queueDBJobCron job
other -> runReaderT ?? foundation $ writeJobCtl other
case nextMatch of
MatchAsap -> doJob