From e61b5611b1568180aa3ccfc3e3b981eb9a13cd53 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 10 Feb 2021 16:55:27 +0100 Subject: [PATCH] fix(jobs): prevent offloading instances from deleting cron last exec --- src/Jobs.hs | 51 +++++++++++++++++++++++++++------------------ src/Jobs/Crontab.hs | 2 +- src/Jobs/Offload.hs | 4 ++-- src/Settings.hs | 4 ++++ 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/Jobs.hs b/src/Jobs.hs index e3ab2c09c..00fe2b1ef 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -500,8 +500,8 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker handleCmd (JobCtlPerform jId) = do jMode <- getsYesod $ view _appJobMode case jMode of - JobsLocal{} -> performLocal JobsOffload -> performOffload + _otherwise -> performLocal where performOffload = hoist atomically $ do JobOffloadHandler{..} <- lift . readTMVar =<< asks jobOffload @@ -519,8 +519,9 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker instanceID' <- getsYesod $ view instanceID now <- liftIO getCurrentTime + jMode <- getsYesod $ view _appJobMode let cleanup = do - when queuedJobWriteLastExec $ + when (queuedJobWriteLastExec && modeWriteLastExec) $ void $ upsertBy (UniqueCronLastExec queuedJobContent) CronLastExec @@ -532,20 +533,25 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker , CronLastExecInstance =. instanceID' ] delete jId + modeWriteLastExec = case jMode of + JobsDrop{..} -> jobsWriteFakeLastExec + _otherwise -> True - case performJob content of - JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do - act & withJobWorkerState wNum (JobWorkerExecJob content) - hoist lift cleanup - JobHandlerException act -> do - act & withJobWorkerState wNum (JobWorkerExecJob content) - runDB $ setSerializableBatch cleanup - JobHandlerAtomicWithFinalizer act fin -> do - res <- runDBJobs . setSerializableBatch $ do - res <- act & withJobWorkerState wNum (JobWorkerExecJob content) + case jMode of + JobsDrop{} -> runDB $ setSerializableBatch cleanup + _otherwise -> case performJob content of + JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do + act & withJobWorkerState wNum (JobWorkerExecJob content) hoist lift cleanup - return res - fin res + JobHandlerException act -> do + act & withJobWorkerState wNum (JobWorkerExecJob content) + runDB $ setSerializableBatch cleanup + JobHandlerAtomicWithFinalizer act fin -> do + res <- runDBJobs . setSerializableBatch $ do + res <- act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + return res + fin res handleCmd JobCtlDetermineCrontab = do $logDebugS logIdent "DetermineCrontab..." newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab' @@ -614,21 +620,26 @@ jLocked jId act = flip evalStateT False $ do pruneLastExecs :: Crontab JobCtl -> DB () -pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCrontab +pruneLastExecs crontab = do + jMode <- getsYesod $ view _appJobMode + when (is _JobsLocal jMode) $ do + Sum deleted <- runConduit $ selectSource [] [] .| C.foldMapM ensureCrontab + when (deleted > 0) $ + $logInfoS "pruneLastExeces" [st|Deleted #{deleted} entries|] where - ensureCrontab (Entity leId CronLastExec{..}) = void . runMaybeT $ do + ensureCrontab :: Entity CronLastExec -> DB (Sum Natural) + ensureCrontab (Entity leId CronLastExec{..}) = maybeT (return mempty) $ do now <- liftIO getCurrentTime flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval - if | abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2 - -> return () + -> return mempty | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob , not $ HashMap.member (JobCtlQueue job) crontab - -> lift $ delete leId + -> Sum 1 <$ lift (delete leId) | otherwise - -> return () + -> return mempty determineCrontab' :: DB (Crontab JobCtl) determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index 5bdc9f277..79adf7a33 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -49,7 +49,7 @@ determineCrontab = execWriterT $ do } Nothing -> mempty - when (is _JobsLocal appJobMode) $ do + when (isn't _JobsOffload appJobMode) $ do case appJobFlushInterval of Just interval -> tell $ HashMap.singleton JobCtlFlush diff --git a/src/Jobs/Offload.hs b/src/Jobs/Offload.hs index 2991e03f2..1176823e9 100644 --- a/src/Jobs/Offload.hs +++ b/src/Jobs/Offload.hs @@ -29,12 +29,11 @@ mkJobOffloadHandler :: forall m. => PostgresConf -> JobMode -> Maybe (m JobOffloadHandler) mkJobOffloadHandler dbConf jMode - | is _JobsLocal jMode, hasn't (_jobsAcceptOffload . only True) jMode = Nothing + | not shouldListen = Nothing | otherwise = Just $ do jobOffloadOutgoing <- newTVarIO mempty jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do myPid <- liftIO $ PG.getBackendPID pgConn - let shouldListen = has (_jobsAcceptOffload . only True) jMode when shouldListen $ void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel) @@ -68,3 +67,4 @@ mkJobOffloadHandler dbConf jMode Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId) return JobOffloadHandler{..} + where shouldListen = has (_jobsAcceptOffload . only True) jMode diff --git a/src/Settings.hs b/src/Settings.hs index f77a06aa9..22726addd 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -216,6 +216,10 @@ data AppSettings = AppSettings data JobMode = JobsLocal { jobsAcceptOffload :: Bool } | JobsOffload + | JobsDrop + { jobsAcceptOffload :: Bool + , jobsWriteFakeLastExec :: Bool + } deriving (Eq, Ord, Read, Show, Generic, Typeable) deriving anyclass (Hashable)