From 970ca784b0286a0f8341356e9769d9a80ab60903 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 5 May 2020 17:18:29 +0200 Subject: [PATCH] fix(jobs): reduce likelihood for multiple queueing of notifications --- src/Import.hs | 2 ++ src/Jobs.hs | 40 +++++++++++---------- src/Jobs/Handler/ChangeUserDisplayEmail.hs | 4 +-- src/Jobs/Handler/DistributeCorrections.hs | 8 ++--- src/Jobs/Handler/HelpRequest.hs | 4 +-- src/Jobs/Handler/Invitation.hs | 4 +-- src/Jobs/Handler/PruneFiles.hs | 12 +++---- src/Jobs/Handler/PruneInvitations.hs | 8 ++--- src/Jobs/Handler/QueueNotification.hs | 5 ++- src/Jobs/Handler/SendCourseCommunication.hs | 4 +-- src/Jobs/Handler/SendNotification.hs | 4 +-- src/Jobs/Handler/SendPasswordReset.hs | 4 +-- src/Jobs/Handler/SendTestEmail.hs | 4 +-- src/Jobs/Handler/SetLogSettings.hs | 4 +-- src/Jobs/Handler/SynchroniseLdap.hs | 8 ++--- src/Jobs/Handler/TransactionLog.hs | 10 +++--- src/Jobs/Queue.hs | 5 +-- src/Jobs/Types.hs | 15 +++++++- 18 files changed, 80 insertions(+), 65 deletions(-) diff --git a/src/Import.hs b/src/Import.hs index cf2787a10..b93336ae8 100644 --- a/src/Import.hs +++ b/src/Import.hs @@ -7,3 +7,5 @@ import Import.NoFoundation as Import import Utils.SystemMessage as Import import Utils.Metrics as Import + +import Jobs.Types as Import (JobHandler(..)) diff --git a/src/Jobs.hs b/src/Jobs.hs index ef140a7ad..65e0335e2 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -385,24 +385,28 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do instanceID' <- getsYesod $ view instanceID now <- liftIO getCurrentTime - - performJob content - & withJobWorkerState wNum (JobWorkerExecJob content) - -- `performJob` is expected to throw an exception if it detects that the job was not done - runDB . setSerializable $ do - when queuedJobWriteLastExec $ - void $ upsertBy - (UniqueCronLastExec queuedJobContent) - CronLastExec - { cronLastExecJob = queuedJobContent - , cronLastExecTime = now - , cronLastExecInstance = instanceID' - } - [ CronLastExecTime =. now - , CronLastExecInstance =. instanceID' - ] - delete jId + let cleanup = do + when queuedJobWriteLastExec $ + void $ upsertBy + (UniqueCronLastExec queuedJobContent) + CronLastExec + { cronLastExecJob = queuedJobContent + , cronLastExecTime = now + , cronLastExecInstance = instanceID' + } + [ CronLastExecTime =. now + , CronLastExecInstance =. instanceID' + ] + delete jId + + case performJob content of + JobHandlerAtomic act -> runDBJobs . setSerializable $ do + act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + JobHandlerException act -> do + act & withJobWorkerState wNum (JobWorkerExecJob content) + runDB $ setSerializable cleanup handleCmd JobCtlDetermineCrontab = do newCTab <- liftHandler . runDB $ setSerializable determineCrontab' -- logDebugS logIdent $ tshow newCTab @@ -479,5 +483,5 @@ determineCrontab' :: DB (Crontab JobCtl) determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab -performJob :: Job -> HandlerFor UniWorX () +performJob :: Job -> JobHandler UniWorX performJob = $(dispatchTH ''Job) diff --git a/src/Jobs/Handler/ChangeUserDisplayEmail.hs b/src/Jobs/Handler/ChangeUserDisplayEmail.hs index 0d256c0ca..52daa4188 100644 --- a/src/Jobs/Handler/ChangeUserDisplayEmail.hs +++ b/src/Jobs/Handler/ChangeUserDisplayEmail.hs @@ -10,8 +10,8 @@ import qualified Data.CaseInsensitive as CI import Text.Hamlet -dispatchJobChangeUserDisplayEmail :: UserId -> UserEmail -> Handler () -dispatchJobChangeUserDisplayEmail jUser jDisplayEmail = do +dispatchJobChangeUserDisplayEmail :: UserId -> UserEmail -> JobHandler UniWorX +dispatchJobChangeUserDisplayEmail jUser jDisplayEmail = JobHandlerException $ do bearer <- bearerRestrict SetDisplayEmailR jDisplayEmail <$> bearerToken (HashSet.singleton $ Right jUser) (Just $ HashSet.singleton SetDisplayEmailR) Nothing Nothing Nothing jwt <- encodeBearer bearer let diff --git a/src/Jobs/Handler/DistributeCorrections.hs b/src/Jobs/Handler/DistributeCorrections.hs index af61ddfb7..21e91d905 100644 --- a/src/Jobs/Handler/DistributeCorrections.hs +++ b/src/Jobs/Handler/DistributeCorrections.hs @@ -12,8 +12,8 @@ import qualified Data.Set as Set dispatchJobDistributeCorrections :: SheetId - -> Handler () -dispatchJobDistributeCorrections jSheet = runDBJobs $ do - (_, unassigned) <- mapReaderT lift $ assignSubmissions jSheet Nothing - unless (Set.null unassigned) $ + -> JobHandler UniWorX +dispatchJobDistributeCorrections jSheet = JobHandlerAtomic $ do + unassigned <- runMaybeT . catchMaybeT (Proxy @AssignSubmissionException) . fmap (view _2) . hoist lift $ assignSubmissions jSheet Nothing + unless (maybe False Set.null unassigned) $ queueDBJob . JobQueueNotification $ NotificationCorrectionsNotDistributed jSheet diff --git a/src/Jobs/Handler/HelpRequest.hs b/src/Jobs/Handler/HelpRequest.hs index 10b4e310e..ca07dee2b 100644 --- a/src/Jobs/Handler/HelpRequest.hs +++ b/src/Jobs/Handler/HelpRequest.hs @@ -18,8 +18,8 @@ dispatchJobHelpRequest :: Either (Maybe Address) UserId -> Maybe Html -- ^ Help Request -> Maybe Text -- ^ Referer -> Maybe ErrorResponse - -> Handler () -dispatchJobHelpRequest jSender jRequestTime jHelpSubject jHelpRequest jReferer jError = do + -> JobHandler UniWorX +dispatchJobHelpRequest jSender jRequestTime jHelpSubject jHelpRequest jReferer jError = JobHandlerException $ do supportAddress <- getsYesod $ view _appMailSupport userInfo <- bitraverse return (runDB . getEntity) jSender let senderAddress = either diff --git a/src/Jobs/Handler/Invitation.hs b/src/Jobs/Handler/Invitation.hs index 01339cffd..56d7203c9 100644 --- a/src/Jobs/Handler/Invitation.hs +++ b/src/Jobs/Handler/Invitation.hs @@ -14,8 +14,8 @@ dispatchJobInvitation :: UserId -> Text -> Text -> Html - -> Handler () -dispatchJobInvitation jInviter jInvitee jInvitationUrl jInvitationSubject jInvitationExplanation = do + -> JobHandler UniWorX +dispatchJobInvitation jInviter jInvitee jInvitationUrl jInvitationSubject jInvitationExplanation = JobHandlerException $ do mInviter <- runDB $ get jInviter whenIsJust mInviter $ \jInviter' -> mailT def $ do diff --git a/src/Jobs/Handler/PruneFiles.hs b/src/Jobs/Handler/PruneFiles.hs index be2870fef..d49b198dc 100644 --- a/src/Jobs/Handler/PruneFiles.hs +++ b/src/Jobs/Handler/PruneFiles.hs @@ -11,17 +11,17 @@ import qualified Database.Esqueleto as E import qualified Database.Esqueleto.Utils as E -dispatchJobPruneSessionFiles :: Handler () -dispatchJobPruneSessionFiles = do +dispatchJobPruneSessionFiles :: JobHandler UniWorX +dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do now <- liftIO getCurrentTime expires <- getsYesod $ view _appSessionFilesExpire - n <- runDB $ deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ] + n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ] $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|] -dispatchJobPruneUnreferencedFiles :: Handler () -dispatchJobPruneUnreferencedFiles = do - n <- runDB . E.deleteCount . E.from $ \file -> +dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX +dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do + n <- E.deleteCount . E.from $ \file -> E.where_ . E.not_ . E.any E.exists $ references file $logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|] where diff --git a/src/Jobs/Handler/PruneInvitations.hs b/src/Jobs/Handler/PruneInvitations.hs index c9456454c..e7516f204 100644 --- a/src/Jobs/Handler/PruneInvitations.hs +++ b/src/Jobs/Handler/PruneInvitations.hs @@ -6,8 +6,8 @@ import Import import Database.Persist.Sql (deleteWhereCount) -dispatchJobPruneInvitations :: Handler () -dispatchJobPruneInvitations = do +dispatchJobPruneInvitations :: JobHandler UniWorX +dispatchJobPruneInvitations = JobHandlerAtomic . hoist lift $ do now <- liftIO getCurrentTime - n <- runDB $ deleteWhereCount [ InvitationExpiresAt <. Just now ] - $logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|] \ No newline at end of file + n <- deleteWhereCount [ InvitationExpiresAt <. Just now ] + $logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|] diff --git a/src/Jobs/Handler/QueueNotification.hs b/src/Jobs/Handler/QueueNotification.hs index eaa63d135..cb62e91ed 100644 --- a/src/Jobs/Handler/QueueNotification.hs +++ b/src/Jobs/Handler/QueueNotification.hs @@ -8,7 +8,6 @@ import Jobs.Types import qualified Database.Esqueleto as E import qualified Database.Esqueleto.Utils as E -import Utils.Sql import Jobs.Queue import qualified Data.Set as Set @@ -17,8 +16,8 @@ import Handler.Utils.ExamOffice.Exam import Handler.Utils.ExamOffice.ExternalExam -dispatchJobQueueNotification :: Notification -> Handler () -dispatchJobQueueNotification jNotification = runDBJobs . setSerializable $ do +dispatchJobQueueNotification :: Notification -> JobHandler UniWorX +dispatchJobQueueNotification jNotification = JobHandlerAtomic $ do candidates <- hoist lift $ determineNotificationCandidates jNotification nClass <- hoist lift $ classifyNotification jNotification mapM_ (queueDBJob . flip JobSendNotification jNotification) $ do diff --git a/src/Jobs/Handler/SendCourseCommunication.hs b/src/Jobs/Handler/SendCourseCommunication.hs index 2030d0c29..cff458364 100644 --- a/src/Jobs/Handler/SendCourseCommunication.hs +++ b/src/Jobs/Handler/SendCourseCommunication.hs @@ -22,8 +22,8 @@ dispatchJobSendCourseCommunication :: Either UserEmail UserId -> UUID -> Maybe Text -> Html - -> Handler () -dispatchJobSendCourseCommunication jRecipientEmail jAllRecipientAddresses jCourse jSender jMailObjectUUID jSubject jMailContent = do + -> JobHandler UniWorX +dispatchJobSendCourseCommunication jRecipientEmail jAllRecipientAddresses jCourse jSender jMailObjectUUID jSubject jMailContent = JobHandlerException $ do (sender, Course{..}) <- runDB $ (,) <$> getJust jSender <*> getJust jCourse diff --git a/src/Jobs/Handler/SendNotification.hs b/src/Jobs/Handler/SendNotification.hs index cec2387bd..32e7c71e5 100644 --- a/src/Jobs/Handler/SendNotification.hs +++ b/src/Jobs/Handler/SendNotification.hs @@ -22,8 +22,8 @@ import Jobs.Handler.SendNotification.CourseRegistered import Jobs.Handler.SendNotification.SubmissionEdited -dispatchJobSendNotification :: UserId -> Notification -> Handler () -dispatchJobSendNotification jRecipient jNotification = do +dispatchJobSendNotification :: UserId -> Notification -> JobHandler UniWorX +dispatchJobSendNotification jRecipient jNotification = JobHandlerException $ do $(dispatchTH ''Notification) jNotification jRecipient instanceID' <- getsYesod $ view instanceID diff --git a/src/Jobs/Handler/SendPasswordReset.hs b/src/Jobs/Handler/SendPasswordReset.hs index 352b37b7f..ce2aadd93 100644 --- a/src/Jobs/Handler/SendPasswordReset.hs +++ b/src/Jobs/Handler/SendPasswordReset.hs @@ -14,8 +14,8 @@ import qualified Data.HashSet as HashSet import Text.Hamlet dispatchJobSendPasswordReset :: UserId - -> Handler () -dispatchJobSendPasswordReset jRecipient = userMailT jRecipient $ do + -> JobHandler UniWorX +dispatchJobSendPasswordReset jRecipient = JobHandlerException . userMailT jRecipient $ do cID <- encrypt jRecipient User{..} <- liftHandler . runDB $ getJust jRecipient diff --git a/src/Jobs/Handler/SendTestEmail.hs b/src/Jobs/Handler/SendTestEmail.hs index b37264906..d396cc7c1 100644 --- a/src/Jobs/Handler/SendTestEmail.hs +++ b/src/Jobs/Handler/SendTestEmail.hs @@ -7,8 +7,8 @@ import Import import Handler.Utils.Mail import Handler.Utils.DateTime -dispatchJobSendTestEmail :: Email -> MailContext -> Handler () -dispatchJobSendTestEmail jEmail jMailContext = mailT jMailContext $ do +dispatchJobSendTestEmail :: Email -> MailContext -> JobHandler UniWorX +dispatchJobSendTestEmail jEmail jMailContext = JobHandlerException . mailT jMailContext $ do _mailTo .= [Address Nothing jEmail] replaceMailHeader "Auto-Submitted" $ Just "auto-generated" setSubjectI MsgMailTestSubject diff --git a/src/Jobs/Handler/SetLogSettings.hs b/src/Jobs/Handler/SetLogSettings.hs index 8d7ecbbff..5aef4aebc 100644 --- a/src/Jobs/Handler/SetLogSettings.hs +++ b/src/Jobs/Handler/SetLogSettings.hs @@ -4,8 +4,8 @@ module Jobs.Handler.SetLogSettings import Import -dispatchJobSetLogSettings :: InstanceId -> LogSettings -> Handler () -dispatchJobSetLogSettings jInstance jLogSettings = do +dispatchJobSetLogSettings :: InstanceId -> LogSettings -> JobHandler UniWorX +dispatchJobSetLogSettings jInstance jLogSettings = JobHandlerException $ do instanceId <- getsYesod appInstanceID unless (instanceId == jInstance) . liftIO $ fail "Incorrect instance" lSettings <- getsYesod appLogSettings diff --git a/src/Jobs/Handler/SynchroniseLdap.hs b/src/Jobs/Handler/SynchroniseLdap.hs index 42c40db87..26dcf2b82 100644 --- a/src/Jobs/Handler/SynchroniseLdap.hs +++ b/src/Jobs/Handler/SynchroniseLdap.hs @@ -17,9 +17,9 @@ data SynchroniseLdapException deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable) instance Exception SynchroniseLdapException -dispatchJobSynchroniseLdap :: Natural -> Natural -> Natural -> Handler () +dispatchJobSynchroniseLdap :: Natural -> Natural -> Natural -> JobHandler UniWorX dispatchJobSynchroniseLdap numIterations epoch iteration - = runDBJobs . runConduit $ + = JobHandlerAtomic . runConduit $ readUsers .| filterIteration .| sinkDBJobs where readUsers :: ConduitT () UserId (YesodJobDB UniWorX) () @@ -36,8 +36,8 @@ dispatchJobSynchroniseLdap numIterations epoch iteration return $ JobSynchroniseLdapUser userId -dispatchJobSynchroniseLdapUser :: UserId -> Handler () -dispatchJobSynchroniseLdapUser jUser = do +dispatchJobSynchroniseLdapUser :: UserId -> JobHandler UniWorX +dispatchJobSynchroniseLdapUser jUser = JobHandlerException $ do UniWorX{ appSettings' = AppSettings{..}, .. } <- getYesod case appLdapPool of Just ldapPool -> diff --git a/src/Jobs/Handler/TransactionLog.hs b/src/Jobs/Handler/TransactionLog.hs index 0e131b05f..fb856ba2c 100644 --- a/src/Jobs/Handler/TransactionLog.hs +++ b/src/Jobs/Handler/TransactionLog.hs @@ -8,8 +8,8 @@ import Handler.Utils.DateTime import Database.Persist.Sql (updateWhereCount, deleteWhereCount) -dispatchJobTruncateTransactionLog, dispatchJobDeleteTransactionLogIPs :: Handler () -dispatchJobTruncateTransactionLog = do +dispatchJobTruncateTransactionLog, dispatchJobDeleteTransactionLogIPs :: JobHandler UniWorX +dispatchJobTruncateTransactionLog = JobHandlerAtomic . hoist lift $ do now <- liftIO getCurrentTime let localNow = utcToLocalTime now (localCurrentYear, _, _) = toGregorian $ localDay localNow @@ -20,12 +20,12 @@ dispatchJobTruncateTransactionLog = do LTUUnique utc' _ -> utc' _other -> startOfPreviousYear - n <- runDB $ deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ] + n <- deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ] $logInfoS "TruncateTransactionLog" [st|Deleted #{n} transaction log entries|] -dispatchJobDeleteTransactionLogIPs = do +dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do now <- liftIO getCurrentTime retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime let cutoff = addUTCTime (- retentionTime) now - n <- runDB $ updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ] + n <- updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ] $logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|] diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index 38158efe0..5275304d7 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -2,7 +2,7 @@ module Jobs.Queue ( writeJobCtl, writeJobCtlBlock , writeJobCtl', writeJobCtlBlock' , queueJob, queueJob' - , YesodJobDB, JobDB + , JobDB , runDBJobs, queueDBJob, sinkDBJobs , runDBJobs' , queueDBJobCron @@ -108,9 +108,6 @@ queueJob' job = do app <- getYesod queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform --- | Slightly modified Version of `YesodDB` for `runDBJobs` -type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerFor site)) - -- | Slightly modified Version of `DB` for `runDBJobs` type JobDB = YesodJobDB UniWorX diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 79acc5c7c..98f824381 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -6,6 +6,8 @@ module Jobs.Types , classifyJob , JobCtl(..) , classifyJobCtl + , YesodJobDB + , JobHandler(..), _JobHandlerAtomic, _JobHandlerException , JobContext(..) , JobState(..) , jobWorkerNames @@ -149,13 +151,24 @@ deriveJSON defaultOptions , sumEncoding = TaggedObject "instruction" "data" } ''JobCtl - classifyJobCtl :: JobCtl -> String classifyJobCtl jobctl = unpack tag where Aeson.Object obj = Aeson.toJSON jobctl Aeson.String tag = obj HashMap.! "instruction" + +-- | Slightly modified Version of `YesodDB` for `runDBJobs` +type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerFor site)) + +data JobHandler site + = JobHandlerAtomic (YesodJobDB site ()) + | JobHandlerException (HandlerFor site ()) + deriving (Generic, Typeable) + +makePrisms ''JobHandler + + data JobWorkerState = JobWorkerBusy | JobWorkerExecJobCtl { jobWorkerJobCtl :: JobCtl }