From 09fb26f1a892feba32185166223f8f95611ea9ef Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 1 Feb 2021 09:52:47 +0100 Subject: [PATCH 1/4] feat(jobs): batch job offloading BREAKING CHANGE: Job offloading --- src/Handler/Admin/Crontab.hs | 9 + src/Handler/Metrics.hs | 1 + src/Jobs.hs | 110 +++-- src/Jobs/Crontab.hs | 799 ++++++++++++++++++----------------- src/Jobs/Offload.hs | 70 +++ src/Jobs/Types.hs | 8 +- src/Settings.hs | 14 + src/Utils.hs | 6 + src/Utils/Lens.hs | 4 + src/Utils/Parameters.hs | 1 + 10 files changed, 584 insertions(+), 438 deletions(-) create mode 100644 src/Jobs/Offload.hs diff --git a/src/Handler/Admin/Crontab.hs b/src/Handler/Admin/Crontab.hs index daff2070f..18225b5cb 100644 --- a/src/Handler/Admin/Crontab.hs +++ b/src/Handler/Admin/Crontab.hs @@ -17,6 +17,8 @@ import qualified Data.Text.Lazy.Builder as Text.Builder import qualified Data.HashSet as HashSet import qualified Data.HashMap.Strict as HashMap +import qualified Data.UUID as UUID + deriveJSON defaultOptions { constructorTagModifier = camelToPathPiece' 1 @@ -32,9 +34,12 @@ getAdminCrontabR = do let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _3 . _MatchNone) + instanceId <- getsYesod appInstanceID + selectRep $ do provideRep $ do crontabBearer <- runMaybeT . hoist runDB $ do + guardM $ hasGlobalGetParam GetGenerateToken uid <- MaybeT maybeAuthId guardM . lift . existsBy $ UniqueUserGroupMember UserGroupCrontab uid @@ -49,6 +54,10 @@ getAdminCrontabR = do
                 #{toPathPiece t}
+          
+
+
_{MsgInstanceId} +
#{UUID.toText instanceId}
$maybe (genTime, crontab) <- mCrontab

diff --git a/src/Handler/Metrics.hs b/src/Handler/Metrics.hs index e1e9a9b01..6cadaa2a0 100644 --- a/src/Handler/Metrics.hs +++ b/src/Handler/Metrics.hs @@ -26,6 +26,7 @@ getMetricsR = selectRep $ do samples <- sortBy metricSort <$> collectMetrics metricsBearer <- runMaybeT . hoist runDB $ do + guardM $ hasGlobalGetParam GetGenerateToken uid <- MaybeT maybeAuthId guardM . lift . existsBy $ UniqueUserGroupMember UserGroupMetrics uid diff --git a/src/Jobs.hs b/src/Jobs.hs index 4aca345e8..183df06e0 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -10,6 +10,7 @@ module Jobs import Import hiding (StateT) import Jobs.Types as Types hiding (JobCtl(JobCtlQueue)) import Jobs.Queue +import Jobs.Offload import Jobs.Crontab import qualified Data.Conduit.List as C @@ -105,6 +106,7 @@ handleJobs foundation@UniWorX{..} jobShutdown <- liftIO newEmptyTMVarIO jobCurrentCrontab <- liftIO $ newTVarIO Nothing jobHeldLocks <- liftIO $ newTVarIO Set.empty + jobOffload <- liftIO newEmptyTMVarIO registerJobHeldLocksCount jobHeldLocks registerJobWorkerQueueDepth appJobState atomically $ putTMVar appJobState JobState @@ -155,7 +157,9 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> atomically . asum $ [ spawnMissingWorkers , reapDeadWorkers - ] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo ++ + ] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo + ++ maybeToList (manageOffloadHandler <$> mkJobOffloadHandler (appDatabaseConf appSettings') (appJobMode appSettings')) + ++ [ terminateGracefully terminate' ] where @@ -286,6 +290,27 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|] + manageOffloadHandler :: (ReaderT UniWorX m JobOffloadHandler) -> STM (ContT () m ()) + manageOffloadHandler spawn = do + shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + guard $ not shouldTerminate' + + JobContext{jobOffload} <- jobContext <$> readTMVar appJobState + cOffload <- tryReadTMVar jobOffload + + let respawn = do + nOffload <- lift $ runReaderT spawn foundation + atomically $ do + putTMVar jobOffload nOffload + whenIsJust cOffload $ \pOffload -> do + pOutgoing <- readTVar $ jobOffloadOutgoing pOffload + modifyTVar (jobOffloadOutgoing nOffload) (pOutgoing <>) + + respawn <$ case cOffload of + Nothing -> return () + Just JobOffloadHandler{..} -> waitSTM jobOffloadHandler + + stopJobCtl :: MonadUnliftIO m => UniWorX -> m () -- ^ Stop all worker threads currently running stopJobCtl UniWorX{appJobState} = do @@ -471,46 +496,55 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker $logDebugS logIdent "JobCtlQueue..." lift $ queueJob' job $logInfoS logIdent "JobCtlQueue" - handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do - content <- case fromJSON queuedJobContent of - Aeson.Success c -> return c - Aeson.Error t -> do - $logErrorS logIdent $ "Aeson decoding error: " <> pack t - throwM $ JInvalid jId j - - $logInfoS logIdent $ tshow content - $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content + handleCmd (JobCtlPerform jId) = do + jMode <- getsYesod $ view _appJobMode + case jMode of + JobsLocal{} -> performLocal + JobsOffload -> performOffload + where + performOffload = hoist atomically $ do + JobOffloadHandler{..} <- lift . readTMVar =<< asks jobOffload + lift $ modifyTVar jobOffloadOutgoing (`snoc` jId) + performLocal = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do + content <- case fromJSON queuedJobContent of + Aeson.Success c -> return c + Aeson.Error t -> do + $logErrorS logIdent $ "Aeson decoding error: " <> pack t + throwM $ JInvalid jId j - instanceID' <- getsYesod $ view instanceID - now <- liftIO getCurrentTime + $logInfoS logIdent $ tshow content + $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content - let cleanup = do - when queuedJobWriteLastExec $ - void $ upsertBy - (UniqueCronLastExec queuedJobContent) - CronLastExec - { cronLastExecJob = queuedJobContent - , cronLastExecTime = now - , cronLastExecInstance = instanceID' - } - [ CronLastExecTime =. now - , CronLastExecInstance =. instanceID' - ] - delete jId + instanceID' <- getsYesod $ view instanceID + now <- liftIO getCurrentTime - case performJob content of - JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do - act & withJobWorkerState wNum (JobWorkerExecJob content) - hoist lift cleanup - JobHandlerException act -> do - act & withJobWorkerState wNum (JobWorkerExecJob content) - runDB $ setSerializableBatch cleanup - JobHandlerAtomicWithFinalizer act fin -> do - res <- runDBJobs . setSerializableBatch $ do - res <- act & withJobWorkerState wNum (JobWorkerExecJob content) - hoist lift cleanup - return res - fin res + let cleanup = do + when queuedJobWriteLastExec $ + void $ upsertBy + (UniqueCronLastExec queuedJobContent) + CronLastExec + { cronLastExecJob = queuedJobContent + , cronLastExecTime = now + , cronLastExecInstance = instanceID' + } + [ CronLastExecTime =. now + , CronLastExecInstance =. instanceID' + ] + delete jId + + case performJob content of + JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do + act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + JobHandlerException act -> do + act & withJobWorkerState wNum (JobWorkerExecJob content) + runDB $ setSerializableBatch cleanup + JobHandlerAtomicWithFinalizer act fin -> do + res <- runDBJobs . setSerializableBatch $ do + res <- act & withJobWorkerState wNum (JobWorkerExecJob content) + hoist lift cleanup + return res + fin res handleCmd JobCtlDetermineCrontab = do $logDebugS logIdent "DetermineCrontab..." newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab' diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index e24cba39b..a7b87a1d3 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -27,17 +27,6 @@ determineCrontab :: DB (Crontab JobCtl) determineCrontab = execWriterT $ do UniWorX{ appSettings' = AppSettings{..} } <- getYesod - case appJobFlushInterval of - Just interval -> tell $ HashMap.singleton - JobCtlFlush - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = interval - , cronNotAfter = Right CronNotScheduled - } - Nothing -> return () - whenIsJust appJobCronInterval $ \interval -> tell $ HashMap.singleton JobCtlDetermineCrontab @@ -48,77 +37,6 @@ determineCrontab = execWriterT $ do , cronNotAfter = Right CronNotScheduled } - oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1] - whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton - (JobCtlQueue JobPruneInvitations) - Cron - { cronInitial = CronTimestamp $ utcToLocalTime oldestInvUTC - , cronRepeat = CronRepeatOnChange - , cronRateLimit = nominalDay - , cronNotAfter = Right CronNotScheduled - } - - oldestSessionFile <- lift $ preview (_head . _entityVal . _sessionFileTouched) <$> selectList [] [Asc SessionFileTouched, LimitTo 1] - whenIsJust oldestSessionFile $ \oldest -> tell $ HashMap.singleton - (JobCtlQueue JobPruneSessionFiles) - Cron - { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appSessionFilesExpire oldest - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appSessionFilesExpire / 2 - , cronNotAfter = Right CronNotScheduled - } - - oldestFallbackPersonalisedSheetFilesKey <- lift $ preview (_head . _entityVal . _fallbackPersonalisedSheetFilesKeyGenerated) <$> selectList [] [Asc FallbackPersonalisedSheetFilesKeyGenerated, LimitTo 1] - whenIsJust oldestFallbackPersonalisedSheetFilesKey $ \oldest -> tell $ HashMap.singleton - (JobCtlQueue JobPruneFallbackPersonalisedSheetFilesKeys) - Cron - { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appFallbackPersonalisedSheetFilesKeysExpire oldest - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appFallbackPersonalisedSheetFilesKeysExpire / 2 - , cronNotAfter = Right CronNotScheduled - } - - oldestSentMail <- lift $ preview (_head . _entityVal . _sentMailSentAt) <$> selectList [] [Asc SentMailSentAt, LimitTo 1] - whenIsJust ((,) <$> appMailRetainSent <*> oldestSentMail) $ \(retain, oldest) -> tell $ HashMap.singleton - (JobCtlQueue JobPruneOldSentMails) - Cron - { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime retain oldest - , cronRepeat = CronRepeatOnChange - , cronRateLimit = retain / 2 - , cronNotAfter = Right CronNotScheduled - } - - - whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval -> - tell $ HashMap.singleton - (JobCtlQueue JobInjectFiles) - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = iInterval - , cronNotAfter = Right CronNotScheduled - } - - whenIsJust appRechunkFiles $ \rInterval -> - tell $ HashMap.singleton - (JobCtlQueue JobRechunkFiles) - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = rInterval - , cronNotAfter = Right CronNotScheduled - } - - whenIsJust appCheckMissingFiles $ \rInterval -> - tell $ HashMap.singleton - (JobCtlQueue JobDetectMissingFiles) - Cron - { cronInitial = CronAsap - , cronRepeat = CronRepeatScheduled CronAsap - , cronRateLimit = rInterval - , cronNotAfter = Right CronNotScheduled - } - tell . flip foldMap universeF $ \kind -> case appHealthCheckInterval kind of Just int -> HashMap.singleton @@ -131,354 +49,437 @@ determineCrontab = execWriterT $ do } Nothing -> mempty - let newyear = cronCalendarAny - { cronDayOfYear = cronMatchOne 1 - } - in tell $ HashMap.singleton - (JobCtlQueue JobTruncateTransactionLog) + when (is _JobsLocal appJobMode) $ do + case appJobFlushInterval of + Just interval -> tell $ HashMap.singleton + JobCtlFlush Cron - { cronInitial = newyear - , cronRepeat = CronRepeatScheduled newyear - , cronRateLimit = minNominalYear + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = interval , cronNotAfter = Right CronNotScheduled } + Nothing -> return () - oldestLogEntry <- fmap listToMaybe . lift . E.select . E.from $ \transactionLog -> do - E.where_ . E.not_ . E.isNothing $ transactionLog E.^. TransactionLogRemote - E.orderBy [E.asc $ transactionLog E.^. TransactionLogTime] - E.limit 1 - return $ transactionLog E.^. TransactionLogTime - for_ oldestLogEntry $ \(E.Value oldestEntry) -> - tell $ HashMap.singleton - (JobCtlQueue JobDeleteTransactionLogIPs) + oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1] + whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton + (JobCtlQueue JobPruneInvitations) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appTransactionLogIPRetentionTime oldestEntry + { cronInitial = CronTimestamp $ utcToLocalTime oldestInvUTC , cronRepeat = CronRepeatOnChange , cronRateLimit = nominalDay , cronNotAfter = Right CronNotScheduled } - let - getNextIntervals within interval cInterval = do - now <- liftIO getPOSIXTime - return $ do - let - epochInterval = within / 2 - (currEpoch, epochNow) = now `divMod'` epochInterval - currInterval = epochNow `div'` interval - numIntervals = max 1 . floor $ epochInterval / interval - n = ceiling $ 4 * cInterval / interval - i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ] - let - ((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals - nextIntervalTime - = posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval - return (nextEpoch, nextInterval, nextIntervalTime, numIntervals) - - if - | is _Just appLdapConf - , Just syncWithin <- appSynchroniseLdapUsersWithin - , Just cInterval <- appJobCronInterval - -> do - nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval - - forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do - tell $ HashMap.singleton - (JobCtlQueue JobSynchroniseLdap - { jEpoch = fromInteger nextEpoch - , jNumIterations = fromInteger numIntervals - , jIteration = fromInteger nextInterval - }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime - , cronRepeat = CronRepeatNever - , cronRateLimit = appSynchroniseLdapUsersInterval - , cronNotAfter = Left syncWithin - } - | otherwise - -> return () - - whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do - nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval - forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do - tell $ HashMap.singleton - (JobCtlQueue JobPruneUnreferencedFiles - { jEpoch = fromInteger nextEpoch - , jNumIterations = fromInteger numIntervals - , jIteration = fromInteger nextInterval - } - ) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime - , cronRepeat = CronRepeatNever - , cronRateLimit = appPruneUnreferencedFilesInterval - , cronNotAfter = Left within - } - - let - sheetJobs (Entity nSheet Sheet{..}) = do - for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo - } - for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do - guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom - guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) - (fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]) - guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet] - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetHint{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo - } - for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do - guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom - guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left nominalDay - } - for_ sheetActiveTo $ \aTo -> do - whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo' - , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo - } - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - when sheetAutoDistribute $ - tell $ HashMap.singleton - (JobCtlQueue $ JobDistributeCorrections nSheet) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo - , cronRepeat = CronRepeatNever - , cronRateLimit = 3600 -- Irrelevant due to `cronRepeat` - , cronNotAfter = Left nominalDay - } - - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs - - let - correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB () - correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } ) + oldestSessionFile <- lift $ preview (_head . _entityVal . _sessionFileTouched) <$> selectList [] [Asc SessionFileTouched, LimitTo 1] + whenIsJust oldestSessionFile $ \oldest -> tell $ HashMap.singleton + (JobCtlQueue JobPruneSessionFiles) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay time - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration + { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appSessionFilesExpire oldest + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appSessionFilesExpire / 2 + , cronNotAfter = Right CronNotScheduled } - submissionsByCorrector :: Entity Submission -> Map (UserId, SheetId) (Max UTCTime) - submissionsByCorrector (Entity _ sub) - | Just ratingBy <- submissionRatingBy sub - , Just assigned <- submissionRatingAssigned sub - , not $ submissionRatingDone sub - = Map.singleton (ratingBy, submissionSheet sub) $ Max assigned - | otherwise - = Map.empty + oldestFallbackPersonalisedSheetFilesKey <- lift $ preview (_head . _entityVal . _fallbackPersonalisedSheetFilesKeyGenerated) <$> selectList [] [Asc FallbackPersonalisedSheetFilesKeyGenerated, LimitTo 1] + whenIsJust oldestFallbackPersonalisedSheetFilesKey $ \oldest -> tell $ HashMap.singleton + (JobCtlQueue JobPruneFallbackPersonalisedSheetFilesKeys) + Cron + { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appFallbackPersonalisedSheetFilesKeysExpire oldest + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appFallbackPersonalisedSheetFilesKeysExpire / 2 + , cronNotAfter = Right CronNotScheduled + } - collateSubmissionsByCorrector acc entity = Map.unionWith (<>) acc $ submissionsByCorrector entity - correctorNotifications <=< runConduit $ - transPipe lift ( selectSource [ SubmissionRatingBy !=. Nothing, SubmissionRatingAssigned !=. Nothing ] [] - ) - .| C.fold collateSubmissionsByCorrector Map.empty + oldestSentMail <- lift $ preview (_head . _entityVal . _sentMailSentAt) <$> selectList [] [Asc SentMailSentAt, LimitTo 1] + whenIsJust ((,) <$> appMailRetainSent <*> oldestSentMail) $ \(retain, oldest) -> tell $ HashMap.singleton + (JobCtlQueue JobPruneOldSentMails) + Cron + { cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime retain oldest + , cronRepeat = CronRepeatOnChange + , cronRateLimit = retain / 2 + , cronNotAfter = Right CronNotScheduled + } - let - examSelect = E.selectSource . E.from $ \(exam `E.InnerJoin` course `E.InnerJoin` school) -> do - E.on $ school E.^. SchoolId E.==. course E.^. CourseSchool - E.on $ course E.^. CourseId E.==. exam E.^. ExamCourse - return (exam, course, school) - examJobs (Entity nExam Exam{..}, _, Entity _ School{..}) = do - newestResult <- lift . E.select . E.from $ \examResult -> do - E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam - return . E.max_ $ examResult E.^. ExamResultLastChanged + whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobInjectFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = iInterval + , cronNotAfter = Right CronNotScheduled + } - whenIsJust examVisibleFrom $ \visibleFrom -> do - case over (mapped . _Value) ((max `on` NTop) examFinished) newestResult of - [E.Value (NTop (Just ts))] -> + whenIsJust appRechunkFiles $ \rInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobRechunkFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = rInterval + , cronNotAfter = Right CronNotScheduled + } + + whenIsJust appCheckMissingFiles $ \rInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobDetectMissingFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = rInterval + , cronNotAfter = Right CronNotScheduled + } + + let newyear = cronCalendarAny + { cronDayOfYear = cronMatchOne 1 + } + in tell $ HashMap.singleton + (JobCtlQueue JobTruncateTransactionLog) + Cron + { cronInitial = newyear + , cronRepeat = CronRepeatScheduled newyear + , cronRateLimit = minNominalYear + , cronNotAfter = Right CronNotScheduled + } + + oldestLogEntry <- fmap listToMaybe . lift . E.select . E.from $ \transactionLog -> do + E.where_ . E.not_ . E.isNothing $ transactionLog E.^. TransactionLogRemote + E.orderBy [E.asc $ transactionLog E.^. TransactionLogTime] + E.limit 1 + return $ transactionLog E.^. TransactionLogTime + for_ oldestLogEntry $ \(E.Value oldestEntry) -> + tell $ HashMap.singleton + (JobCtlQueue JobDeleteTransactionLogIPs) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appTransactionLogIPRetentionTime oldestEntry + , cronRepeat = CronRepeatOnChange + , cronRateLimit = nominalDay + , cronNotAfter = Right CronNotScheduled + } + + let + getNextIntervals within interval cInterval = do + now <- liftIO getPOSIXTime + return $ do + let + epochInterval = within / 2 + (currEpoch, epochNow) = now `divMod'` epochInterval + currInterval = epochNow `div'` interval + numIntervals = max 1 . floor $ epochInterval / interval + n = ceiling $ 4 * cInterval / interval + i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ] + let + ((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals + nextIntervalTime + = posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval + return (nextEpoch, nextInterval, nextIntervalTime, numIntervals) + + if + | is _Just appLdapConf + , Just syncWithin <- appSynchroniseLdapUsersWithin + , Just cInterval <- appJobCronInterval + -> do + nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval + + forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamResult{..}) + (JobCtlQueue JobSynchroniseLdap + { jEpoch = fromInteger nextEpoch + , jNumIterations = fromInteger numIntervals + , jIteration = fromInteger nextInterval + }) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom ts - , cronRepeat = CronRepeatOnChange + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime + , cronRepeat = CronRepeatNever + , cronRateLimit = appSynchroniseLdapUsersInterval + , cronNotAfter = Left syncWithin + } + | otherwise + -> return () + + whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do + nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval + forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do + tell $ HashMap.singleton + (JobCtlQueue JobPruneUnreferencedFiles + { jEpoch = fromInteger nextEpoch + , jNumIterations = fromInteger numIntervals + , jIteration = fromInteger nextInterval + } + ) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime + , cronRepeat = CronRepeatNever + , cronRateLimit = appPruneUnreferencedFilesInterval + , cronNotAfter = Left within + } + + let + sheetJobs (Entity nSheet Sheet{..}) = do + for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo + } + for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do + guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom + guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom) + (fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]) + guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet] + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetHint{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo + } + for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do + guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom + guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet] + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left nominalDay + } + for_ sheetActiveTo $ \aTo -> do + whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo' + , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationExpiration $ max visibleFrom ts + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo + } + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + when sheetAutoDistribute $ + tell $ HashMap.singleton + (JobCtlQueue $ JobDistributeCorrections nSheet) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo + , cronRepeat = CronRepeatNever + , cronRateLimit = 3600 -- Irrelevant due to `cronRepeat` + , cronNotAfter = Left nominalDay + } + + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs + + let + correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB () + correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } ) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay time + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + + submissionsByCorrector :: Entity Submission -> Map (UserId, SheetId) (Max UTCTime) + submissionsByCorrector (Entity _ sub) + | Just ratingBy <- submissionRatingBy sub + , Just assigned <- submissionRatingAssigned sub + , not $ submissionRatingDone sub + = Map.singleton (ratingBy, submissionSheet sub) $ Max assigned + | otherwise + = Map.empty + + collateSubmissionsByCorrector acc entity = Map.unionWith (<>) acc $ submissionsByCorrector entity + correctorNotifications <=< runConduit $ + transPipe lift ( selectSource [ SubmissionRatingBy !=. Nothing, SubmissionRatingAssigned !=. Nothing ] [] + ) + .| C.fold collateSubmissionsByCorrector Map.empty + + + let + examSelect = E.selectSource . E.from $ \(exam `E.InnerJoin` course `E.InnerJoin` school) -> do + E.on $ school E.^. SchoolId E.==. course E.^. CourseSchool + E.on $ course E.^. CourseId E.==. exam E.^. ExamCourse + return (exam, course, school) + examJobs (Entity nExam Exam{..}, _, Entity _ School{..}) = do + newestResult <- lift . E.select . E.from $ \examResult -> do + E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam + return . E.max_ $ examResult E.^. ExamResultLastChanged + + whenIsJust examVisibleFrom $ \visibleFrom -> do + case over (mapped . _Value) ((max `on` NTop) examFinished) newestResult of + [E.Value (NTop (Just ts))] -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamResult{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom ts + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationExpiration $ max visibleFrom ts + } + _other -> return () + + whenIsJust examRegisterFrom $ \registerFrom -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationActive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom registerFrom + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) examRegisterTo + } + whenIsJust ((,) <$> examRegisterFrom <*> examRegisterTo) $ \(registerFrom, registerTo) -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) registerTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ registerTo + } + whenIsJust ((,) <$> examRegisterFrom <*> examDeregisterUntil) $ \(registerFrom, deregisterUntil) -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamDeregistrationSoonInactive{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) deregisterUntil + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ deregisterUntil + } + + let closeTime = case (examClosed, examFinished) of + (mClose, Just finish) + | isn't _ExamCloseSeparate schoolExamCloseMode -> Just $ maybe id min mClose finish + (Just close, _) + | is _ExamCloseSeparate schoolExamCloseMode -> Just close + _other -> Nothing + + case closeTime of + Just close -> do + -- If an exam that was previously under `ExamCloseSeparate` rules transitions to `ExamCloseOnFinish`, it might suddenly have been closed an arbitrary time ago + -- If `cronNotAfter` was only `appNotificationExpiration` in that case, no notification might ever be sent + -- That's probably fine. + + changedResults <- lift . E.select . E.from $ \examResult -> do + E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam + E.&&. examResult E.^. ExamResultLastChanged E.>. E.val close + return $ examResult E.^. ExamResultId + + case newestResult of + [E.Value (Just lastChange)] + | not $ null changedResults + -> tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResultsChanged{ nExamResults = Set.fromList $ map E.unValue changedResults }) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + _other -> return () + + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResults{..}) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ close + , cronRepeat = CronRepeatNever + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Left appNotificationExpiration + } + Nothing -> return () + in runConduit $ transPipe lift examSelect .| C.mapM_ examJobs + + + let + externalExamJobs nExternalExam = do + newestResult <- lift . E.select . E.from $ \externalExamResult -> do + E.where_ $ externalExamResult E.^. ExternalExamResultExam E.==. E.val nExternalExam + return . E.max_ $ externalExamResult E.^. ExternalExamResultLastChanged + + case newestResult of + [E.Value (Just lastChange)] -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExternalExamResults{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange + , cronRepeat = CronRepeatOnChange + , cronRateLimit = nominalDay + , cronNotAfter = Left appNotificationExpiration } _other -> return () - whenIsJust examRegisterFrom $ \registerFrom -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationActive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom registerFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) examRegisterTo - } - whenIsJust ((,) <$> examRegisterFrom <*> examRegisterTo) $ \(registerFrom, registerTo) -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamRegistrationSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) registerTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ registerTo - } - whenIsJust ((,) <$> examRegisterFrom <*> examDeregisterUntil) $ \(registerFrom, deregisterUntil) -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamDeregistrationSoonInactive{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) deregisterUntil - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ deregisterUntil - } - - let closeTime = case (examClosed, examFinished) of - (mClose, Just finish) - | isn't _ExamCloseSeparate schoolExamCloseMode -> Just $ maybe id min mClose finish - (Just close, _) - | is _ExamCloseSeparate schoolExamCloseMode -> Just close - _other -> Nothing - - case closeTime of - Just close -> do - -- If an exam that was previously under `ExamCloseSeparate` rules transitions to `ExamCloseOnFinish`, it might suddenly have been closed an arbitrary time ago - -- If `cronNotAfter` was only `appNotificationExpiration` in that case, no notification might ever be sent - -- That's probably fine. - - changedResults <- lift . E.select . E.from $ \examResult -> do - E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam - E.&&. examResult E.^. ExamResultLastChanged E.>. E.val close - return $ examResult E.^. ExamResultId - - case newestResult of - [E.Value (Just lastChange)] - | not $ null changedResults - -> tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResultsChanged{ nExamResults = Set.fromList $ map E.unValue changedResults }) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - _other -> return () - - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResults{..}) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ close - , cronRepeat = CronRepeatNever - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Left appNotificationExpiration - } - Nothing -> return () - in runConduit $ transPipe lift examSelect .| C.mapM_ examJobs + runConduit $ transPipe lift (selectKeys [] []) .| C.mapM_ externalExamJobs - let - externalExamJobs nExternalExam = do - newestResult <- lift . E.select . E.from $ \externalExamResult -> do - E.where_ $ externalExamResult E.^. ExternalExamResultExam E.==. E.val nExternalExam - return . E.max_ $ externalExamResult E.^. ExternalExamResultLastChanged + allocations <- lift $ selectList [] [] - case newestResult of - [E.Value (Just lastChange)] -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationExamOfficeExternalExamResults{..}) - Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange - , cronRepeat = CronRepeatOnChange - , cronRateLimit = nominalDay - , cronNotAfter = Left appNotificationExpiration - } - _other -> return () + let + allocationTimes :: EntityField Allocation (Maybe UTCTime) -> MergeHashMap UTCTime [Entity Allocation] + allocationTimes aField = flip foldMap allocations $ \allocEnt -> case allocEnt ^. fieldLens aField of + Nothing -> mempty + Just t -> _MergeHashMap # HashMap.singleton t (pure allocEnt) - runConduit $ transPipe lift (selectKeys [] []) .| C.mapM_ externalExamJobs + forM_ allocations $ \(Entity nAllocation _) -> do + doneSince <- lift $ fmap (E.unValue <=< listToMaybe) . E.select . E.from $ \participant -> do + E.where_ $ participant E.^. CourseParticipantAllocated E.==. E.just (E.val nAllocation) + return . E.max_ $ participant E.^. CourseParticipantRegistration + whenIsJust doneSince $ \doneSince' -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationResults{..}) + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay doneSince' + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationCollateDelay $ addUTCTime appNotificationExpiration doneSince' + } - allocations <- lift $ selectList [] [] - - let - allocationTimes :: EntityField Allocation (Maybe UTCTime) -> MergeHashMap UTCTime [Entity Allocation] - allocationTimes aField = flip foldMap allocations $ \allocEnt -> case allocEnt ^. fieldLens aField of - Nothing -> mempty - Just t -> _MergeHashMap # HashMap.singleton t (pure allocEnt) - - forM_ allocations $ \(Entity nAllocation _) -> do - doneSince <- lift $ fmap (E.unValue <=< listToMaybe) . E.select . E.from $ \participant -> do - E.where_ $ participant E.^. CourseParticipantAllocated E.==. E.just (E.val nAllocation) - return . E.max_ $ participant E.^. CourseParticipantRegistration - - whenIsJust doneSince $ \doneSince' -> + iforM_ (allocationTimes AllocationStaffRegisterFrom) $ \staffRegisterFrom allocs -> tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationResults{..}) + (JobCtlQueue $ JobQueueNotification NotificationAllocationStaffRegister{ nAllocations = setOf (folded . _entityKey) allocs }) Cron - { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay doneSince' + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffRegisterFrom , cronRepeat = CronRepeatOnChange , cronRateLimit = appNotificationRateLimit - , cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationCollateDelay $ addUTCTime appNotificationExpiration doneSince' + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffRegisterTo . to NTop . filtered (> NTop (Just staffRegisterFrom))) allocs + } + iforM_ (allocationTimes AllocationRegisterFrom) $ \registerFrom allocs -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationRegister{ nAllocations = setOf (folded . _entityKey) allocs }) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerFrom + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationRegisterTo . to NTop . filtered (> NTop (Just registerFrom))) allocs + } + iforM_ (allocationTimes AllocationStaffAllocationFrom) $ \staffAllocationFrom allocs -> + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationAllocation{ nAllocations = setOf (folded . _entityKey) allocs }) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffAllocationFrom + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just staffAllocationFrom))) allocs + } + iforM_ (allocationTimes AllocationRegisterTo) $ \registerTo allocs' -> do + let allocs = flip filter allocs' $ \(Entity _ Allocation{..}) -> maybe True (> registerTo) allocationStaffAllocationTo + tell $ HashMap.singleton + (JobCtlQueue $ JobQueueNotification NotificationAllocationUnratedApplications{ nAllocations = setOf (folded . _entityKey) allocs }) + Cron + { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerTo + , cronRepeat = CronRepeatOnChange + , cronRateLimit = appNotificationRateLimit + , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs } - - iforM_ (allocationTimes AllocationStaffRegisterFrom) $ \staffRegisterFrom allocs -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationStaffRegister{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffRegisterFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffRegisterTo . to NTop . filtered (> NTop (Just staffRegisterFrom))) allocs - } - iforM_ (allocationTimes AllocationRegisterFrom) $ \registerFrom allocs -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationRegister{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationRegisterTo . to NTop . filtered (> NTop (Just registerFrom))) allocs - } - iforM_ (allocationTimes AllocationStaffAllocationFrom) $ \staffAllocationFrom allocs -> - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationAllocation{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffAllocationFrom - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just staffAllocationFrom))) allocs - } - iforM (allocationTimes AllocationRegisterTo) $ \registerTo allocs' -> do - let allocs = flip filter allocs' $ \(Entity _ Allocation{..}) -> maybe True (> registerTo) allocationStaffAllocationTo - tell $ HashMap.singleton - (JobCtlQueue $ JobQueueNotification NotificationAllocationUnratedApplications{ nAllocations = setOf (folded . _entityKey) allocs }) - Cron - { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerTo - , cronRepeat = CronRepeatOnChange - , cronRateLimit = appNotificationRateLimit - , cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs - } diff --git a/src/Jobs/Offload.hs b/src/Jobs/Offload.hs new file mode 100644 index 000000000..00d3291cd --- /dev/null +++ b/src/Jobs/Offload.hs @@ -0,0 +1,70 @@ +module Jobs.Offload + ( mkJobOffloadHandler + ) where + +import Import hiding (bracket, js) +import Jobs.Types +import Jobs.Queue + +import qualified Database.PostgreSQL.Simple as PG +import qualified Database.PostgreSQL.Simple.Types as PG +import qualified Database.PostgreSQL.Simple.Notification as PG + +import Database.Persist.Postgresql (PostgresConf, pgConnStr) + +import Data.Text.Encoding (decodeUtf8') + +import UnliftIO.Exception (bracket) + + +jobOffloadChannel :: Text +jobOffloadChannel = "job-offload" + +mkJobOffloadHandler :: forall m. + ( MonadResource m + , MonadUnliftIO m + , MonadThrow m, MonadReader UniWorX m + , MonadLogger m + ) + => PostgresConf -> JobMode + -> Maybe (m JobOffloadHandler) +mkJobOffloadHandler dbConf jMode + | is _JobsLocal jMode, hasn't (_jobsAcceptOffload . only True) jMode = Nothing + | otherwise = Just $ do + jobOffloadOutgoing <- newTVarIO mempty + jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do + myPid <- liftIO $ PG.getBackendPID pgConn + let shouldListen = has (_jobsAcceptOffload . only True) jMode + + when shouldListen $ + void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel) + + foreverBreak $ \(($ ()) -> terminate) -> do + UniWorX{appJobState} <- ask + shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + when shouldTerminate terminate + + let + getInput = do + n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn + if | notificationPid == myPid || notificationChannel /= (encodeUtf8 jobOffloadChannel) -> getInput + | otherwise -> return n + getOutput = atomically $ do + jQueue <- readTVar jobOffloadOutgoing + case jQueue of + j :< js -> j <$ writeTVar jobOffloadOutgoing js + _other -> mzero + + io <- lift $ if + | shouldListen -> getInput `race` getOutput + | otherwise -> Right <$> getOutput + + case io of + Left PG.Notification{..} + | Just jId <- fromPathPiece =<< either (const Nothing) Just (decodeUtf8' notificationData) + -> writeJobCtl $ JobCtlPerform jId + | otherwise + -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow notificationData + Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId) + + return JobOffloadHandler{..} diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index f08801213..487c0ed6d 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -9,7 +9,7 @@ module Jobs.Types , classifyJobCtl , YesodJobDB , JobHandler(..), _JobHandlerAtomic, _JobHandlerException - , JobContext(..) + , JobOffloadHandler(..), JobContext(..) , JobState(..), _jobWorkers, _jobWorkerName, _jobContext, _jobPoolManager, _jobCron, _jobShutdown, _jobCurrentCrontab , jobWorkerNames , JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob @@ -238,10 +238,16 @@ showWorkerId = tshow . hashUnique . jobWorkerUnique newWorkerId :: MonadIO m => m JobWorkerId newWorkerId = JobWorkerId <$> liftIO newUnique +data JobOffloadHandler = JobOffloadHandler + { jobOffloadHandler :: Async () + , jobOffloadOutgoing :: TVar (Seq QueuedJobId) + } + data JobContext = JobContext { jobCrontab :: TVar (Crontab JobCtl) , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException)))) , jobHeldLocks :: TVar (Set QueuedJobId) + , jobOffload :: TMVar JobOffloadHandler } diff --git a/src/Settings.hs b/src/Settings.hs index c5bf12dcf..f7014bc6a 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -206,9 +206,16 @@ data AppSettings = AppSettings , appInitialInstanceID :: Maybe (Either FilePath UUID) , appRibbon :: Maybe Text + , appJobMode :: JobMode + , appMemcacheAuth :: Bool } deriving Show +data JobMode = JobsLocal { jobsAcceptOffload :: Bool } + | JobsOffload + deriving (Eq, Ord, Read, Show, Generic, Typeable) + deriving anyclass (Hashable) + data ApprootScope = ApprootUserGenerated | ApprootDefault deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) deriving anyclass (Universe, Finite, Hashable) @@ -342,6 +349,11 @@ deriveFromJSON defaultOptions { fieldLabelModifier = camelToPathPiece' 2 } ''UserDefaultConf +deriveJSON defaultOptions + { fieldLabelModifier = camelToPathPiece' 1 + , constructorTagModifier = camelToPathPiece' 1 + } ''JobMode + instance FromJSON LdapConf where parseJSON = withObject "LdapConf" $ \o -> do ldapTls <- o .:? "tls" @@ -596,6 +608,8 @@ instance FromJSON AppSettings where appMemcacheAuth <- o .:? "memcache-auth" .!= False + appJobMode <- o .:? "job-mode" .!= JobsLocal True + return AppSettings{..} makeClassy_ ''AppSettings diff --git a/src/Utils.hs b/src/Utils.hs index 159e1d424..ce3dfe13b 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -70,6 +70,7 @@ import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Catch import Control.Monad.Morph (hoist) import Control.Monad.Fail +import Control.Monad.Trans.Cont (ContT, evalContT, callCC) import Language.Haskell.TH import Language.Haskell.TH.Instances () @@ -943,6 +944,11 @@ forever' :: Monad m -> m b forever' start cont = cont start >>= flip forever' cont +foreverBreak :: Monad m + => ((r -> ContT r m b) -> ContT r m a) + -> m r +foreverBreak cont = evalContT . callCC $ forever . cont + -------------- -- Foldable -- diff --git a/src/Utils/Lens.hs b/src/Utils/Lens.hs index 33c6dff44..b1e194d7d 100644 --- a/src/Utils/Lens.hs +++ b/src/Utils/Lens.hs @@ -5,6 +5,7 @@ module Utils.Lens ( module Utils.Lens ) where import Import.NoModel +import Settings import Model import Model.Rating import qualified ClassyPrelude.Yesod as Yesod (HasHttpManager(..)) @@ -272,6 +273,9 @@ makePrisms ''AllocationPriority makePrisms ''RoomReference makeLenses_ ''RoomReference +makePrisms ''JobMode +makeLenses_ ''JobMode + -- makeClassy_ ''Load -------------------------- diff --git a/src/Utils/Parameters.hs b/src/Utils/Parameters.hs index 5d8faa79f..679a9a46b 100644 --- a/src/Utils/Parameters.hs +++ b/src/Utils/Parameters.hs @@ -32,6 +32,7 @@ data GlobalGetParam = GetLang | GetDownload | GetError | GetSelectTable + | GetGenerateToken deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic) deriving anyclass (Universe, Finite) From b0dcbd68fef957d2936c1a665368693352970d60 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 1 Feb 2021 10:51:26 +0100 Subject: [PATCH 2/4] chore: bump workflows --- testdata/workflows | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testdata/workflows b/testdata/workflows index b05854bee..cf7dcf58c 160000 --- a/testdata/workflows +++ b/testdata/workflows @@ -1 +1 @@ -Subproject commit b05854beedd35e2d5ffe43628b747efa86e92ffb +Subproject commit cf7dcf58c524176bbdd27ff279d68a5ab90cd06e From b814bc094adb09be088c2f8c2750d42f2396bd14 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 1 Feb 2021 17:37:55 +0100 Subject: [PATCH 3/4] refactor(workflows): shared workflow graphs --- models/workflows.model | 11 ++-- src/Foundation/Authorization.hs | 19 +++---- src/Foundation/Yesod/Middleware.hs | 3 +- src/Handler/Utils/Workflow/EdgeForm.hs | 17 +++--- src/Handler/Workflow/Definition/Edit.hs | 6 ++- .../Workflow/Definition/Instantiate.hs | 4 +- src/Handler/Workflow/Definition/New.hs | 4 +- src/Handler/Workflow/Instance/Form.hs | 2 +- src/Handler/Workflow/Instance/Initiate.hs | 3 +- src/Handler/Workflow/Instance/New.hs | 3 +- src/Handler/Workflow/Workflow/List.hs | 8 +-- src/Handler/Workflow/Workflow/Workflow.hs | 2 +- src/Jobs.hs | 2 +- src/Jobs/Handler/Files.hs | 4 +- src/Jobs/Offload.hs | 2 +- src/Model/Migration/Definitions.hs | 52 +++++++++++++++++++ src/Model/Types/Workflow.hs | 17 +++++- src/Utils/Workflow.hs | 38 ++++++++++++++ test/Database/Fill.hs | 11 ++-- 19 files changed, 161 insertions(+), 47 deletions(-) diff --git a/models/workflows.model b/models/workflows.model index 590b79744..7561e9c65 100644 --- a/models/workflows.model +++ b/models/workflows.model @@ -1,5 +1,10 @@ -WorkflowDefinition +SharedWorkflowGraph + hash WorkflowGraphReference graph (WorkflowGraph FileReference SqlBackendKey) -- UserId + Primary hash + +WorkflowDefinition + graph SharedWorkflowGraphId scope WorkflowScope' name WorkflowDefinitionName instanceCategory WorkflowInstanceCategory Maybe @@ -21,7 +26,7 @@ WorkflowDefinitionInstanceDescription WorkflowInstance definition WorkflowDefinitionId Maybe - graph (WorkflowGraph FileReference SqlBackendKey) -- UserId + graph SharedWorkflowGraphId scope (WorkflowScope TermIdentifier SchoolShorthand SqlBackendKey) -- TermId, SchoolId, CourseId name WorkflowInstanceName category WorkflowInstanceCategory Maybe @@ -37,5 +42,5 @@ WorkflowInstanceDescription WorkflowWorkflow instance WorkflowInstanceId Maybe scope (WorkflowScope TermIdentifier SchoolShorthand SqlBackendKey) -- TermId, SchoolId, CourseId - graph (WorkflowGraph FileReference SqlBackendKey) -- UserId + graph SharedWorkflowGraphId state (WorkflowState FileReference SqlBackendKey) -- UserId diff --git a/src/Foundation/Authorization.hs b/src/Foundation/Authorization.hs index ccfa3b84c..6c76bd8a9 100644 --- a/src/Foundation/Authorization.hs +++ b/src/Foundation/Authorization.hs @@ -1416,9 +1416,8 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite -> wInitiate win rScope = selectLanguageI18n <=< $memcacheAuthHere' (Right diffDay) (evalCtx, route, mAuthId) . maybeT (unauthorizedI18n MsgUnauthorizedWorkflowInitiate) $ do -- @isWrite@ not included since it should make no difference regarding initiation (in the end that will always be a write) scope <- MaybeT . $cachedHereBinary rScope . runMaybeT $ fromRouteWorkflowScope rScope Entity _ WorkflowInstance{..} <- $cachedHereBinary (win, scope) . MaybeT . getBy . UniqueWorkflowInstance win $ scope ^. _DBWorkflowScope + wiGraph <- lift $ getSharedIdWorkflowGraph workflowInstanceGraph let - wiGraph :: IdWorkflowGraph - wiGraph = _DBWorkflowGraph # workflowInstanceGraph edges = do WGN{..} <- wiGraph ^.. _wgNodes . folded WorkflowGraphEdgeInitial{..} <- wgnEdges ^.. folded @@ -1434,11 +1433,9 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite -> (wwId, edges) <- memcacheAuth' (Right diffDay) (AuthCacheWorkflowWorkflowEdgeActors cID) $ do wwId <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decrypt cID WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId $ get wwId + wwGraph <- lift $ getSharedIdWorkflowGraph workflowWorkflowGraph let - wwGraph :: IdWorkflowGraph - wwGraph = _DBWorkflowGraph # workflowWorkflowGraph - wwNode = wpTo $ last workflowWorkflowState return . (wwId, ) . (Set.fromList :: _ -> Set (WorkflowRole UserId)) . foldMap toNullable $ do @@ -1455,11 +1452,9 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite -> (wwId, roles) <- memcacheAuth' (Right diffDay) (AuthCacheWorkflowWorkflowViewers cID) $ do wwId <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decrypt cID WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId $ get wwId + wwGraph <- lift $ getSharedIdWorkflowGraph workflowWorkflowGraph let - wwGraph :: IdWorkflowGraph - wwGraph = _DBWorkflowGraph # workflowWorkflowGraph - nodeViewers = do WorkflowAction{..} <- otoList workflowWorkflowState (node, WGN{..}) <- itoListOf (_wgNodes . ifolded) wwGraph @@ -1483,9 +1478,7 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite -> wwId <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decrypt wwCID WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId $ get wwId stIx <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decryptWorkflowStateIndex wwId stCID - let - wwGraph :: IdWorkflowGraph - wwGraph = _DBWorkflowGraph # workflowWorkflowGraph + wwGraph <- lift $ getSharedIdWorkflowGraph workflowWorkflowGraph act <- workflowStateIndex stIx $ _DBWorkflowState # workflowWorkflowState let cState = wpTo act @@ -1767,8 +1760,8 @@ mayViewWorkflowAction' eval mAuthId wwId WorkflowAction{..} = hoist (withReaderT WorkflowWorkflow{..} <- MaybeT . lift $ get wwId rScope <- hoist lift . toRouteWorkflowScope $ _DBWorkflowScope # workflowWorkflowScope cID <- hoist lift . catchMaybeT (Proxy @CryptoIDError) . lift $ encrypt wwId - let WorkflowGraph{..} = _DBWorkflowGraph # workflowWorkflowGraph - canonRoute = _WorkflowScopeRoute # (rScope, WorkflowWorkflowR cID WWWorkflowR) + WorkflowGraph{..} <- lift . lift $ getSharedIdWorkflowGraph workflowWorkflowGraph + let canonRoute = _WorkflowScopeRoute # (rScope, WorkflowWorkflowR cID WWWorkflowR) evalWorkflowRole'' role = lift $ is _Authorized <$> evalWorkflowRoleFor' eval mAuthId (Just wwId) role canonRoute False WorkflowNodeView{..} <- hoistMaybe $ Map.lookup wpTo wgNodes >>= wgnViewers guardM $ orM diff --git a/src/Foundation/Yesod/Middleware.hs b/src/Foundation/Yesod/Middleware.hs index 473f50de3..df288f3be 100644 --- a/src/Foundation/Yesod/Middleware.hs +++ b/src/Foundation/Yesod/Middleware.hs @@ -313,7 +313,8 @@ routeNormalizers = map (hoist (hoist liftHandler . withReaderT projectBackend) . (_, WorkflowWorkflowR cID (WWFilesR wpl _)) <- hoistMaybe $ route ^? _WorkflowScopeRoute wwId <- decrypt cID WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId . lift $ get wwId - [wpl'] <- return . filter (== wpl) . sortOn (CI.original . unWorkflowPayloadLabel) . foldMap Map.keys $ wgnPayloadView <$> wgNodes workflowWorkflowGraph + wwGraph <- lift . lift $ getSharedDBWorkflowGraph workflowWorkflowGraph + [wpl'] <- return . filter (== wpl) . sortOn (CI.original . unWorkflowPayloadLabel) . foldMap Map.keys $ wgnPayloadView <$> wgNodes wwGraph (caseChanged `on` unWorkflowPayloadLabel) wpl wpl' return $ route & typesUsing @RouteChildren @WorkflowPayloadLabel . filtered (== wpl) .~ wpl' diff --git a/src/Handler/Utils/Workflow/EdgeForm.hs b/src/Handler/Utils/Workflow/EdgeForm.hs index 1e6b033f1..d7b932e82 100644 --- a/src/Handler/Utils/Workflow/EdgeForm.hs +++ b/src/Handler/Utils/Workflow/EdgeForm.hs @@ -81,14 +81,15 @@ workflowEdgeForm mwwId mPrev = runMaybeT $ do MsgRenderer mr <- getMsgRenderer ctx' <- bitraverse (MaybeT . getEntity) (MaybeT . getEntity) mwwId - let (scope, graph) = case ctx of - Left WorkflowInstance{..} -> ( _DBWorkflowScope # workflowInstanceScope - , _DBWorkflowGraph # workflowInstanceGraph - ) - Right WorkflowWorkflow{..} -> ( _DBWorkflowScope # workflowWorkflowScope - , _DBWorkflowGraph # workflowWorkflowGraph - ) - wState = ctx ^? _Right . _workflowWorkflowState . to last . _wpTo + let (scope, sharedGraphId) = case ctx' of + Left (Entity _ WorkflowInstance{..}) -> ( _DBWorkflowScope # workflowInstanceScope + , workflowInstanceGraph + ) + Right (Entity _ WorkflowWorkflow{..}) -> ( _DBWorkflowScope # workflowWorkflowScope + , workflowWorkflowGraph + ) + graph <- lift $ getSharedIdWorkflowGraph sharedGraphId + let wState = ctx ^? _Right . _workflowWorkflowState . to last . _wpTo wPayload' = ctx ^? _Right . _workflowWorkflowState . re _DBWorkflowState ctx = bimap entityVal entityVal ctx' mAuthId <- maybeAuthId diff --git a/src/Handler/Workflow/Definition/Edit.hs b/src/Handler/Workflow/Definition/Edit.hs index 1f7dbf8cc..1967fc958 100644 --- a/src/Handler/Workflow/Definition/Edit.hs +++ b/src/Handler/Workflow/Definition/Edit.hs @@ -5,6 +5,7 @@ module Handler.Workflow.Definition.Edit ) where import Import +import Utils.Workflow import Handler.Utils import Handler.Workflow.Definition.Form @@ -29,7 +30,7 @@ postAWDEditR wds' wdn = do | Entity _ WorkflowDefinitionInstanceDescription{..} <- iDescs ] - wdfGraph <- toWorkflowGraphForm workflowDefinitionGraph + wdfGraph <- toWorkflowGraphForm =<< getSharedDBWorkflowGraph workflowDefinitionGraph return WorkflowDefinitionForm { wdfScope = workflowDefinitionScope @@ -44,9 +45,10 @@ postAWDEditR wds' wdn = do act <- formResultMaybe editRes $ \WorkflowDefinitionForm{..} -> do wdfGraph' <- fromWorkflowGraphForm wdfGraph + wdfGraph'' <- insertSharedWorkflowGraph wdfGraph' insConflict <- replaceUnique wdId WorkflowDefinition - { workflowDefinitionGraph = wdfGraph' + { workflowDefinitionGraph = wdfGraph'' , workflowDefinitionScope = wdfScope , workflowDefinitionName = wdfName , workflowDefinitionInstanceCategory = wdfInstanceCategory diff --git a/src/Handler/Workflow/Definition/Instantiate.hs b/src/Handler/Workflow/Definition/Instantiate.hs index 4f0773167..fbac35cd6 100644 --- a/src/Handler/Workflow/Definition/Instantiate.hs +++ b/src/Handler/Workflow/Definition/Instantiate.hs @@ -3,6 +3,7 @@ module Handler.Workflow.Definition.Instantiate ) where import Import +import Utils.Workflow import Handler.Utils import Handler.Utils.Workflow.Form @@ -22,9 +23,10 @@ postAWDInstantiateR wds' wdn = do & over _wisTerm unTermKey & over _wisSchool unSchoolKey & over _wisCourse (view _SqlKey) + workflowInstanceGraph <- insertSharedWorkflowGraph wifGraph' instId <- insertUnique WorkflowInstance { workflowInstanceDefinition = Just wdId - , workflowInstanceGraph = wifGraph' + , workflowInstanceGraph , workflowInstanceScope = wifScope' , workflowInstanceName = wifName , workflowInstanceCategory = wifCategory diff --git a/src/Handler/Workflow/Definition/New.hs b/src/Handler/Workflow/Definition/New.hs index 898c3b831..827986354 100644 --- a/src/Handler/Workflow/Definition/New.hs +++ b/src/Handler/Workflow/Definition/New.hs @@ -5,6 +5,7 @@ module Handler.Workflow.Definition.New import Import import Handler.Utils import Handler.Workflow.Definition.Form +import Utils.Workflow getAdminWorkflowDefinitionNewR, postAdminWorkflowDefinitionNewR :: Handler Html @@ -15,9 +16,10 @@ postAdminWorkflowDefinitionNewR = do act <- formResultMaybe newRes $ \WorkflowDefinitionForm{ .. } -> do wdfGraph' <- fromWorkflowGraphForm wdfGraph + workflowDefinitionGraph <- insertSharedWorkflowGraph wdfGraph' insRes <- insertUnique WorkflowDefinition - { workflowDefinitionGraph = wdfGraph' + { workflowDefinitionGraph , workflowDefinitionScope = wdfScope , workflowDefinitionName = wdfName , workflowDefinitionInstanceCategory = wdfInstanceCategory diff --git a/src/Handler/Workflow/Instance/Form.hs b/src/Handler/Workflow/Instance/Form.hs index 549158565..246ac38cf 100644 --- a/src/Handler/Workflow/Instance/Form.hs +++ b/src/Handler/Workflow/Instance/Form.hs @@ -64,7 +64,7 @@ workflowInstanceForm forcedDefId template = renderWForm FormStandard $ do [ (workflowDefinitionInstanceDescriptionLanguage, (workflowDefinitionInstanceDescriptionTitle, workflowDefinitionInstanceDescriptionDescription)) | Entity _ WorkflowDefinitionInstanceDescription{..} <- descs ] - defGraph <- for defEnt $ toWorkflowGraphForm . workflowDefinitionGraph . entityVal + defGraph <- for defEnt $ toWorkflowGraphForm <=< lift . lift . getSharedDBWorkflowGraph . workflowDefinitionGraph . entityVal wifScopeRes <- aFormToWForm . hoistAForm lift $ workflowInstanceScopeForm (workflowDefinitionScope . entityVal <$> defEnt) (fslI MsgWorkflowScope) (wifScope <$> template) wifNameRes <- wreq ciField (fslI MsgWorkflowInstanceName) (fmap wifName template <|> fmap (workflowDefinitionName . entityVal) defEnt) diff --git a/src/Handler/Workflow/Instance/Initiate.hs b/src/Handler/Workflow/Instance/Initiate.hs index f9f5677b2..fda1576d6 100644 --- a/src/Handler/Workflow/Instance/Initiate.hs +++ b/src/Handler/Workflow/Instance/Initiate.hs @@ -45,7 +45,8 @@ workflowInstanceInitiateR rScope win = do ((edgeRes, edgeView), edgeEnc) <- liftHandler . runFormPost $ renderAForm FormStandard edgeForm edgeAct <- formResultMaybe edgeRes $ \edgeRes' -> do - workflowWorkflowState <- view _DBWorkflowState <$> followEdge (_DBWorkflowGraph # workflowInstanceGraph) edgeRes' Nothing + wGraph <- getSharedIdWorkflowGraph workflowInstanceGraph + workflowWorkflowState <- view _DBWorkflowState <$> followEdge wGraph edgeRes' Nothing wwId <- insert WorkflowWorkflow { workflowWorkflowInstance = Just wiId diff --git a/src/Handler/Workflow/Instance/New.hs b/src/Handler/Workflow/Instance/New.hs index c02f6d2e8..e715c62ba 100644 --- a/src/Handler/Workflow/Instance/New.hs +++ b/src/Handler/Workflow/Instance/New.hs @@ -25,13 +25,14 @@ adminWorkflowInstanceNewR wdId = do act <- formResultMaybe instRes $ \WorkflowInstanceForm{..} -> do wifGraph' <- fromWorkflowGraphForm wifGraph + workflowInstanceGraph <- insertSharedWorkflowGraph wifGraph' let wifScope' = wifScope & over _wisTerm unTermKey & over _wisSchool unSchoolKey & over _wisCourse (view _SqlKey) instId <- insertUnique WorkflowInstance { workflowInstanceDefinition = wdId - , workflowInstanceGraph = wifGraph' + , workflowInstanceGraph , workflowInstanceScope = wifScope' , workflowInstanceName = wifName , workflowInstanceCategory = wifCategory diff --git a/src/Handler/Workflow/Workflow/List.hs b/src/Handler/Workflow/Workflow/List.hs index c9333ccbb..97d56fb28 100644 --- a/src/Handler/Workflow/Workflow/List.hs +++ b/src/Handler/Workflow/Workflow/List.hs @@ -231,8 +231,8 @@ workflowWorkflowList (title, heading) WWListColumns{..} sqlPred = do MaybeT $ selectWorkflowInstanceDescription wiId cID <- encrypt wwId rScope <- lift . runMaybeT . toRouteWorkflowScope $ _DBWorkflowScope # workflowWorkflowScope - let WorkflowGraph{..} = ww ^. _entityVal . _workflowWorkflowGraph . from _DBWorkflowGraph - hasWorkflowRole' :: WorkflowRole UserId -> DB Bool + WorkflowGraph{..} <- lift . getSharedIdWorkflowGraph $ ww ^. _entityVal . _workflowWorkflowGraph + let hasWorkflowRole' :: WorkflowRole UserId -> DB Bool hasWorkflowRole' role = maybeT (return False) $ do rScope' <- hoistMaybe rScope let canonRoute = _WorkflowScopeRoute # (rScope', WorkflowWorkflowR cID WWWorkflowR) @@ -360,6 +360,8 @@ workflowWorkflowList (title, heading) WWListColumns{..} sqlPred = do jwiScope <- toRouteWorkflowScope $ _DBWorkflowScope # workflowInstanceScope let jwiName = workflowInstanceName return JsonWorkflowInstance{..} + let Entity _ WorkflowWorkflow{..} = res ^. resultWorkflowWorkflow + WorkflowGraph{..} <- getSharedIdWorkflowGraph workflowWorkflowGraph (fmap getLast -> wState) <- let go :: forall m. ( MonadHandler m @@ -410,9 +412,7 @@ workflowWorkflowList (title, heading) WWListColumns{..} sqlPred = do tell . Just $ Last (stCID, nTo, aUser, wpTime, payload) - Entity _ WorkflowWorkflow{..} = res ^. resultWorkflowWorkflow wState = review _DBWorkflowState workflowWorkflowState - WorkflowGraph{..} = _DBWorkflowGraph # workflowWorkflowGraph in runConduit $ sourceWorkflowActionInfos wwId wState .| execWriterC (C.mapM_ go) let jwwLastAction = wState <&> \(jwaIx, jwaTo, jwaUser, jwaTime, _) -> JsonWorkflowAction{..} diff --git a/src/Handler/Workflow/Workflow/Workflow.hs b/src/Handler/Workflow/Workflow/Workflow.hs index f63f95e43..430d8aa59 100644 --- a/src/Handler/Workflow/Workflow/Workflow.hs +++ b/src/Handler/Workflow/Workflow/Workflow.hs @@ -83,8 +83,8 @@ workflowR rScope cID = do WorkflowWorkflow{..} <- get404 wwId maybeT notFound . void . assertM (== review _DBWorkflowScope workflowWorkflowScope) $ fromRouteWorkflowScope rScope mEdgeForm <- workflowEdgeForm (Right wwId) Nothing + wGraph <- getSharedIdWorkflowGraph workflowWorkflowGraph let canonRoute = _WorkflowScopeRoute # (rScope, WorkflowWorkflowR cID WWWorkflowR) - wGraph = _DBWorkflowGraph # workflowWorkflowGraph mEdge <- for mEdgeForm $ \edgeForm -> do ((edgeRes, edgeView), edgeEnc) <- liftHandler . runFormPost $ renderAForm FormStandard edgeForm diff --git a/src/Jobs.hs b/src/Jobs.hs index 183df06e0..bb06b659c 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -290,7 +290,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|] - manageOffloadHandler :: (ReaderT UniWorX m JobOffloadHandler) -> STM (ContT () m ()) + manageOffloadHandler :: ReaderT UniWorX m JobOffloadHandler -> STM (ContT () m ()) manageOffloadHandler spawn = do shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown guard $ not shouldTerminate' diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index e100c571d..33de12763 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -73,9 +73,7 @@ fileReferences (E.just -> fHash) ] workflowFileReferences :: MonadResource m => ConduitT () FileContentReference (SqlPersistT m) () -workflowFileReferences = mconcat [ E.selectSource (E.from $ pure . (E.^. WorkflowDefinitionGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) - , E.selectSource (E.from $ pure . (E.^. WorkflowInstanceGraph )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) - , E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowGraph )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) +workflowFileReferences = mconcat [ E.selectSource (E.from $ pure . (E.^. SharedWorkflowGraphGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) , E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowState )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) ] diff --git a/src/Jobs/Offload.hs b/src/Jobs/Offload.hs index 00d3291cd..2991e03f2 100644 --- a/src/Jobs/Offload.hs +++ b/src/Jobs/Offload.hs @@ -47,7 +47,7 @@ mkJobOffloadHandler dbConf jMode let getInput = do n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn - if | notificationPid == myPid || notificationChannel /= (encodeUtf8 jobOffloadChannel) -> getInput + if | notificationPid == myPid || notificationChannel /= encodeUtf8 jobOffloadChannel -> getInput | otherwise -> return n getOutput = atomically $ do jQueue <- readTVar jobOffloadOutgoing diff --git a/src/Model/Migration/Definitions.hs b/src/Model/Migration/Definitions.hs index 8fc1ae10d..baad2c3ec 100644 --- a/src/Model/Migration/Definitions.hs +++ b/src/Model/Migration/Definitions.hs @@ -47,6 +47,8 @@ import Data.Time.Format import qualified Data.Time.Zones as TZ +import Utils.Workflow + data ManualMigration = Migration20180813SimplifyUserTheme @@ -97,6 +99,7 @@ data ManualMigration | Migration20201106StoredMarkup | Migration20201119RoomTypes | Migration20210115ExamPartsFrom + | Migration20210201SharedWorkflowGraphs deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) deriving anyclass (Universe, Finite) @@ -968,6 +971,55 @@ customMigrations = mapF $ \case migrateExam _ = return () in runConduit $ getExam .| C.mapM_ migrateExam + Migration20210201SharedWorkflowGraphs -> do + unlessM (tableExists "shared_workflow_graph") + [executeQQ|CREATE TABLE "shared_workflow_graph" ("hash" bytea primary key, "graph" jsonb not null)|] + + whenM (tableExists "workflow_definition") $ do + [executeQQ|ALTER TABLE "workflow_definition" ADD COLUMN "graph_id" bytea references shared_workflow_graph(hash)|] + let getDefinitions = [queryQQ|SELECT "id", "graph" FROM "workflow_definition"|] + migrateDefinition [ fromPersistValue -> Right (wdId :: WorkflowDefinitionId), fromPersistValue -> Right (graph :: DBWorkflowGraph) ] = do + swgId <- insertSharedWorkflowGraph graph + [executeQQ|UPDATE "workflow_definition" SET "graph_id" = #{swgId} WHERE "id" = #{wdId}|] + migrateDefinition _ = return () + in runConduit $ getDefinitions .| C.mapM_ migrateDefinition + + [executeQQ| + ALTER TABLE "workflow_definition" DROP COLUMN "graph"; + ALTER TABLE "workflow_definition" ALTER COLUMN "graph_id" SET not null; + ALTER TABLE "workflow_definition" RENAME COLUMN "graph_id" TO "graph"; + |] + + whenM (tableExists "workflow_instance") $ do + [executeQQ|ALTER TABLE "workflow_instance" ADD COLUMN "graph_id" bytea references shared_workflow_graph(hash)|] + let getInstances = [queryQQ|SELECT "id", "graph" FROM "workflow_instance"|] + migrateInstance [ fromPersistValue -> Right (wiId :: WorkflowInstanceId), fromPersistValue -> Right (graph :: DBWorkflowGraph) ] = do + swgId <- insertSharedWorkflowGraph graph + [executeQQ|UPDATE "workflow_instance" SET "graph_id" = #{swgId} WHERE "id" = #{wiId}|] + migrateInstance _ = return () + in runConduit $ getInstances .| C.mapM_ migrateInstance + + [executeQQ| + ALTER TABLE "workflow_instance" DROP COLUMN "graph"; + ALTER TABLE "workflow_instance" ALTER COLUMN "graph_id" SET not null; + ALTER TABLE "workflow_instance" RENAME COLUMN "graph_id" TO "graph"; + |] + + whenM (tableExists "workflow_workflow") $ do + [executeQQ|ALTER TABLE "workflow_workflow" ADD COLUMN "graph_id" bytea references shared_workflow_graph(hash)|] + let getWorkflows = [queryQQ|SELECT "id", "graph" FROM "workflow_workflow"|] + migrateWorkflow [ fromPersistValue -> Right (wwId :: WorkflowWorkflowId), fromPersistValue -> Right (graph :: DBWorkflowGraph) ] = do + swgId <- insertSharedWorkflowGraph graph + [executeQQ|UPDATE "workflow_workflow" SET "graph_id" = #{swgId} WHERE "id" = #{wwId}|] + migrateWorkflow _ = return () + in runConduit $ getWorkflows .| C.mapM_ migrateWorkflow + + [executeQQ| + ALTER TABLE "workflow_workflow" DROP COLUMN "graph"; + ALTER TABLE "workflow_workflow" ALTER COLUMN "graph_id" SET not null; + ALTER TABLE "workflow_workflow" RENAME COLUMN "graph_id" TO "graph"; + |] + tableExists :: MonadIO m => Text -> ReaderT SqlBackend m Bool tableExists table = do diff --git a/src/Model/Types/Workflow.hs b/src/Model/Types/Workflow.hs index ce27b4f1a..cdea870ed 100644 --- a/src/Model/Types/Workflow.hs +++ b/src/Model/Types/Workflow.hs @@ -1,7 +1,7 @@ {-# LANGUAGE UndecidableInstances #-} module Model.Types.Workflow - ( WorkflowGraph(..) + ( WorkflowGraph(..), WorkflowGraphReference(..) , WorkflowGraphNodeLabel , WorkflowGraphNode(..) , WorkflowNodeView(..) @@ -37,6 +37,8 @@ import Model.Types.Security (AuthDNF, PredDNF) import Model.Types.File (FileContentReference, FileFieldUserOption, FileField, _fieldAdditionalFiles, FileReferenceTitleMapConvertible(..)) import Database.Persist.Sql (PersistFieldSql(..)) +import Web.HttpApiData (ToHttpApiData, FromHttpApiData) +import Data.ByteArray (ByteArrayAccess) import Data.Maybe (fromJust) @@ -77,6 +79,15 @@ deriving instance (Eq fileid, Eq userid, Typeable fileid, Typeable userid, Eq (F deriving instance (Ord fileid, Ord userid, Typeable fileid, Typeable userid, Ord (FileField fileid)) => Ord (WorkflowGraph fileid userid) deriving instance (Show fileid, Show userid, Show (FileField fileid)) => Show (WorkflowGraph fileid userid) +newtype WorkflowGraphReference = WorkflowGraphReference (Digest SHA3_256) + deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable) + deriving newtype ( PersistField, PersistFieldSql + , PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON + , Hashable, NFData + , ByteArrayAccess + , Binary + ) + ----- WORKFLOW GRAPH: NODES ----- newtype WorkflowGraphNodeLabel = WorkflowGraphNodeLabel { unWorkflowGraphNodeLabel :: CI Text } @@ -1051,3 +1062,7 @@ instance Binary WorkflowScope' instance (Binary termid, Binary schoolid, Binary courseid) => Binary (WorkflowScope termid schoolid courseid) instance Binary userid => Binary (WorkflowRole userid) + +----- TH Jail ----- + +makeWrapped ''WorkflowGraphReference diff --git a/src/Utils/Workflow.hs b/src/Utils/Workflow.hs index 8facd298d..5454c150a 100644 --- a/src/Utils/Workflow.hs +++ b/src/Utils/Workflow.hs @@ -10,6 +10,8 @@ module Utils.Workflow , decryptWorkflowStateIndex, encryptWorkflowStateIndex , isTopWorkflowScope, isTopWorkflowScopeSql , selectWorkflowInstanceDescription + , SharedWorkflowGraphException(..), getSharedDBWorkflowGraph, getSharedIdWorkflowGraph + , insertSharedWorkflowGraph ) where import Import.NoFoundation @@ -19,11 +21,15 @@ import qualified Crypto.MAC.KMAC as Crypto import qualified Data.ByteArray as BA import qualified Data.Binary as Binary import Crypto.Hash.Algorithms (SHAKE256) +import qualified Crypto.Hash as Crypto import Language.Haskell.TH (nameBase) +import qualified Data.Aeson as Aeson import qualified Database.Esqueleto as E import qualified Database.Esqueleto.Utils as E +{-# ANN module ("HLint: ignore Use newtype instead of data" :: String) #-} + type RouteWorkflowScope = WorkflowScope TermId SchoolId (TermId, SchoolId, CourseShorthand) type DBWorkflowScope = WorkflowScope TermIdentifier SchoolShorthand SqlBackendKey @@ -130,3 +136,35 @@ selectWorkflowInstanceDescription wiId = withReaderT (projectBackend @SqlReadBac return $ workflowInstanceDescription E.^. WorkflowInstanceDescriptionLanguage descLang <- traverse selectLanguage . nonEmpty $ E.unValue <$> descLangs fmap join . for descLang $ \descLang' -> getBy $ UniqueWorkflowInstanceDescription wiId descLang' + + +data SharedWorkflowGraphException + = SharedWorkflowGraphNotFound SharedWorkflowGraphId + deriving (Eq, Ord, Read, Show, Generic, Typeable) + deriving anyclass (Exception) + +getSharedDBWorkflowGraph :: ( MonadHandler m + , BackendCompatible SqlReadBackend backend + ) + => SharedWorkflowGraphId + -> ReaderT backend m DBWorkflowGraph +getSharedDBWorkflowGraph swgId = $cachedHereBinary swgId . withReaderT (projectBackend @SqlReadBackend) $ do + maybe (liftHandler . throwM $ SharedWorkflowGraphNotFound swgId) (return . sharedWorkflowGraphGraph) =<< get swgId + +getSharedIdWorkflowGraph :: ( MonadHandler m + , BackendCompatible SqlReadBackend backend + ) + => SharedWorkflowGraphId + -> ReaderT backend m IdWorkflowGraph +getSharedIdWorkflowGraph = fmap (review _DBWorkflowGraph) . getSharedDBWorkflowGraph + +insertSharedWorkflowGraph :: ( MonadIO m + , BackendCompatible SqlBackend backend + ) + => DBWorkflowGraph + -> ReaderT backend m SharedWorkflowGraphId +insertSharedWorkflowGraph graph = withReaderT (projectBackend @SqlBackend) $ + swgId' <$ repsert swgId' (SharedWorkflowGraph swgId graph) + where + swgId = WorkflowGraphReference . Crypto.hashlazy $ Aeson.encode graph + swgId' = SharedWorkflowGraphKey swgId diff --git a/test/Database/Fill.hs b/test/Database/Fill.hs index 654dc024d..b254ca37b 100644 --- a/test/Database/Fill.hs +++ b/test/Database/Fill.hs @@ -34,6 +34,7 @@ import qualified Data.Conduit.Combinators as C import qualified Data.Yaml as Yaml +import Utils.Workflow import Utils.Workflow.Lint import System.Directory (getModificationTime) @@ -1330,8 +1331,9 @@ fillDb = do displayLinterIssue = liftIO . hPutStrLn stderr . displayException handleSql displayLinterIssue $ do - workflowDefinitionGraph <- Yaml.decodeFileThrow $ testdataDir "workflows" "theses.yaml" - for_ (lintWorkflowGraph workflowDefinitionGraph) $ mapM_ throwM + graph <- Yaml.decodeFileThrow $ testdataDir "workflows" "theses.yaml" + for_ (lintWorkflowGraph graph) $ mapM_ throwM + workflowDefinitionGraph <- insertSharedWorkflowGraph graph let thesesWorkflowDef = WorkflowDefinition{..} where workflowDefinitionInstanceCategory = Just "theses" @@ -1366,8 +1368,9 @@ fillDb = do } handleSql displayLinterIssue $ do - workflowDefinitionGraph <- Yaml.decodeFileThrow $ testdataDir "workflows" "recognitions-ifi.yaml" - for_ (lintWorkflowGraph workflowDefinitionGraph) $ mapM_ throwM + graph <- Yaml.decodeFileThrow $ testdataDir "workflows" "recognitions-ifi.yaml" + for_ (lintWorkflowGraph graph) $ mapM_ throwM + workflowDefinitionGraph <- insertSharedWorkflowGraph graph let recognitionsWorkflowDef = WorkflowDefinition{..} where workflowDefinitionInstanceCategory = Just "recognitions-ifi" From b960929da4de96fd2a3de0258048671fedef3408 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 1 Feb 2021 17:58:19 +0100 Subject: [PATCH 4/4] chore(release): 24.0.0 --- CHANGELOG.md | 16 ++++++++++++++++ package-lock.json | 2 +- package.json | 2 +- package.yaml | 2 +- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b095bd0d..d2e87c15c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +## [24.0.0](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/compare/v23.7.0...v24.0.0) (2021-02-01) + + +### ⚠ BREAKING CHANGES + +* **jobs:** Job offloading + +### Features + +* **jobs:** batch job offloading ([09fb26f](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/commit/09fb26f1a892feba32185166223f8f95611ea9ef)) + + +### Bug Fixes + +* **workflows:** don't cache instance-list empty for correctness ([cb1e715](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/commit/cb1e715e9b2da2f5ac0bd03b636de0f961307efd)) + ## [23.7.0](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/compare/v23.6.0...v23.7.0) (2021-01-27) diff --git a/package-lock.json b/package-lock.json index d85e4e241..5d068e53e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "uni2work", - "version": "23.7.0", + "version": "24.0.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 5522e394c..cbb2e83ed 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "uni2work", - "version": "23.7.0", + "version": "24.0.0", "description": "", "keywords": [], "author": "", diff --git a/package.yaml b/package.yaml index e5aabb08a..a4dc1bd10 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: uniworx -version: 23.7.0 +version: 24.0.0 dependencies: - base