fix(jobs): prevent offloading instances from deleting cron last exec

This commit is contained in:
Gregor Kleen 2021-02-10 16:55:27 +01:00
parent 8a3f76054d
commit e61b5611b1
4 changed files with 38 additions and 23 deletions

View File

@ -500,8 +500,8 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
handleCmd (JobCtlPerform jId) = do
jMode <- getsYesod $ view _appJobMode
case jMode of
JobsLocal{} -> performLocal
JobsOffload -> performOffload
_otherwise -> performLocal
where
performOffload = hoist atomically $ do
JobOffloadHandler{..} <- lift . readTMVar =<< asks jobOffload
@ -519,8 +519,9 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
instanceID' <- getsYesod $ view instanceID
now <- liftIO getCurrentTime
jMode <- getsYesod $ view _appJobMode
let cleanup = do
when queuedJobWriteLastExec $
when (queuedJobWriteLastExec && modeWriteLastExec) $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
@ -532,20 +533,25 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
, CronLastExecInstance =. instanceID'
]
delete jId
modeWriteLastExec = case jMode of
JobsDrop{..} -> jobsWriteFakeLastExec
_otherwise -> True
case performJob content of
JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do
act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializableBatch cleanup
JobHandlerAtomicWithFinalizer act fin -> do
res <- runDBJobs . setSerializableBatch $ do
res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
case jMode of
JobsDrop{} -> runDB $ setSerializableBatch cleanup
_otherwise -> case performJob content of
JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do
act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
return res
fin res
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializableBatch cleanup
JobHandlerAtomicWithFinalizer act fin -> do
res <- runDBJobs . setSerializableBatch $ do
res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
return res
fin res
handleCmd JobCtlDetermineCrontab = do
$logDebugS logIdent "DetermineCrontab..."
newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab'
@ -614,21 +620,26 @@ jLocked jId act = flip evalStateT False $ do
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCrontab
pruneLastExecs crontab = do
jMode <- getsYesod $ view _appJobMode
when (is _JobsLocal jMode) $ do
Sum deleted <- runConduit $ selectSource [] [] .| C.foldMapM ensureCrontab
when (deleted > 0) $
$logInfoS "pruneLastExeces" [st|Deleted #{deleted} entries|]
where
ensureCrontab (Entity leId CronLastExec{..}) = void . runMaybeT $ do
ensureCrontab :: Entity CronLastExec -> DB (Sum Natural)
ensureCrontab (Entity leId CronLastExec{..}) = maybeT (return mempty) $ do
now <- liftIO getCurrentTime
flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval
if
| abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2
-> return ()
-> return mempty
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
, not $ HashMap.member (JobCtlQueue job) crontab
-> lift $ delete leId
-> Sum 1 <$ lift (delete leId)
| otherwise
-> return ()
-> return mempty
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab

View File

@ -49,7 +49,7 @@ determineCrontab = execWriterT $ do
}
Nothing -> mempty
when (is _JobsLocal appJobMode) $ do
when (isn't _JobsOffload appJobMode) $ do
case appJobFlushInterval of
Just interval -> tell $ HashMap.singleton
JobCtlFlush

View File

@ -29,12 +29,11 @@ mkJobOffloadHandler :: forall m.
=> PostgresConf -> JobMode
-> Maybe (m JobOffloadHandler)
mkJobOffloadHandler dbConf jMode
| is _JobsLocal jMode, hasn't (_jobsAcceptOffload . only True) jMode = Nothing
| not shouldListen = Nothing
| otherwise = Just $ do
jobOffloadOutgoing <- newTVarIO mempty
jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do
myPid <- liftIO $ PG.getBackendPID pgConn
let shouldListen = has (_jobsAcceptOffload . only True) jMode
when shouldListen $
void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel)
@ -68,3 +67,4 @@ mkJobOffloadHandler dbConf jMode
Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId)
return JobOffloadHandler{..}
where shouldListen = has (_jobsAcceptOffload . only True) jMode

View File

@ -216,6 +216,10 @@ data AppSettings = AppSettings
data JobMode = JobsLocal { jobsAcceptOffload :: Bool }
| JobsOffload
| JobsDrop
{ jobsAcceptOffload :: Bool
, jobsWriteFakeLastExec :: Bool
}
deriving (Eq, Ord, Read, Show, Generic, Typeable)
deriving anyclass (Hashable)