+ |
+ $case match
+ $of MatchAsap
+ _{MsgCronMatchAsap}
+ $of MatchNone
+ _{MsgCronMatchNone}
+ $of MatchAt t
+ ^{formatTimeW SelFormatDateTime t}
+ |
+
+ #{encodePrettyToTextBuilder job}
+ $nothing
+ _{MsgAdminCrontabNotGenerated}
+ |]
diff --git a/src/Jobs.hs b/src/Jobs.hs
index b917354f0..bdf6b847f 100644
--- a/src/Jobs.hs
+++ b/src/Jobs.hs
@@ -97,6 +97,7 @@ handleJobs foundation@UniWorX{..}
jobCrontab <- liftIO $ newTVarIO HashMap.empty
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobShutdown <- liftIO newEmptyTMVarIO
+ jobCurrentCrontab <- liftIO $ newTVarIO Nothing
atomically $ putTMVar appJobState JobState
{ jobContext = JobContext{..}
, ..
@@ -109,12 +110,12 @@ manageCrontab :: forall m.
=> UniWorX -> (forall a. m a -> m a) -> m ()
manageCrontab foundation@UniWorX{..} unmask = do
ch <- allocateLinkedAsync $ do
- context <- atomically . fmap jobContext $ readTMVar appJobState
+ jState <- atomically $ readTMVar appJobState
liftIO . unsafeHandler foundation . void $ do
atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
- void $ evalRWST (forever execCrontab) context HashMap.empty
+ void $ evalRWST (forever execCrontab) jState HashMap.empty
let awaitTermination = guardM $
readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
@@ -252,7 +253,7 @@ stopJobCtl UniWorX{appJobState} = do
, jobCron jSt'
] ++ workers
-execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWorX) ()
+execCrontab :: RWST JobState () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWorX) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab = do
@@ -276,7 +277,7 @@ execCrontab = do
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
(currentCrontab, (jobCtl, nextMatch), currentState) <- mapRWST (liftIO . atomically) $ do
- crontab <- liftBase . readTVar =<< asks jobCrontab
+ crontab <- liftBase . readTVar =<< asks (jobCrontab . jobContext)
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
@@ -288,13 +289,16 @@ execCrontab = do
do
lastTimes <- State.get
now <- liftIO getCurrentTime
- $logDebugS "Crontab" . intercalate "\n" . map tshow . sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
+ let currentCrontab' = sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
+ crontabTVar <- asks jobCurrentCrontab
+ atomically . writeTVar crontabTVar $ Just (now, currentCrontab')
+ $logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab'
let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do
newCrontab <- lift $ hoist lift determineCrontab'
when (newCrontab /= currentCrontab) $
mapRWST (liftIO . atomically) $
- liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab
+ liftBase . void . flip swapTVar newCrontab =<< asks (jobCrontab . jobContext)
mergeState
newState <- State.get
@@ -315,11 +319,11 @@ execCrontab = do
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
- JobContext{jobCrontab} <- ask
+ crontab <- asks $ jobCrontab . jobContext
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
- whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
+ whenM (liftIO . flip runLoggingT logFunc $ waitUntil crontab currentCrontab nextTime')
doJob
where
diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs
index 7b36c2801..a7b56be8d 100644
--- a/src/Jobs/Types.hs
+++ b/src/Jobs/Types.hs
@@ -17,6 +17,7 @@ module Jobs.Types
, showWorkerId, newWorkerId
, JobQueue, jqInsert, jqDequeue
, JobPriority(..), prioritiseJob
+ , module Cron
) where
import Import.NoFoundation hiding (Unique, state)
@@ -37,6 +38,8 @@ import Utils.Metrics (withJobWorkerStateLbls)
import qualified Prometheus (Label4)
+import Cron (CronNextMatch(..), _MatchAsap, _MatchAt, _MatchNone)
+
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@@ -253,6 +256,7 @@ data JobState = JobState
, jobPoolManager :: Async ()
, jobCron :: Async ()
, jobShutdown :: TMVar ()
+ , jobCurrentCrontab :: TVar (Maybe (UTCTime, [(CronNextMatch UTCTime, JobCtl)]))
}
jobWorkerNames :: JobState -> Set JobWorkerId
From 1be971677b25689a895734d9efa5898fcbf0ca08 Mon Sep 17 00:00:00 2001
From: Gregor Kleen
Date: Mon, 3 Aug 2020 13:52:37 +0200
Subject: [PATCH 15/17] fix(jobs): queue certain jobs at most once
---
config/settings.yml | 2 +-
src/Database/Esqueleto/Utils.hs | 2 ++
src/Handler/Admin/Test.hs | 8 +++----
src/Handler/Utils/Exam.hs | 4 ++--
src/Jobs/Queue.hs | 38 +++++++++++++++++++--------------
src/Jobs/Types.hs | 14 ++++++++++++
6 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/config/settings.yml b/config/settings.yml
index aea998f4b..5a120e906 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -35,7 +35,7 @@ bearer-expiration: 604800
bearer-encoding: HS256
maximum-content-length: "_env:MAX_UPLOAD_SIZE:134217728"
session-files-expire: 3600
-prune-unreferenced-files: 600
+prune-unreferenced-files: 28800
keep-unreferenced-files: 86400
health-check-interval:
matching-cluster-config: "_env:HEALTHCHECK_INTERVAL_MATCHING_CLUSTER_CONFIG:600"
diff --git a/src/Database/Esqueleto/Utils.hs b/src/Database/Esqueleto/Utils.hs
index 36c33d573..474fe9fe9 100644
--- a/src/Database/Esqueleto/Utils.hs
+++ b/src/Database/Esqueleto/Utils.hs
@@ -298,6 +298,8 @@ instance (PersistEntity val, PersistField typ) => SqlProject val typ (Maybe (E.E
sqlProject = (E.?.)
unSqlProject _ _ = Just
+infixl 8 ->.
+
(->.) :: E.SqlExpr (E.Value a) -> Text -> E.SqlExpr (E.Value b)
(->.) expr t = E.unsafeSqlBinOp "->" expr $ E.val t
diff --git a/src/Handler/Admin/Test.hs b/src/Handler/Admin/Test.hs
index c31fd691b..645152b0e 100644
--- a/src/Handler/Admin/Test.hs
+++ b/src/Handler/Admin/Test.hs
@@ -86,11 +86,9 @@ postAdminTestR = do
((emailResult, emailWidget), emailEnctype) <- runFormPost . identifyForm ("email" :: Text) $ renderAForm FormStandard emailTestForm
formResultModal emailResult AdminTestR $ \(email, ls) -> do
- jId <- mapWriterT runDB $ do
- jId <- queueJob $ JobSendTestEmail email ls
- tell . pure $ Message Success [shamlet|Email-test gestartet (Job ##{tshow (fromSqlKey jId)})|] (Just IconEmail)
- return jId
- runReaderT (writeJobCtl $ JobCtlPerform jId) =<< getYesod
+ mapWriterT runDBJobs $ do
+ lift . queueDBJob $ JobSendTestEmail email ls
+ tell . pure $ Message Success [shamlet|Email-test gestartet|] (Just IconEmail)
addMessage Warning [shamlet|Inkorrekt ausgegebener Alert|] -- For testing alert handling when short circuiting; for proper (not fallback-solution) handling always use `tell` within `formResultModal`
let emailWidget' = wrapForm emailWidget def
diff --git a/src/Handler/Utils/Exam.hs b/src/Handler/Utils/Exam.hs
index 8ba5f5584..cc8405762 100644
--- a/src/Handler/Utils/Exam.hs
+++ b/src/Handler/Utils/Exam.hs
@@ -1,4 +1,4 @@
-{-# OPTIONS_GHC -fno-warn-deprecations -fno-warn-incomplete-uni-patterns #-}
+{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
module Handler.Utils.Exam
( fetchExamAux
@@ -519,7 +519,7 @@ examAutoOccurrence (hash -> seed) rule ExamAutoOccurrenceConfig{..} occurrences
)
postprocess result = (resultAscList, resultUsers)
where
- resultAscList = pad . Map.fromListWith Set.union . accRes (pure <$> Set.lookupMin rangeAlphabet) $ (\r -> traceShow (over (traverse . _2 . traverse . traverse) CI.original r) r) result
+ resultAscList = pad . Map.fromListWith Set.union $ accRes (pure <$> Set.lookupMin rangeAlphabet) result
where
accRes _ [] = []
accRes prevEnd ((occA, nsA) : (occB, nsB) : xs)
diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs
index 045649ed1..18c85be59 100644
--- a/src/Jobs/Queue.hs
+++ b/src/Jobs/Queue.hs
@@ -80,22 +80,28 @@ writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
-queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX QueuedJobId
+queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId)
queueJobUnsafe queuedJobWriteLastExec job = do
$logInfoS "queueJob" $ tshow job
- queuedJobCreationTime <- liftIO getCurrentTime
- queuedJobCreationInstance <- getsYesod appInstanceID
- insert QueuedJob
- { queuedJobContent = toJSON job
- , queuedJobLockInstance = Nothing
- , queuedJobLockTime = Nothing
- , ..
- }
- -- We should not immediately notify a worker; instead wait for the transaction to finish first
- -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
- -- return jId
+
+ doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ]
+
+ if
+ | doQueue -> Just <$> do
+ queuedJobCreationTime <- liftIO getCurrentTime
+ queuedJobCreationInstance <- getsYesod appInstanceID
+ insert QueuedJob
+ { queuedJobContent = toJSON job
+ , queuedJobLockInstance = Nothing
+ , queuedJobLockTime = Nothing
+ , ..
+ }
+ -- We should not immediately notify a worker; instead wait for the transaction to finish first
+ -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
+ -- return jId
+ | otherwise -> return Nothing
-queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
+queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m (Maybe QueuedJobId)
-- ^ Queue a job for later execution
--
-- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`)
@@ -105,15 +111,15 @@ queueJob' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
queueJob' job = do
app <- getYesod
- queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
+ queueJob job >>= maybe (return ()) (flip runReaderT app . writeJobCtl . JobCtlPerform)
-- | Slightly modified Version of `DB` for `runDBJobs`
type JobDB = YesodJobDB UniWorX
queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX ()
-- | Queue a job as part of a database transaction and execute it after the transaction succeeds
-queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . Set.singleton
-queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . Set.singleton
+queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . maybe Set.empty Set.singleton
+queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . maybe Set.empty Set.singleton
sinkDBJobs :: ConduitT Job Void (YesodJobDB UniWorX) ()
-- | Queue many jobs as part of a database transaction and execute them after the transaction passes
diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs
index a7b56be8d..11fe8b12e 100644
--- a/src/Jobs/Types.hs
+++ b/src/Jobs/Types.hs
@@ -17,6 +17,7 @@ module Jobs.Types
, showWorkerId, newWorkerId
, JobQueue, jqInsert, jqDequeue
, JobPriority(..), prioritiseJob
+ , jobNoQueueSame
, module Cron
) where
@@ -235,6 +236,19 @@ prioritiseJob (JobCtlGenerateHealthReport _) = JobPrioRealtime
prioritiseJob JobCtlDetermineCrontab = JobPrioRealtime
prioritiseJob _ = JobPrioBatch
+jobNoQueueSame :: Job -> Bool
+jobNoQueueSame = \case
+ JobSendPasswordReset{} -> True
+ JobTruncateTransactionLog{} -> True
+ JobPruneInvitations{} -> True
+ JobDeleteTransactionLogIPs{} -> True
+ JobSynchroniseLdapUser{} -> True
+ JobChangeUserDisplayEmail{} -> True
+ JobPruneSessionFiles{} -> True
+ JobPruneUnreferencedFiles{} -> True
+ JobInjectFiles{} -> True
+ _ -> False
+
newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue JobPriority JobCtl }
deriving (Eq, Ord, Read, Show)
From a9cdfcc7e14440b24b43bf1ff26a94168ddb29ba Mon Sep 17 00:00:00 2001
From: Gregor Kleen
Date: Mon, 3 Aug 2020 14:16:03 +0200
Subject: [PATCH 16/17] refactor: hlint
---
src/Handler/Admin/Crontab.hs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Handler/Admin/Crontab.hs b/src/Handler/Admin/Crontab.hs
index 52c10eb8c..bc8a3097f 100644
--- a/src/Handler/Admin/Crontab.hs
+++ b/src/Handler/Admin/Crontab.hs
@@ -16,7 +16,7 @@ getAdminCrontabR = do
JobState{jobCurrentCrontab} <- MaybeT $ tryReadTMVar jState
MaybeT $ readTVar jobCurrentCrontab
- let mCrontab = mCrontab' & mapped . _2 %~ filter (hasn't $ _1 . _MatchNone)
+ let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _1 . _MatchNone)
siteLayoutMsg MsgMenuAdminCrontab $ do
setTitleI MsgMenuAdminCrontab
From 1d956a5fdc1cfbf57423925e34cd056cec2f6f9d Mon Sep 17 00:00:00 2001
From: Gregor Kleen
Date: Mon, 3 Aug 2020 14:21:36 +0200
Subject: [PATCH 17/17] chore(release): 18.5.0
---
CHANGELOG.md | 14 ++++++++++++++
package-lock.json | 2 +-
package.json | 2 +-
package.yaml | 2 +-
4 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 36b77fd11..eb67e3710 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,20 @@
All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.
+## [18.5.0](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/compare/v18.4.0...v18.5.0) (2020-08-03)
+
+
+### Bug Fixes
+
+* **jobs:** queue certain jobs at most once ([1be9716](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/commit/1be9716))
+
+
+### Features
+
+* admin-crontab-r ([460c133](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/commit/460c133))
+
+
+
## [18.4.0](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/compare/v18.3.0...v18.4.0) (2020-08-02)
diff --git a/package-lock.json b/package-lock.json
index 6357927a7..97223ff05 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
{
"name": "uni2work",
- "version": "18.4.0",
+ "version": "18.5.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
diff --git a/package.json b/package.json
index 91e5e1937..c5e99919c 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "uni2work",
- "version": "18.4.0",
+ "version": "18.5.0",
"description": "",
"keywords": [],
"author": "",
diff --git a/package.yaml b/package.yaml
index ec34e6c40..182eddbf9 100644
--- a/package.yaml
+++ b/package.yaml
@@ -1,5 +1,5 @@
name: uniworx
-version: 18.4.0
+version: 18.5.0
dependencies:
- base
|