diff --git a/models/jobs b/models/jobs index fcf0006b8..06be9fbeb 100644 --- a/models/jobs +++ b/models/jobs @@ -5,6 +5,7 @@ QueuedJob creationTime UTCTime lockInstance InstanceId Maybe -- instance that has started to execute this job lockTime UTCTime Maybe -- time when execution had begun + writeLastExec Bool default=false -- record successful execution to CronLastExec deriving Eq Read Show Generic Typeable -- Jobs are deleted from @QueuedJob@ after they are executed successfully and recorded in @CronLastExec@ diff --git a/src/Jobs.hs b/src/Jobs.hs index 63fc3f75d..82914d8d9 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -17,7 +17,7 @@ import qualified Data.Conduit.List as C import qualified Data.Text.Lazy as LT -import Data.Aeson (fromJSON, toJSON) +import Data.Aeson (fromJSON) import qualified Data.Aeson as Aeson import qualified Data.Aeson.Types as Aeson import Database.Persist.Sql (fromSqlKey) @@ -215,11 +215,17 @@ execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWo execCrontab = do mapRWST (liftHandlerT . runDB . setSerializable) $ do let - merge (Entity leId CronLastExec{..}) + mergeLastExec (Entity leId CronLastExec{..}) | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) | otherwise = lift $ delete leId - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge + + mergeQueued (Entity qjId QueuedJob{..}) + | Just job <- Aeson.parseMaybe parseJSON queuedJobContent + = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime) + | otherwise = lift $ delete qjId + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued refT <- liftIO getCurrentTime settings <- getsYesod appSettings' @@ -240,19 +246,9 @@ execCrontab = do -> do now <- liftIO $ getCurrentTime foundation <- getYesod - let instanceID' = foundation ^. _appInstanceID State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl case jobCtl of - JobCtlQueue job -> do - void . lift $ upsertBy - (UniqueCronLastExec $ toJSON job) - CronLastExec - { cronLastExecJob = toJSON job - , cronLastExecTime = now - , cronLastExecInstance = instanceID' - } - [ CronLastExecTime =. now ] - lift $ queueDBJob job + JobCtlQueue job -> lift $ queueDBJobCron job other -> runReaderT ?? foundation $ writeJobCtl other | otherwise -> mapRWST (liftIO . atomically) $ @@ -351,10 +347,25 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content + instanceID' <- getsYesod $ view instanceID + now <- liftIO getCurrentTime + performJob content -- `performJob` is expected to throw an exception if it detects that the job was not done - runDB $ delete jId + runDB . setSerializable $ do + when queuedJobWriteLastExec $ + void $ upsertBy + (UniqueCronLastExec queuedJobContent) + CronLastExec + { cronLastExecJob = queuedJobContent + , cronLastExecTime = now + , cronLastExecInstance = instanceID' + } + [ CronLastExecTime =. now + , CronLastExecInstance =. instanceID' + ] + delete jId handleCmd JobCtlDetermineCrontab = do newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab' -- logDebugS logIdent $ tshow newCTab diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index 8b71c2960..83b4fe72d 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -4,6 +4,7 @@ module Jobs.Queue , queueJob, queueJob' , YesodJobDB , runDBJobs, queueDBJob, sinkDBJobs + , queueDBJobCron , module Jobs.Types ) where @@ -78,16 +79,15 @@ writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl - -- | Pass an instruction to the `Job`-Workers and block until it was acted upon writeJobCtlBlock = writeJobCtlBlock' writeJobCtl -queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId -queueJobUnsafe job = do - now <- liftIO getCurrentTime - self <- getsYesod appInstanceID +queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX QueuedJobId +queueJobUnsafe queuedJobWriteLastExec job = do + queuedJobCreationTime <- liftIO getCurrentTime + queuedJobCreationInstance <- getsYesod appInstanceID insert QueuedJob { queuedJobContent = toJSON job - , queuedJobCreationInstance = self - , queuedJobCreationTime = now , queuedJobLockInstance = Nothing , queuedJobLockTime = Nothing + , .. } -- We should not immediately notify a worker; instead wait for the transaction to finish first -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something) @@ -97,7 +97,7 @@ queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId -- ^ Queue a job for later execution -- -- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`) -queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe +queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe False queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m () -- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap @@ -108,9 +108,10 @@ queueJob' job = do -- | Slightly modified Version of `YesodDB` for `runDBJobs` type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO)) -queueDBJob :: Job -> YesodJobDB UniWorX () +queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX () -- | Queue a job as part of a database transaction and execute it after the transaction succeeds -queueDBJob job = mapReaderT lift (queueJobUnsafe job) >>= tell . Set.singleton +queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . Set.singleton +queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . Set.singleton sinkDBJobs :: Sink Job (YesodJobDB UniWorX) () -- | Queue many jobs as part of a database transaction and execute them after the transaction passes