fix(jobs): only write CronLastExec after job has executed

This commit is contained in:
Gregor Kleen 2019-08-05 19:04:09 +02:00
parent 8351d8b68e
commit 67eda82bbc
3 changed files with 37 additions and 24 deletions

View File

@ -5,6 +5,7 @@ QueuedJob
creationTime UTCTime creationTime UTCTime
lockInstance InstanceId Maybe -- instance that has started to execute this job lockInstance InstanceId Maybe -- instance that has started to execute this job
lockTime UTCTime Maybe -- time when execution had begun lockTime UTCTime Maybe -- time when execution had begun
writeLastExec Bool default=false -- record successful execution to CronLastExec
deriving Eq Read Show Generic Typeable deriving Eq Read Show Generic Typeable
-- Jobs are deleted from @QueuedJob@ after they are executed successfully and recorded in @CronLastExec@ -- Jobs are deleted from @QueuedJob@ after they are executed successfully and recorded in @CronLastExec@

View File

@ -17,7 +17,7 @@ import qualified Data.Conduit.List as C
import qualified Data.Text.Lazy as LT import qualified Data.Text.Lazy as LT
import Data.Aeson (fromJSON, toJSON) import Data.Aeson (fromJSON)
import qualified Data.Aeson as Aeson import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson import qualified Data.Aeson.Types as Aeson
import Database.Persist.Sql (fromSqlKey) import Database.Persist.Sql (fromSqlKey)
@ -215,11 +215,17 @@ execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWo
execCrontab = do execCrontab = do
mapRWST (liftHandlerT . runDB . setSerializable) $ do mapRWST (liftHandlerT . runDB . setSerializable) $ do
let let
merge (Entity leId CronLastExec{..}) mergeLastExec (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId | otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
mergeQueued (Entity qjId QueuedJob{..})
| Just job <- Aeson.parseMaybe parseJSON queuedJobContent
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
| otherwise = lift $ delete qjId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
refT <- liftIO getCurrentTime refT <- liftIO getCurrentTime
settings <- getsYesod appSettings' settings <- getsYesod appSettings'
@ -240,19 +246,9 @@ execCrontab = do
-> do -> do
now <- liftIO $ getCurrentTime now <- liftIO $ getCurrentTime
foundation <- getYesod foundation <- getYesod
let instanceID' = foundation ^. _appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of case jobCtl of
JobCtlQueue job -> do JobCtlQueue job -> lift $ queueDBJobCron job
void . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift $ queueDBJob job
other -> runReaderT ?? foundation $ writeJobCtl other other -> runReaderT ?? foundation $ writeJobCtl other
| otherwise | otherwise
-> mapRWST (liftIO . atomically) $ -> mapRWST (liftIO . atomically) $
@ -351,10 +347,25 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content
instanceID' <- getsYesod $ view instanceID
now <- liftIO getCurrentTime
performJob content performJob content
-- `performJob` is expected to throw an exception if it detects that the job was not done -- `performJob` is expected to throw an exception if it detects that the job was not done
runDB $ delete jId runDB . setSerializable $ do
when queuedJobWriteLastExec $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
{ cronLastExecJob = queuedJobContent
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now
, CronLastExecInstance =. instanceID'
]
delete jId
handleCmd JobCtlDetermineCrontab = do handleCmd JobCtlDetermineCrontab = do
newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab' newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab'
-- logDebugS logIdent $ tshow newCTab -- logDebugS logIdent $ tshow newCTab

View File

@ -4,6 +4,7 @@ module Jobs.Queue
, queueJob, queueJob' , queueJob, queueJob'
, YesodJobDB , YesodJobDB
, runDBJobs, queueDBJob, sinkDBJobs , runDBJobs, queueDBJob, sinkDBJobs
, queueDBJobCron
, module Jobs.Types , module Jobs.Types
) where ) where
@ -78,16 +79,15 @@ writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon -- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock = writeJobCtlBlock' writeJobCtl writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do queueJobUnsafe queuedJobWriteLastExec job = do
now <- liftIO getCurrentTime queuedJobCreationTime <- liftIO getCurrentTime
self <- getsYesod appInstanceID queuedJobCreationInstance <- getsYesod appInstanceID
insert QueuedJob insert QueuedJob
{ queuedJobContent = toJSON job { queuedJobContent = toJSON job
, queuedJobCreationInstance = self
, queuedJobCreationTime = now
, queuedJobLockInstance = Nothing , queuedJobLockInstance = Nothing
, queuedJobLockTime = Nothing , queuedJobLockTime = Nothing
, ..
} }
-- We should not immediately notify a worker; instead wait for the transaction to finish first -- We should not immediately notify a worker; instead wait for the transaction to finish first
-- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something) -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
@ -97,7 +97,7 @@ queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
-- ^ Queue a job for later execution -- ^ Queue a job for later execution
-- --
-- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`) -- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`)
queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe False
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m () queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap -- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
@ -108,9 +108,10 @@ queueJob' job = do
-- | Slightly modified Version of `YesodDB` for `runDBJobs` -- | Slightly modified Version of `YesodDB` for `runDBJobs`
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO)) type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO))
queueDBJob :: Job -> YesodJobDB UniWorX () queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX ()
-- | Queue a job as part of a database transaction and execute it after the transaction succeeds -- | Queue a job as part of a database transaction and execute it after the transaction succeeds
queueDBJob job = mapReaderT lift (queueJobUnsafe job) >>= tell . Set.singleton queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . Set.singleton
queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . Set.singleton
sinkDBJobs :: Sink Job (YesodJobDB UniWorX) () sinkDBJobs :: Sink Job (YesodJobDB UniWorX) ()
-- | Queue many jobs as part of a database transaction and execute them after the transaction passes -- | Queue many jobs as part of a database transaction and execute them after the transaction passes