fix(jobs): reduce likelihood for multiple queueing of notifications

This commit is contained in:
Gregor Kleen 2020-05-05 17:18:29 +02:00
parent d57f4b0139
commit 970ca784b0
18 changed files with 80 additions and 65 deletions

View File

@ -7,3 +7,5 @@ import Import.NoFoundation as Import
import Utils.SystemMessage as Import
import Utils.Metrics as Import
import Jobs.Types as Import (JobHandler(..))

View File

@ -385,24 +385,28 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
instanceID' <- getsYesod $ view instanceID
now <- liftIO getCurrentTime
performJob content
& withJobWorkerState wNum (JobWorkerExecJob content)
-- `performJob` is expected to throw an exception if it detects that the job was not done
runDB . setSerializable $ do
when queuedJobWriteLastExec $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
{ cronLastExecJob = queuedJobContent
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now
, CronLastExecInstance =. instanceID'
]
delete jId
let cleanup = do
when queuedJobWriteLastExec $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
{ cronLastExecJob = queuedJobContent
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now
, CronLastExecInstance =. instanceID'
]
delete jId
case performJob content of
JobHandlerAtomic act -> runDBJobs . setSerializable $ do
act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializable cleanup
handleCmd JobCtlDetermineCrontab = do
newCTab <- liftHandler . runDB $ setSerializable determineCrontab'
-- logDebugS logIdent $ tshow newCTab
@ -479,5 +483,5 @@ determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab
performJob :: Job -> HandlerFor UniWorX ()
performJob :: Job -> JobHandler UniWorX
performJob = $(dispatchTH ''Job)

View File

@ -10,8 +10,8 @@ import qualified Data.CaseInsensitive as CI
import Text.Hamlet
dispatchJobChangeUserDisplayEmail :: UserId -> UserEmail -> Handler ()
dispatchJobChangeUserDisplayEmail jUser jDisplayEmail = do
dispatchJobChangeUserDisplayEmail :: UserId -> UserEmail -> JobHandler UniWorX
dispatchJobChangeUserDisplayEmail jUser jDisplayEmail = JobHandlerException $ do
bearer <- bearerRestrict SetDisplayEmailR jDisplayEmail <$> bearerToken (HashSet.singleton $ Right jUser) (Just $ HashSet.singleton SetDisplayEmailR) Nothing Nothing Nothing
jwt <- encodeBearer bearer
let

View File

@ -12,8 +12,8 @@ import qualified Data.Set as Set
dispatchJobDistributeCorrections :: SheetId
-> Handler ()
dispatchJobDistributeCorrections jSheet = runDBJobs $ do
(_, unassigned) <- mapReaderT lift $ assignSubmissions jSheet Nothing
unless (Set.null unassigned) $
-> JobHandler UniWorX
dispatchJobDistributeCorrections jSheet = JobHandlerAtomic $ do
unassigned <- runMaybeT . catchMaybeT (Proxy @AssignSubmissionException) . fmap (view _2) . hoist lift $ assignSubmissions jSheet Nothing
unless (maybe False Set.null unassigned) $
queueDBJob . JobQueueNotification $ NotificationCorrectionsNotDistributed jSheet

View File

@ -18,8 +18,8 @@ dispatchJobHelpRequest :: Either (Maybe Address) UserId
-> Maybe Html -- ^ Help Request
-> Maybe Text -- ^ Referer
-> Maybe ErrorResponse
-> Handler ()
dispatchJobHelpRequest jSender jRequestTime jHelpSubject jHelpRequest jReferer jError = do
-> JobHandler UniWorX
dispatchJobHelpRequest jSender jRequestTime jHelpSubject jHelpRequest jReferer jError = JobHandlerException $ do
supportAddress <- getsYesod $ view _appMailSupport
userInfo <- bitraverse return (runDB . getEntity) jSender
let senderAddress = either

View File

@ -14,8 +14,8 @@ dispatchJobInvitation :: UserId
-> Text
-> Text
-> Html
-> Handler ()
dispatchJobInvitation jInviter jInvitee jInvitationUrl jInvitationSubject jInvitationExplanation = do
-> JobHandler UniWorX
dispatchJobInvitation jInviter jInvitee jInvitationUrl jInvitationSubject jInvitationExplanation = JobHandlerException $ do
mInviter <- runDB $ get jInviter
whenIsJust mInviter $ \jInviter' -> mailT def $ do

View File

@ -11,17 +11,17 @@ import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
dispatchJobPruneSessionFiles :: Handler ()
dispatchJobPruneSessionFiles = do
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
now <- liftIO getCurrentTime
expires <- getsYesod $ view _appSessionFilesExpire
n <- runDB $ deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
$logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
dispatchJobPruneUnreferencedFiles :: Handler ()
dispatchJobPruneUnreferencedFiles = do
n <- runDB . E.deleteCount . E.from $ \file ->
dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
n <- E.deleteCount . E.from $ \file ->
E.where_ . E.not_ . E.any E.exists $ references file
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|]
where

View File

@ -6,8 +6,8 @@ import Import
import Database.Persist.Sql (deleteWhereCount)
dispatchJobPruneInvitations :: Handler ()
dispatchJobPruneInvitations = do
dispatchJobPruneInvitations :: JobHandler UniWorX
dispatchJobPruneInvitations = JobHandlerAtomic . hoist lift $ do
now <- liftIO getCurrentTime
n <- runDB $ deleteWhereCount [ InvitationExpiresAt <. Just now ]
$logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|]
n <- deleteWhereCount [ InvitationExpiresAt <. Just now ]
$logInfoS "PruneInvitations" [st|Deleted #{n} expired invitations|]

View File

@ -8,7 +8,6 @@ import Jobs.Types
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
import Utils.Sql
import Jobs.Queue
import qualified Data.Set as Set
@ -17,8 +16,8 @@ import Handler.Utils.ExamOffice.Exam
import Handler.Utils.ExamOffice.ExternalExam
dispatchJobQueueNotification :: Notification -> Handler ()
dispatchJobQueueNotification jNotification = runDBJobs . setSerializable $ do
dispatchJobQueueNotification :: Notification -> JobHandler UniWorX
dispatchJobQueueNotification jNotification = JobHandlerAtomic $ do
candidates <- hoist lift $ determineNotificationCandidates jNotification
nClass <- hoist lift $ classifyNotification jNotification
mapM_ (queueDBJob . flip JobSendNotification jNotification) $ do

View File

@ -22,8 +22,8 @@ dispatchJobSendCourseCommunication :: Either UserEmail UserId
-> UUID
-> Maybe Text
-> Html
-> Handler ()
dispatchJobSendCourseCommunication jRecipientEmail jAllRecipientAddresses jCourse jSender jMailObjectUUID jSubject jMailContent = do
-> JobHandler UniWorX
dispatchJobSendCourseCommunication jRecipientEmail jAllRecipientAddresses jCourse jSender jMailObjectUUID jSubject jMailContent = JobHandlerException $ do
(sender, Course{..}) <- runDB $ (,)
<$> getJust jSender
<*> getJust jCourse

View File

@ -22,8 +22,8 @@ import Jobs.Handler.SendNotification.CourseRegistered
import Jobs.Handler.SendNotification.SubmissionEdited
dispatchJobSendNotification :: UserId -> Notification -> Handler ()
dispatchJobSendNotification jRecipient jNotification = do
dispatchJobSendNotification :: UserId -> Notification -> JobHandler UniWorX
dispatchJobSendNotification jRecipient jNotification = JobHandlerException $ do
$(dispatchTH ''Notification) jNotification jRecipient
instanceID' <- getsYesod $ view instanceID

View File

@ -14,8 +14,8 @@ import qualified Data.HashSet as HashSet
import Text.Hamlet
dispatchJobSendPasswordReset :: UserId
-> Handler ()
dispatchJobSendPasswordReset jRecipient = userMailT jRecipient $ do
-> JobHandler UniWorX
dispatchJobSendPasswordReset jRecipient = JobHandlerException . userMailT jRecipient $ do
cID <- encrypt jRecipient
User{..} <- liftHandler . runDB $ getJust jRecipient

View File

@ -7,8 +7,8 @@ import Import
import Handler.Utils.Mail
import Handler.Utils.DateTime
dispatchJobSendTestEmail :: Email -> MailContext -> Handler ()
dispatchJobSendTestEmail jEmail jMailContext = mailT jMailContext $ do
dispatchJobSendTestEmail :: Email -> MailContext -> JobHandler UniWorX
dispatchJobSendTestEmail jEmail jMailContext = JobHandlerException . mailT jMailContext $ do
_mailTo .= [Address Nothing jEmail]
replaceMailHeader "Auto-Submitted" $ Just "auto-generated"
setSubjectI MsgMailTestSubject

View File

@ -4,8 +4,8 @@ module Jobs.Handler.SetLogSettings
import Import
dispatchJobSetLogSettings :: InstanceId -> LogSettings -> Handler ()
dispatchJobSetLogSettings jInstance jLogSettings = do
dispatchJobSetLogSettings :: InstanceId -> LogSettings -> JobHandler UniWorX
dispatchJobSetLogSettings jInstance jLogSettings = JobHandlerException $ do
instanceId <- getsYesod appInstanceID
unless (instanceId == jInstance) . liftIO $ fail "Incorrect instance"
lSettings <- getsYesod appLogSettings

View File

@ -17,9 +17,9 @@ data SynchroniseLdapException
deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable)
instance Exception SynchroniseLdapException
dispatchJobSynchroniseLdap :: Natural -> Natural -> Natural -> Handler ()
dispatchJobSynchroniseLdap :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobSynchroniseLdap numIterations epoch iteration
= runDBJobs . runConduit $
= JobHandlerAtomic . runConduit $
readUsers .| filterIteration .| sinkDBJobs
where
readUsers :: ConduitT () UserId (YesodJobDB UniWorX) ()
@ -36,8 +36,8 @@ dispatchJobSynchroniseLdap numIterations epoch iteration
return $ JobSynchroniseLdapUser userId
dispatchJobSynchroniseLdapUser :: UserId -> Handler ()
dispatchJobSynchroniseLdapUser jUser = do
dispatchJobSynchroniseLdapUser :: UserId -> JobHandler UniWorX
dispatchJobSynchroniseLdapUser jUser = JobHandlerException $ do
UniWorX{ appSettings' = AppSettings{..}, .. } <- getYesod
case appLdapPool of
Just ldapPool ->

View File

@ -8,8 +8,8 @@ import Handler.Utils.DateTime
import Database.Persist.Sql (updateWhereCount, deleteWhereCount)
dispatchJobTruncateTransactionLog, dispatchJobDeleteTransactionLogIPs :: Handler ()
dispatchJobTruncateTransactionLog = do
dispatchJobTruncateTransactionLog, dispatchJobDeleteTransactionLogIPs :: JobHandler UniWorX
dispatchJobTruncateTransactionLog = JobHandlerAtomic . hoist lift $ do
now <- liftIO getCurrentTime
let localNow = utcToLocalTime now
(localCurrentYear, _, _) = toGregorian $ localDay localNow
@ -20,12 +20,12 @@ dispatchJobTruncateTransactionLog = do
LTUUnique utc' _ -> utc'
_other -> startOfPreviousYear
n <- runDB $ deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ]
n <- deleteWhereCount [ TransactionLogTime <. startOfPreviousYear' ]
$logInfoS "TruncateTransactionLog" [st|Deleted #{n} transaction log entries|]
dispatchJobDeleteTransactionLogIPs = do
dispatchJobDeleteTransactionLogIPs = JobHandlerAtomic . hoist lift $ do
now <- liftIO getCurrentTime
retentionTime <- getsYesod $ view _appTransactionLogIPRetentionTime
let cutoff = addUTCTime (- retentionTime) now
n <- runDB $ updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ]
n <- updateWhereCount [ TransactionLogTime <. cutoff ] [ TransactionLogRemote =. Nothing ]
$logInfoS "DeleteTransactionLogIPs" [st|Deleted #{n} IP entries from transaction log|]

View File

@ -2,7 +2,7 @@ module Jobs.Queue
( writeJobCtl, writeJobCtlBlock
, writeJobCtl', writeJobCtlBlock'
, queueJob, queueJob'
, YesodJobDB, JobDB
, JobDB
, runDBJobs, queueDBJob, sinkDBJobs
, runDBJobs'
, queueDBJobCron
@ -108,9 +108,6 @@ queueJob' job = do
app <- getYesod
queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
-- | Slightly modified Version of `YesodDB` for `runDBJobs`
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerFor site))
-- | Slightly modified Version of `DB` for `runDBJobs`
type JobDB = YesodJobDB UniWorX

View File

@ -6,6 +6,8 @@ module Jobs.Types
, classifyJob
, JobCtl(..)
, classifyJobCtl
, YesodJobDB
, JobHandler(..), _JobHandlerAtomic, _JobHandlerException
, JobContext(..)
, JobState(..)
, jobWorkerNames
@ -149,13 +151,24 @@ deriveJSON defaultOptions
, sumEncoding = TaggedObject "instruction" "data"
} ''JobCtl
classifyJobCtl :: JobCtl -> String
classifyJobCtl jobctl = unpack tag
where
Aeson.Object obj = Aeson.toJSON jobctl
Aeson.String tag = obj HashMap.! "instruction"
-- | Slightly modified Version of `YesodDB` for `runDBJobs`
type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerFor site))
data JobHandler site
= JobHandlerAtomic (YesodJobDB site ())
| JobHandlerException (HandlerFor site ())
deriving (Generic, Typeable)
makePrisms ''JobHandler
data JobWorkerState
= JobWorkerBusy
| JobWorkerExecJobCtl { jobWorkerJobCtl :: JobCtl }