diff --git a/src/Jobs.hs b/src/Jobs.hs index 98ebf0209..88e66b146 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -251,23 +251,26 @@ execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerFor Uni -- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have -- seen, wait for the time of the next job and fire it execCrontab = do - mapRWST (liftHandler . runDB . setSerializable) $ do - let - mergeLastExec (Entity _leId CronLastExec{..}) - | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob - = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) - | otherwise = return () + let + mergeState :: MonadResource m => RWST _ () _ (ReaderT SqlBackend m) () + mergeState = do + let + mergeLastExec (Entity _leId CronLastExec{..}) + | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob + = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) + | otherwise = return () - mergeQueued (Entity _qjId QueuedJob{..}) - | Just job <- Aeson.parseMaybe parseJSON queuedJobContent - = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime) - | otherwise = return () - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued + mergeQueued (Entity _qjId QueuedJob{..}) + | Just job <- Aeson.parseMaybe parseJSON queuedJobContent + = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime) + | otherwise = return () + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued + mapRWST (liftHandler . runDB . setSerializable) $ mergeState refT <- liftIO getCurrentTime settings <- getsYesod appSettings' - (currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do + (currentCrontab, (jobCtl, nextMatch), currentState) <- mapRWST (liftIO . atomically) $ do crontab <- liftBase . readTVar =<< asks jobCrontab State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab @@ -275,7 +278,7 @@ execCrontab = do case earliestJob settings prevExec crontab refT of Nothing -> liftBase retry Just (_, MatchNone) -> liftBase retry - Just x -> return (crontab, x) + Just x -> return (crontab, x, prevExec) do lastTimes <- State.get @@ -284,18 +287,24 @@ execCrontab = do let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do newCrontab <- lift . hoist lift $ determineCrontab' - if - | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab - -> do - now <- liftIO $ getCurrentTime - foundation <- getYesod - State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl - case jobCtl of - JobCtlQueue job -> lift $ queueDBJobCron job - other -> runReaderT ?? foundation $ writeJobCtl other - | otherwise - -> mapRWST (liftIO . atomically) $ - liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab + when (newCrontab /= currentCrontab) $ + mapRWST (liftIO . atomically) $ + liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab + + mergeState + newState <- State.get + + let upToDate = and + [ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab + , ((==) `on` HashMap.lookup jobCtl) newState currentState + ] + when upToDate $ do + now <- liftIO $ getCurrentTime + foundation <- getYesod + State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl + case jobCtl of + JobCtlQueue job -> lift $ queueDBJobCron job + other -> runReaderT ?? foundation $ writeJobCtl other case nextMatch of MatchAsap -> doJob