diff --git a/src/Handler/Admin/Crontab.hs b/src/Handler/Admin/Crontab.hs index daff2070f..18225b5cb 100644 --- a/src/Handler/Admin/Crontab.hs +++ b/src/Handler/Admin/Crontab.hs @@ -17,6 +17,8 @@ import qualified Data.Text.Lazy.Builder as Text.Builder import qualified Data.HashSet as HashSet import qualified Data.HashMap.Strict as HashMap +import qualified Data.UUID as UUID + deriveJSON defaultOptions { constructorTagModifier = camelToPathPiece' 1 @@ -32,9 +34,12 @@ getAdminCrontabR = do let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _3 . _MatchNone) + instanceId <- getsYesod appInstanceID + selectRep $ do provideRep $ do crontabBearer <- runMaybeT . hoist runDB $ do + guardM $ hasGlobalGetParam GetGenerateToken uid <- MaybeT maybeAuthId guardM . lift . existsBy $ UniqueUserGroupMember UserGroupCrontab uid @@ -49,6 +54,10 @@ getAdminCrontabR = do
                 #{toPathPiece t}
+          
+
+
_{MsgInstanceId} +
#{UUID.toText instanceId}
$maybe (genTime, crontab) <- mCrontab

diff --git a/src/Handler/Metrics.hs b/src/Handler/Metrics.hs index e1e9a9b01..6cadaa2a0 100644 --- a/src/Handler/Metrics.hs +++ b/src/Handler/Metrics.hs @@ -26,6 +26,7 @@ getMetricsR = selectRep $ do samples <- sortBy metricSort <$> collectMetrics metricsBearer <- runMaybeT . hoist runDB $ do + guardM $ hasGlobalGetParam GetGenerateToken uid <- MaybeT maybeAuthId guardM . lift . existsBy $ UniqueUserGroupMember UserGroupMetrics uid diff --git a/src/Jobs.hs b/src/Jobs.hs index 4aca345e8..183df06e0 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -10,6 +10,7 @@ module Jobs import Import hiding (StateT) import Jobs.Types as Types hiding (JobCtl(JobCtlQueue)) import Jobs.Queue +import Jobs.Offload import Jobs.Crontab import qualified Data.Conduit.List as C @@ -105,6 +106,7 @@ handleJobs foundation@UniWorX{..} jobShutdown <- liftIO newEmptyTMVarIO jobCurrentCrontab <- liftIO $ newTVarIO Nothing jobHeldLocks <- liftIO $ newTVarIO Set.empty + jobOffload <- liftIO newEmptyTMVarIO registerJobHeldLocksCount jobHeldLocks registerJobWorkerQueueDepth appJobState atomically $ putTMVar appJobState JobState @@ -155,7 +157,9 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> atomically . asum $ [ spawnMissingWorkers , reapDeadWorkers - ] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo ++ + ] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo + ++ maybeToList (manageOffloadHandler <$> mkJobOffloadHandler (appDatabaseConf appSettings') (appJobMode appSettings')) + ++ [ terminateGracefully terminate' ] where @@ -286,6 +290,27 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|] + manageOffloadHandler :: (ReaderT UniWorX m JobOffloadHandler) -> STM (ContT () m ()) + manageOffloadHandler spawn = do + shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + guard $ not shouldTerminate' + + JobContext{jobOffload} <- jobContext <$> readTMVar appJobState + cOffload <- tryReadTMVar jobOffload + + let respawn = do + nOffload <- lift $ runReaderT spawn foundation + atomically $ do + putTMVar jobOffload nOffload + whenIsJust cOffload $ \pOffload -> do + pOutgoing <- readTVar $ jobOffloadOutgoing pOffload + modifyTVar (jobOffloadOutgoing nOffload) (pOutgoing <>) + + respawn <$ case cOffload of + Nothing -> return () + Just JobOffloadHandler{..} -> waitSTM jobOffloadHandler + + stopJobCtl :: MonadUnliftIO m => UniWorX -> m () -- ^ Stop all worker threads currently running stopJobCtl UniWorX{appJobState} = do @@ -471,46 +496,55 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker $logDebugS logIdent "JobCtlQueue..." lift $ queueJob' job $logInfoS logIdent "JobCtlQueue" - handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do - content <- case fromJSON queuedJobContent of - Aeson.Success c -> return c - Aeson.Error t -> do - $logErrorS logIdent $ "Aeson decoding error: " <> pack t - throwM $ JInvalid jId j - - $logInfoS logIdent $ tshow content - $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content + handleCmd (JobCtlPerform jId) = do + jMode <- getsYesod $ view _appJobMode + case jMode of + JobsLocal{} -> performLocal + JobsOffload -> performOffload + where + performOffload = hoist atomically $ do + JobOffloadHandler{..} <- lift . readTMVar =<< asks jobOffload + lift $ modifyTVar jobOffloadOutgoing (`snoc` jId) + performLocal = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do + content <- case fromJSON queuedJobContent of + Aeson.Success c -> return c + Aeson.Error t -> do + $logErrorS logIdent $ "Aeson decoding error: " <> pack t + throwM $ JInvalid jId j - instanceID' <- getsYesod $ view instanceID - now <- liftIO getCurrentTime + $logInfoS logIdent $ tshow content + $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content - let cleanup = do - when queuedJobWriteLastExec $ - void $ upsertBy - (UniqueCronLastExec queuedJobContent) - CronLastExec - { cronLastExecJob = queuedJobContent - , cronLastExecTime = now - , cronLastExecInstance = instanceID' - } - [ CronLastExecTime =. now - , CronLastExecInstance =. instanceID' - ] - delete jId + instanceID' <- getsYesod $ view instanceID + now <- liftIO getCurrentTime - case performJob content of - JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do - act & withJobWorkerState wNum (JobWorkerExecJob content) - hoist lift cleanup - JobHandlerException act -> do - act & withJobWorkerState wNum (JobWorkerExecJob content) - runDB $ setSerializableBatch cleanup - JobHandlerAtomicWithFinalizer act fin -> do - res <- runDBJobs . setSerializableBatch $ do - res <- act & withJobWorkerState wNum (JobWorkerExecJob content) - hoist lift cleanup - return res - fin res + let cleanup = do + when queuedJobWriteLastExec $ + void $ upsertBy + (UniqueCronLastExec queuedJobContent) + CronLastExec + { cronLastExecJob = queuedJobContent + , cronLastExecTime = now + , cronLastExecInstance = instanceID' + } + [ CronLastExecTime =. now + , CronLastExecInstance =. instanceID' + ] + delete jId + + case performJob content of + JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do + act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + JobHandlerException act -> do + act & withJobWorkerState wNum (JobWorkerExecJob content) + runDB $ setSerializableBatch cleanup + JobHandlerAtomicWithFinalizer act fin -> do + res <- runDBJobs . setSerializableBatch $ do + res <- act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + return res + fin res handleCmd JobCtlDetermineCrontab = do $logDebugS logIdent "DetermineCrontab..." newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab' diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index e24cba39b..a7b87a1d3 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -27,17 +27,6 @@ determineCrontab :: DB (Crontab JobCtl) determineCrontab = execWriterT $ do UniWorX{ appSettings' = AppSettings{..} } <- getYesod - case appJobFlushInterval of - Just interval -> tell $ HashMap.singleton - JobCtlFlush - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = interval - , cronNotAfter = Right CronNotScheduled - } - Nothing -> return () - whenIsJust appJobCronInterval $ \interval -> tell $ HashMap.singleton JobCtlDetermineCrontab @@ -48,77 +37,6 @@ determineCrontab = execWriterT $ do , cronNotAfter = Right CronNotScheduled } - oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1] - whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton - (JobCtlQueue JobPruneInvitations) - Cron - { cronInitial = CronTimestamp $ utcToLocalTime oldestInvUTC - , cronRepeat = CronRepeatOnChange - , cronRateLimit = nominalDay - , cronNotAfter = Right CronNotScheduled - } - - oldestSessionFile <- lift $ preview (_head . _entityVal . _sessionFileTouched) <$> selectList [] [Asc SessionFileTouched, LimitTo 1] - whenIsJust oldestSessionFile $ \oldest -> tell $ HashMap.singleton - (JobCtlQueue JobPruneSessionFiles) - Cron - { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appSessionFilesExpire oldest - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appSessionFilesExpire / 2 - , cronNotAfter = Right CronNotScheduled - } - - oldestFallbackPersonalisedSheetFilesKey <- lift $ preview (_head . _entityVal . _fallbackPersonalisedSheetFilesKeyGenerated) <$> selectList [] [Asc FallbackPersonalisedSheetFilesKeyGenerated, LimitTo 1] - whenIsJust oldestFallbackPersonalisedSheetFilesKey $ \oldest -> tell $ HashMap.singleton - (JobCtlQueue JobPruneFallbackPersonalisedSheetFilesKeys) - Cron - { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appFallbackPersonalisedSheetFilesKeysExpire oldest - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appFallbackPersonalisedSheetFilesKeysExpire / 2 - , cronNotAfter = Right CronNotScheduled - } - - oldestSentMail <- lift $ preview (_head . _entityVal . _sentMailSentAt) <$> selectList [] [Asc SentMailSentAt, LimitTo 1] - whenIsJust ((,) <$> appMailRetainSent <*> oldestSentMail) $ \(retain, oldest) -> tell $ HashMap.singleton - (JobCtlQueue JobPruneOldSentMails) - Cron - { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime retain oldest - , cronRepeat = CronRepeatOnChange - , cronRateLimit = retain / 2 - , cronNotAfter = Right CronNotScheduled - } - - - whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval -> - tell $ HashMap.singleton - (JobCtlQueue JobInjectFiles) - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = iInterval - , cronNotAfter = Right CronNotScheduled - } - - whenIsJust appRechunkFiles $ \rInterval -> - tell $ HashMap.singleton - (JobCtlQueue JobRechunkFiles) - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = rInterval - , cronNotAfter = Right CronNotScheduled - } - - whenIsJust appCheckMissingFiles $ \rInterval -> - tell $ HashMap.singleton - (JobCtlQueue JobDetectMissingFiles) - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = rInterval - , cronNotAfter = Right CronNotScheduled - } - tell . flip foldMap universeF $ \kind -> case appHealthCheckInterval kind of Just int -> HashMap.singleton @@ -131,354 +49,437 @@ determineCrontab = execWriterT $ do } Nothing -> mempty - let newyear = cronCalendarAny - { cronDayOfYear = cronMatchOne 1 - } - in tell $ HashMap.singleton - (JobCtlQueue JobTruncateTransactionLog) + when (is _JobsLocal appJobMode) $ do + case appJobFlushInterval of + Just interval -> tell $ HashMap.singleton + JobCtlFlush Cron - { cronInitial = newyear - , cronRepeat = CronRepeatScheduled newyear - , cronRateLimit = minNominalYear + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = interval , cronNotAfter = Right CronNotScheduled } + Nothing -> return () - oldestLogEntry <- fmap listToMaybe . lift . E.select . E.from $ \transactionLog -> do - E.where_ . E.not_ . E.isNothing $ transactionLog E.^. TransactionLogRemote - E.orderBy [E.asc $ transactionLog E.^. TransactionLogTime] - E.limit 1 - return $ transactionLog E.^. TransactionLogTime - for_ oldestLogEntry $ \(E.Value oldestEntry) -> - tell $ HashMap.singleton - (JobCtlQueue JobDeleteTransactionLogIPs) + oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1] + whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton + (JobCtlQueue JobPruneInvitations) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appTransactionLogIPRetentionTime oldestEntry + { cronInitial = CronTimestamp $ utcToLocalTime oldestInvUTC , cronRepeat = CronRepeatOnChange , cronRateLimit = nominalDay , cronNotAfter = Right CronNotScheduled } - let - getNextIntervals within interval cInterval = do - now <- liftIO getPOSIXTime - return $ do - let - epochInterval = within / 2 - (currEpoch, epochNow) = now `divMod'` epochInterval - currInterval = epochNow `div'` interval - numIntervals = max 1 . floor $ epochInterval / interval - n = ceiling $ 4 * cInterval / interval - i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ] - let - ((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals - nextIntervalTime - = posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval - return (nextEpoch, nextInterval, nextIntervalTime, numIntervals) - - if - | is _Just appLdapConf - , Just syncWithin <- appSynchroniseLdapUsersWithin - , Just cInterval <- appJobCronInterval - -> do - nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval - - forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do - tell $ HashMap.singleton - (JobCtlQueue JobSynchroniseLdap - { jEpoch = fromInteger nextEpoch - , jNumIterations = fromInteger numIntervals - , jIteration = fromInteger nextInterval - }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime - , cronRepeat = CronRepeatNever - , cronRateLimit = appSynchroniseLdapUsersInterval - , cronNotAfter = Left syncWithin - } - | otherwise - -> return () - - whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do - nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval - forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do - tell $ HashMap.singleton - (JobCtlQueue JobPruneUnreferencedFiles - { jEpoch = fromInteger nextEpoch - , jNumIterations = fromInteger numIntervals - , jIteration = fromInteger nextInterval - } - ) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime - , cronRepeat = CronRepeatNever - , cronRateLimit = appPruneUnreferencedFilesInterval - , cronNotAfter = Left within - } - - let - sheetJobs (Entity nSheet Sheet{..}) = do - for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo - } - for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do - guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom - guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) - (fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]) - guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet] - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetHint{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo - } - for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do - guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom - guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left nominalDay - } - for_ sheetActiveTo $ \aTo -> do - whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo' - , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo - } - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - when sheetAutoDistribute $ - tell $ HashMap.singleton - (JobCtlQueue $ JobDistributeCorrections nSheet) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo - , cronRepeat = CronRepeatNever - , cronRateLimit = 3600 -- Irrelevant due to `cronRepeat` - , cronNotAfter = Left nominalDay - } - - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs - - let - correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB () - correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } ) + oldestSessionFile <- lift $ preview (_head . _entityVal . _sessionFileTouched) <$> selectList [] [Asc SessionFileTouched, LimitTo 1] + whenIsJust oldestSessionFile $ \oldest -> tell $ HashMap.singleton + (JobCtlQueue JobPruneSessionFiles) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay time - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration + { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appSessionFilesExpire oldest + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appSessionFilesExpire / 2 + , cronNotAfter = Right CronNotScheduled } - submissionsByCorrector :: Entity Submission -> Map (UserId, SheetId) (Max UTCTime) - submissionsByCorrector (Entity _ sub) - | Just ratingBy <- submissionRatingBy sub - , Just assigned <- submissionRatingAssigned sub - , not $ submissionRatingDone sub - = Map.singleton (ratingBy, submissionSheet sub) $ Max assigned - | otherwise - = Map.empty + oldestFallbackPersonalisedSheetFilesKey <- lift $ preview (_head . _entityVal . _fallbackPersonalisedSheetFilesKeyGenerated) <$> selectList [] [Asc FallbackPersonalisedSheetFilesKeyGenerated, LimitTo 1] + whenIsJust oldestFallbackPersonalisedSheetFilesKey $ \oldest -> tell $ HashMap.singleton + (JobCtlQueue JobPruneFallbackPersonalisedSheetFilesKeys) + Cron + { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appFallbackPersonalisedSheetFilesKeysExpire oldest + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appFallbackPersonalisedSheetFilesKeysExpire / 2 + , cronNotAfter = Right CronNotScheduled + } - collateSubmissionsByCorrector acc entity = Map.unionWith (<>) acc $ submissionsByCorrector entity - correctorNotifications <=< runConduit $ - transPipe lift ( selectSource [ SubmissionRatingBy !=. Nothing, SubmissionRatingAssigned !=. Nothing ] [] - ) - .| C.fold collateSubmissionsByCorrector Map.empty + oldestSentMail <- lift $ preview (_head . _entityVal . _sentMailSentAt) <$> selectList [] [Asc SentMailSentAt, LimitTo 1] + whenIsJust ((,) <$> appMailRetainSent <*> oldestSentMail) $ \(retain, oldest) -> tell $ HashMap.singleton + (JobCtlQueue JobPruneOldSentMails) + Cron + { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime retain oldest + , cronRepeat = CronRepeatOnChange + , cronRateLimit = retain / 2 + , cronNotAfter = Right CronNotScheduled + } - let - examSelect = E.selectSource . E.from $ \(exam `E.InnerJoin` course `E.InnerJoin` school) -> do - E.on $ school E.^. SchoolId E.==. course E.^. CourseSchool - E.on $ course E.^. CourseId E.==. exam E.^. ExamCourse - return (exam, course, school) - examJobs (Entity nExam Exam{..}, _, Entity _ School{..}) = do - newestResult <- lift . E.select . E.from $ \examResult -> do - E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam - return . E.max_ $ examResult E.^. ExamResultLastChanged + whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobInjectFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = iInterval + , cronNotAfter = Right CronNotScheduled + } - whenIsJust examVisibleFrom $ \visibleFrom -> do - case over (mapped . _Value) ((max `on` NTop) examFinished) newestResult of - [E.Value (NTop (Just ts))] -> + whenIsJust appRechunkFiles $ \rInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobRechunkFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = rInterval + , cronNotAfter = Right CronNotScheduled + } + + whenIsJust appCheckMissingFiles $ \rInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobDetectMissingFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = rInterval + , cronNotAfter = Right CronNotScheduled + } + + let newyear = cronCalendarAny + { cronDayOfYear = cronMatchOne 1 + } + in tell $ HashMap.singleton + (JobCtlQueue JobTruncateTransactionLog) + Cron + { cronInitial = newyear + , cronRepeat = CronRepeatScheduled newyear + , cronRateLimit = minNominalYear + , cronNotAfter = Right CronNotScheduled + } + + oldestLogEntry <- fmap listToMaybe . lift . E.select . E.from $ \transactionLog -> do + E.where_ . E.not_ . E.isNothing $ transactionLog E.^. TransactionLogRemote + E.orderBy [E.asc $ transactionLog E.^. TransactionLogTime] + E.limit 1 + return $ transactionLog E.^. TransactionLogTime + for_ oldestLogEntry $ \(E.Value oldestEntry) -> + tell $ HashMap.singleton + (JobCtlQueue JobDeleteTransactionLogIPs) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appTransactionLogIPRetentionTime oldestEntry + , cronRepeat = CronRepeatOnChange + , cronRateLimit = nominalDay + , cronNotAfter = Right CronNotScheduled + } + + let + getNextIntervals within interval cInterval = do + now <- liftIO getPOSIXTime + return $ do + let + epochInterval = within / 2 + (currEpoch, epochNow) = now `divMod'` epochInterval + currInterval = epochNow `div'` interval + numIntervals = max 1 . floor $ epochInterval / interval + n = ceiling $ 4 * cInterval / interval + i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ] + let + ((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals + nextIntervalTime + = posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval + return (nextEpoch, nextInterval, nextIntervalTime, numIntervals) + + if + | is _Just appLdapConf + , Just syncWithin <- appSynchroniseLdapUsersWithin + , Just cInterval <- appJobCronInterval + -> do + nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval + + forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamResult{..}) + (JobCtlQueue JobSynchroniseLdap + { jEpoch = fromInteger nextEpoch + , jNumIterations = fromInteger numIntervals + , jIteration = fromInteger nextInterval + }) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom ts - , cronRepeat = CronRepeatOnChange + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime + , cronRepeat = CronRepeatNever + , cronRateLimit = appSynchroniseLdapUsersInterval + , cronNotAfter = Left syncWithin + } + | otherwise + -> return () + + whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do + nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval + forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do + tell $ HashMap.singleton + (JobCtlQueue JobPruneUnreferencedFiles + { jEpoch = fromInteger nextEpoch + , jNumIterations = fromInteger numIntervals + , jIteration = fromInteger nextInterval + } + ) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime + , cronRepeat = CronRepeatNever + , cronRateLimit = appPruneUnreferencedFilesInterval + , cronNotAfter = Left within + } + + let + sheetJobs (Entity nSheet Sheet{..}) = do + for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo + } + for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do + guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom + guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) + (fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]) + guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet] + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetHint{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo + } + for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do + guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom + guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left nominalDay + } + for_ sheetActiveTo $ \aTo -> do + whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo' + , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationExpiration $ max visibleFrom ts + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo + } + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + when sheetAutoDistribute $ + tell $ HashMap.singleton + (JobCtlQueue $ JobDistributeCorrections nSheet) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo + , cronRepeat = CronRepeatNever + , cronRateLimit = 3600 -- Irrelevant due to `cronRepeat` + , cronNotAfter = Left nominalDay + } + + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs + + let + correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB () + correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } ) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay time + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + + submissionsByCorrector :: Entity Submission -> Map (UserId, SheetId) (Max UTCTime) + submissionsByCorrector (Entity _ sub) + | Just ratingBy <- submissionRatingBy sub + , Just assigned <- submissionRatingAssigned sub + , not $ submissionRatingDone sub + = Map.singleton (ratingBy, submissionSheet sub) $ Max assigned + | otherwise + = Map.empty + + collateSubmissionsByCorrector acc entity = Map.unionWith (<>) acc $ submissionsByCorrector entity + correctorNotifications <=< runConduit $ + transPipe lift ( selectSource [ SubmissionRatingBy !=. Nothing, SubmissionRatingAssigned !=. Nothing ] [] + ) + .| C.fold collateSubmissionsByCorrector Map.empty + + + let + examSelect = E.selectSource . E.from $ \(exam `E.InnerJoin` course `E.InnerJoin` school) -> do + E.on $ school E.^. SchoolId E.==. course E.^. CourseSchool + E.on $ course E.^. CourseId E.==. exam E.^. ExamCourse + return (exam, course, school) + examJobs (Entity nExam Exam{..}, _, Entity _ School{..}) = do + newestResult <- lift . E.select . E.from $ \examResult -> do + E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam + return . E.max_ $ examResult E.^. ExamResultLastChanged + + whenIsJust examVisibleFrom $ \visibleFrom -> do + case over (mapped . _Value) ((max `on` NTop) examFinished) newestResult of + [E.Value (NTop (Just ts))] -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamResult{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom ts + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationExpiration $ max visibleFrom ts + } + _other -> return () + + whenIsJust examRegisterFrom $ \registerFrom -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationActive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom registerFrom + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) examRegisterTo + } + whenIsJust ((,) <$> examRegisterFrom <*> examRegisterTo) $ \(registerFrom, registerTo) -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) registerTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ registerTo + } + whenIsJust ((,) <$> examRegisterFrom <*> examDeregisterUntil) $ \(registerFrom, deregisterUntil) -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamDeregistrationSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) deregisterUntil + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ deregisterUntil + } + + let closeTime = case (examClosed, examFinished) of + (mClose, Just finish) + | isn't _ExamCloseSeparate schoolExamCloseMode -> Just $ maybe id min mClose finish + (Just close, _) + | is _ExamCloseSeparate schoolExamCloseMode -> Just close + _other -> Nothing + + case closeTime of + Just close -> do + -- If an exam that was previously under `ExamCloseSeparate` rules transitions to `ExamCloseOnFinish`, it might suddenly have been closed an arbitrary time ago + -- If `cronNotAfter` was only `appNotificationExpiration` in that case, no notification might ever be sent + -- That's probably fine. + + changedResults <- lift . E.select . E.from $ \examResult -> do + E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam + E.&&. examResult E.^. ExamResultLastChanged E.>. E.val close + return $ examResult E.^. ExamResultId + + case newestResult of + [E.Value (Just lastChange)] + | not $ null changedResults + -> tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResultsChanged{ nExamResults = Set.fromList $ map E.unValue changedResults }) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + _other -> return () + + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResults{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ close + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + Nothing -> return () + in runConduit $ transPipe lift examSelect .| C.mapM_ examJobs + + + let + externalExamJobs nExternalExam = do + newestResult <- lift . E.select . E.from $ \externalExamResult -> do + E.where_ $ externalExamResult E.^. ExternalExamResultExam E.==. E.val nExternalExam + return . E.max_ $ externalExamResult E.^. ExternalExamResultLastChanged + + case newestResult of + [E.Value (Just lastChange)] -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExternalExamResults{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange + , cronRepeat = CronRepeatOnChange + , cronRateLimit = nominalDay + , cronNotAfter = Left appNotificationExpiration } _other -> return () - whenIsJust examRegisterFrom $ \registerFrom -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationActive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom registerFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) examRegisterTo - } - whenIsJust ((,) <$> examRegisterFrom <*> examRegisterTo) $ \(registerFrom, registerTo) -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) registerTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ registerTo - } - whenIsJust ((,) <$> examRegisterFrom <*> examDeregisterUntil) $ \(registerFrom, deregisterUntil) -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamDeregistrationSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) deregisterUntil - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ deregisterUntil - } - - let closeTime = case (examClosed, examFinished) of - (mClose, Just finish) - | isn't _ExamCloseSeparate schoolExamCloseMode -> Just $ maybe id min mClose finish - (Just close, _) - | is _ExamCloseSeparate schoolExamCloseMode -> Just close - _other -> Nothing - - case closeTime of - Just close -> do - -- If an exam that was previously under `ExamCloseSeparate` rules transitions to `ExamCloseOnFinish`, it might suddenly have been closed an arbitrary time ago - -- If `cronNotAfter` was only `appNotificationExpiration` in that case, no notification might ever be sent - -- That's probably fine. - - changedResults <- lift . E.select . E.from $ \examResult -> do - E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam - E.&&. examResult E.^. ExamResultLastChanged E.>. E.val close - return $ examResult E.^. ExamResultId - - case newestResult of - [E.Value (Just lastChange)] - | not $ null changedResults - -> tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResultsChanged{ nExamResults = Set.fromList $ map E.unValue changedResults }) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - _other -> return () - - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResults{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ close - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - Nothing -> return () - in runConduit $ transPipe lift examSelect .| C.mapM_ examJobs + runConduit $ transPipe lift (selectKeys [] []) .| C.mapM_ externalExamJobs - let - externalExamJobs nExternalExam = do - newestResult <- lift . E.select . E.from $ \externalExamResult -> do - E.where_ $ externalExamResult E.^. ExternalExamResultExam E.==. E.val nExternalExam - return . E.max_ $ externalExamResult E.^. ExternalExamResultLastChanged + allocations <- lift $ selectList [] [] - case newestResult of - [E.Value (Just lastChange)] -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExternalExamResults{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange - , cronRepeat = CronRepeatOnChange - , cronRateLimit = nominalDay - , cronNotAfter = Left appNotificationExpiration - } - _other -> return () + let + allocationTimes :: EntityField Allocation (Maybe UTCTime) -> MergeHashMap UTCTime [Entity Allocation] + allocationTimes aField = flip foldMap allocations $ \allocEnt -> case allocEnt ^. fieldLens aField of + Nothing -> mempty + Just t -> _MergeHashMap # HashMap.singleton t (pure allocEnt) - runConduit $ transPipe lift (selectKeys [] []) .| C.mapM_ externalExamJobs + forM_ allocations $ \(Entity nAllocation _) -> do + doneSince <- lift $ fmap (E.unValue <=< listToMaybe) . E.select . E.from $ \participant -> do + E.where_ $ participant E.^. CourseParticipantAllocated E.==. E.just (E.val nAllocation) + return . E.max_ $ participant E.^. CourseParticipantRegistration + whenIsJust doneSince $ \doneSince' -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationResults{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay doneSince' + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationCollateDelay $ addUTCTime appNotificationExpiration doneSince' + } - allocations <- lift $ selectList [] [] - - let - allocationTimes :: EntityField Allocation (Maybe UTCTime) -> MergeHashMap UTCTime [Entity Allocation] - allocationTimes aField = flip foldMap allocations $ \allocEnt -> case allocEnt ^. fieldLens aField of - Nothing -> mempty - Just t -> _MergeHashMap # HashMap.singleton t (pure allocEnt) - - forM_ allocations $ \(Entity nAllocation _) -> do - doneSince <- lift $ fmap (E.unValue <=< listToMaybe) . E.select . E.from $ \participant -> do - E.where_ $ participant E.^. CourseParticipantAllocated E.==. E.just (E.val nAllocation) - return . E.max_ $ participant E.^. CourseParticipantRegistration - - whenIsJust doneSince $ \doneSince' -> + iforM_ (allocationTimes AllocationStaffRegisterFrom) $ \staffRegisterFrom allocs -> tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationResults{..}) + (JobCtlQueue $ JobQueueNotification NotificationAllocationStaffRegister{ nAllocations = setOf (folded . _entityKey) allocs }) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay doneSince' + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffRegisterFrom , cronRepeat = CronRepeatOnChange , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationCollateDelay $ addUTCTime appNotificationExpiration doneSince' + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffRegisterTo . to NTop . filtered (> NTop (Just staffRegisterFrom))) allocs + } + iforM_ (allocationTimes AllocationRegisterFrom) $ \registerFrom allocs -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationRegister{ nAllocations = setOf (folded . _entityKey) allocs }) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerFrom + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationRegisterTo . to NTop . filtered (> NTop (Just registerFrom))) allocs + } + iforM_ (allocationTimes AllocationStaffAllocationFrom) $ \staffAllocationFrom allocs -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationAllocation{ nAllocations = setOf (folded . _entityKey) allocs }) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffAllocationFrom + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just staffAllocationFrom))) allocs + } + iforM_ (allocationTimes AllocationRegisterTo) $ \registerTo allocs' -> do + let allocs = flip filter allocs' $ \(Entity _ Allocation{..}) -> maybe True (> registerTo) allocationStaffAllocationTo + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationUnratedApplications{ nAllocations = setOf (folded . _entityKey) allocs }) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs } - - iforM_ (allocationTimes AllocationStaffRegisterFrom) $ \staffRegisterFrom allocs -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationStaffRegister{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffRegisterFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffRegisterTo . to NTop . filtered (> NTop (Just staffRegisterFrom))) allocs - } - iforM_ (allocationTimes AllocationRegisterFrom) $ \registerFrom allocs -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationRegister{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationRegisterTo . to NTop . filtered (> NTop (Just registerFrom))) allocs - } - iforM_ (allocationTimes AllocationStaffAllocationFrom) $ \staffAllocationFrom allocs -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationAllocation{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffAllocationFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just staffAllocationFrom))) allocs - } - iforM (allocationTimes AllocationRegisterTo) $ \registerTo allocs' -> do - let allocs = flip filter allocs' $ \(Entity _ Allocation{..}) -> maybe True (> registerTo) allocationStaffAllocationTo - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationUnratedApplications{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs - } diff --git a/src/Jobs/Offload.hs b/src/Jobs/Offload.hs new file mode 100644 index 000000000..00d3291cd --- /dev/null +++ b/src/Jobs/Offload.hs @@ -0,0 +1,70 @@ +module Jobs.Offload + ( mkJobOffloadHandler + ) where + +import Import hiding (bracket, js) +import Jobs.Types +import Jobs.Queue + +import qualified Database.PostgreSQL.Simple as PG +import qualified Database.PostgreSQL.Simple.Types as PG +import qualified Database.PostgreSQL.Simple.Notification as PG + +import Database.Persist.Postgresql (PostgresConf, pgConnStr) + +import Data.Text.Encoding (decodeUtf8') + +import UnliftIO.Exception (bracket) + + +jobOffloadChannel :: Text +jobOffloadChannel = "job-offload" + +mkJobOffloadHandler :: forall m. + ( MonadResource m + , MonadUnliftIO m + , MonadThrow m, MonadReader UniWorX m + , MonadLogger m + ) + => PostgresConf -> JobMode + -> Maybe (m JobOffloadHandler) +mkJobOffloadHandler dbConf jMode + | is _JobsLocal jMode, hasn't (_jobsAcceptOffload . only True) jMode = Nothing + | otherwise = Just $ do + jobOffloadOutgoing <- newTVarIO mempty + jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do + myPid <- liftIO $ PG.getBackendPID pgConn + let shouldListen = has (_jobsAcceptOffload . only True) jMode + + when shouldListen $ + void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel) + + foreverBreak $ \(($ ()) -> terminate) -> do + UniWorX{appJobState} <- ask + shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + when shouldTerminate terminate + + let + getInput = do + n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn + if | notificationPid == myPid || notificationChannel /= (encodeUtf8 jobOffloadChannel) -> getInput + | otherwise -> return n + getOutput = atomically $ do + jQueue <- readTVar jobOffloadOutgoing + case jQueue of + j :< js -> j <$ writeTVar jobOffloadOutgoing js + _other -> mzero + + io <- lift $ if + | shouldListen -> getInput `race` getOutput + | otherwise -> Right <$> getOutput + + case io of + Left PG.Notification{..} + | Just jId <- fromPathPiece =<< either (const Nothing) Just (decodeUtf8' notificationData) + -> writeJobCtl $ JobCtlPerform jId + | otherwise + -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow notificationData + Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId) + + return JobOffloadHandler{..} diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index f08801213..487c0ed6d 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -9,7 +9,7 @@ module Jobs.Types , classifyJobCtl , YesodJobDB , JobHandler(..), _JobHandlerAtomic, _JobHandlerException - , JobContext(..) + , JobOffloadHandler(..), JobContext(..) , JobState(..), _jobWorkers, _jobWorkerName, _jobContext, _jobPoolManager, _jobCron, _jobShutdown, _jobCurrentCrontab , jobWorkerNames , JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob @@ -238,10 +238,16 @@ showWorkerId = tshow . hashUnique . jobWorkerUnique newWorkerId :: MonadIO m => m JobWorkerId newWorkerId = JobWorkerId <$> liftIO newUnique +data JobOffloadHandler = JobOffloadHandler + { jobOffloadHandler :: Async () + , jobOffloadOutgoing :: TVar (Seq QueuedJobId) + } + data JobContext = JobContext { jobCrontab :: TVar (Crontab JobCtl) , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException)))) , jobHeldLocks :: TVar (Set QueuedJobId) + , jobOffload :: TMVar JobOffloadHandler } diff --git a/src/Settings.hs b/src/Settings.hs index c5bf12dcf..f7014bc6a 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -206,9 +206,16 @@ data AppSettings = AppSettings , appInitialInstanceID :: Maybe (Either FilePath UUID) , appRibbon :: Maybe Text + , appJobMode :: JobMode + , appMemcacheAuth :: Bool } deriving Show +data JobMode = JobsLocal { jobsAcceptOffload :: Bool } + | JobsOffload + deriving (Eq, Ord, Read, Show, Generic, Typeable) + deriving anyclass (Hashable) + data ApprootScope = ApprootUserGenerated | ApprootDefault deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) deriving anyclass (Universe, Finite, Hashable) @@ -342,6 +349,11 @@ deriveFromJSON defaultOptions { fieldLabelModifier = camelToPathPiece' 2 } ''UserDefaultConf +deriveJSON defaultOptions + { fieldLabelModifier = camelToPathPiece' 1 + , constructorTagModifier = camelToPathPiece' 1 + } ''JobMode + instance FromJSON LdapConf where parseJSON = withObject "LdapConf" $ \o -> do ldapTls <- o .:? "tls" @@ -596,6 +608,8 @@ instance FromJSON AppSettings where appMemcacheAuth <- o .:? "memcache-auth" .!= False + appJobMode <- o .:? "job-mode" .!= JobsLocal True + return AppSettings{..} makeClassy_ ''AppSettings diff --git a/src/Utils.hs b/src/Utils.hs index 159e1d424..ce3dfe13b 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -70,6 +70,7 @@ import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Catch import Control.Monad.Morph (hoist) import Control.Monad.Fail +import Control.Monad.Trans.Cont (ContT, evalContT, callCC) import Language.Haskell.TH import Language.Haskell.TH.Instances () @@ -943,6 +944,11 @@ forever' :: Monad m -> m b forever' start cont = cont start >>= flip forever' cont +foreverBreak :: Monad m + => ((r -> ContT r m b) -> ContT r m a) + -> m r +foreverBreak cont = evalContT . callCC $ forever . cont + -------------- -- Foldable -- diff --git a/src/Utils/Lens.hs b/src/Utils/Lens.hs index 33c6dff44..b1e194d7d 100644 --- a/src/Utils/Lens.hs +++ b/src/Utils/Lens.hs @@ -5,6 +5,7 @@ module Utils.Lens ( module Utils.Lens ) where import Import.NoModel +import Settings import Model import Model.Rating import qualified ClassyPrelude.Yesod as Yesod (HasHttpManager(..)) @@ -272,6 +273,9 @@ makePrisms ''AllocationPriority makePrisms ''RoomReference makeLenses_ ''RoomReference +makePrisms ''JobMode +makeLenses_ ''JobMode + -- makeClassy_ ''Load -------------------------- diff --git a/src/Utils/Parameters.hs b/src/Utils/Parameters.hs index 5d8faa79f..679a9a46b 100644 --- a/src/Utils/Parameters.hs +++ b/src/Utils/Parameters.hs @@ -32,6 +32,7 @@ data GlobalGetParam = GetLang | GetDownload | GetError | GetSelectTable + | GetGenerateToken deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic) deriving anyclass (Universe, Finite)