fix(jobs): queue certain jobs at most once

This commit is contained in:
Gregor Kleen 2020-08-03 13:52:37 +02:00
parent 460c133aac
commit 1be971677b
6 changed files with 44 additions and 24 deletions

View File

@ -35,7 +35,7 @@ bearer-expiration: 604800
bearer-encoding: HS256
maximum-content-length: "_env:MAX_UPLOAD_SIZE:134217728"
session-files-expire: 3600
prune-unreferenced-files: 600
prune-unreferenced-files: 28800
keep-unreferenced-files: 86400
health-check-interval:
matching-cluster-config: "_env:HEALTHCHECK_INTERVAL_MATCHING_CLUSTER_CONFIG:600"

View File

@ -298,6 +298,8 @@ instance (PersistEntity val, PersistField typ) => SqlProject val typ (Maybe (E.E
sqlProject = (E.?.)
unSqlProject _ _ = Just
infixl 8 ->.
(->.) :: E.SqlExpr (E.Value a) -> Text -> E.SqlExpr (E.Value b)
(->.) expr t = E.unsafeSqlBinOp "->" expr $ E.val t

View File

@ -86,11 +86,9 @@ postAdminTestR = do
((emailResult, emailWidget), emailEnctype) <- runFormPost . identifyForm ("email" :: Text) $ renderAForm FormStandard emailTestForm
formResultModal emailResult AdminTestR $ \(email, ls) -> do
jId <- mapWriterT runDB $ do
jId <- queueJob $ JobSendTestEmail email ls
tell . pure $ Message Success [shamlet|Email-test gestartet (Job ##{tshow (fromSqlKey jId)})|] (Just IconEmail)
return jId
runReaderT (writeJobCtl $ JobCtlPerform jId) =<< getYesod
mapWriterT runDBJobs $ do
lift . queueDBJob $ JobSendTestEmail email ls
tell . pure $ Message Success [shamlet|Email-test gestartet|] (Just IconEmail)
addMessage Warning [shamlet|Inkorrekt ausgegebener Alert|] -- For testing alert handling when short circuiting; for proper (not fallback-solution) handling always use `tell` within `formResultModal`
let emailWidget' = wrapForm emailWidget def

View File

@ -1,4 +1,4 @@
{-# OPTIONS_GHC -fno-warn-deprecations -fno-warn-incomplete-uni-patterns #-}
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
module Handler.Utils.Exam
( fetchExamAux
@ -519,7 +519,7 @@ examAutoOccurrence (hash -> seed) rule ExamAutoOccurrenceConfig{..} occurrences
)
postprocess result = (resultAscList, resultUsers)
where
resultAscList = pad . Map.fromListWith Set.union . accRes (pure <$> Set.lookupMin rangeAlphabet) $ (\r -> traceShow (over (traverse . _2 . traverse . traverse) CI.original r) r) result
resultAscList = pad . Map.fromListWith Set.union $ accRes (pure <$> Set.lookupMin rangeAlphabet) result
where
accRes _ [] = []
accRes prevEnd ((occA, nsA) : (occB, nsB) : xs)

View File

@ -80,22 +80,28 @@ writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId)
queueJobUnsafe queuedJobWriteLastExec job = do
$logInfoS "queueJob" $ tshow job
queuedJobCreationTime <- liftIO getCurrentTime
queuedJobCreationInstance <- getsYesod appInstanceID
insert QueuedJob
{ queuedJobContent = toJSON job
, queuedJobLockInstance = Nothing
, queuedJobLockTime = Nothing
, ..
}
-- We should not immediately notify a worker; instead wait for the transaction to finish first
-- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
-- return jId
doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ]
if
| doQueue -> Just <$> do
queuedJobCreationTime <- liftIO getCurrentTime
queuedJobCreationInstance <- getsYesod appInstanceID
insert QueuedJob
{ queuedJobContent = toJSON job
, queuedJobLockInstance = Nothing
, queuedJobLockTime = Nothing
, ..
}
-- We should not immediately notify a worker; instead wait for the transaction to finish first
-- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
-- return jId
| otherwise -> return Nothing
queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m (Maybe QueuedJobId)
-- ^ Queue a job for later execution
--
-- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`)
@ -105,15 +111,15 @@ queueJob' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
queueJob' job = do
app <- getYesod
queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
queueJob job >>= maybe (return ()) (flip runReaderT app . writeJobCtl . JobCtlPerform)
-- | Slightly modified Version of `DB` for `runDBJobs`
type JobDB = YesodJobDB UniWorX
queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX ()
-- | Queue a job as part of a database transaction and execute it after the transaction succeeds
queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . Set.singleton
queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . Set.singleton
queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . maybe Set.empty Set.singleton
queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . maybe Set.empty Set.singleton
sinkDBJobs :: ConduitT Job Void (YesodJobDB UniWorX) ()
-- | Queue many jobs as part of a database transaction and execute them after the transaction passes

View File

@ -17,6 +17,7 @@ module Jobs.Types
, showWorkerId, newWorkerId
, JobQueue, jqInsert, jqDequeue
, JobPriority(..), prioritiseJob
, jobNoQueueSame
, module Cron
) where
@ -235,6 +236,19 @@ prioritiseJob (JobCtlGenerateHealthReport _) = JobPrioRealtime
prioritiseJob JobCtlDetermineCrontab = JobPrioRealtime
prioritiseJob _ = JobPrioBatch
jobNoQueueSame :: Job -> Bool
jobNoQueueSame = \case
JobSendPasswordReset{} -> True
JobTruncateTransactionLog{} -> True
JobPruneInvitations{} -> True
JobDeleteTransactionLogIPs{} -> True
JobSynchroniseLdapUser{} -> True
JobChangeUserDisplayEmail{} -> True
JobPruneSessionFiles{} -> True
JobPruneUnreferencedFiles{} -> True
JobInjectFiles{} -> True
_ -> False
newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue JobPriority JobCtl }
deriving (Eq, Ord, Read, Show)