Merge branch 'master' into profile

This commit is contained in:
Gregor Kleen 2021-02-01 19:37:02 +01:00
commit 90e3965b92
32 changed files with 763 additions and 487 deletions

View File

@ -2,6 +2,22 @@
All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.
## [24.0.0](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/compare/v23.7.0...v24.0.0) (2021-02-01)
### ⚠ BREAKING CHANGES
* **jobs:** Job offloading
### Features
* **jobs:** batch job offloading ([09fb26f](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/commit/09fb26f1a892feba32185166223f8f95611ea9ef))
### Bug Fixes
* **workflows:** don't cache instance-list empty for correctness ([cb1e715](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/commit/cb1e715e9b2da2f5ac0bd03b636de0f961307efd))
## [23.7.0](https://gitlab2.rz.ifi.lmu.de/uni2work/uni2work/compare/v23.6.0...v23.7.0) (2021-01-27)

View File

@ -1,5 +1,10 @@
WorkflowDefinition
SharedWorkflowGraph
hash WorkflowGraphReference
graph (WorkflowGraph FileReference SqlBackendKey) -- UserId
Primary hash
WorkflowDefinition
graph SharedWorkflowGraphId
scope WorkflowScope'
name WorkflowDefinitionName
instanceCategory WorkflowInstanceCategory Maybe
@ -21,7 +26,7 @@ WorkflowDefinitionInstanceDescription
WorkflowInstance
definition WorkflowDefinitionId Maybe
graph (WorkflowGraph FileReference SqlBackendKey) -- UserId
graph SharedWorkflowGraphId
scope (WorkflowScope TermIdentifier SchoolShorthand SqlBackendKey) -- TermId, SchoolId, CourseId
name WorkflowInstanceName
category WorkflowInstanceCategory Maybe
@ -37,5 +42,5 @@ WorkflowInstanceDescription
WorkflowWorkflow
instance WorkflowInstanceId Maybe
scope (WorkflowScope TermIdentifier SchoolShorthand SqlBackendKey) -- TermId, SchoolId, CourseId
graph (WorkflowGraph FileReference SqlBackendKey) -- UserId
graph SharedWorkflowGraphId
state (WorkflowState FileReference SqlBackendKey) -- UserId

2
package-lock.json generated
View File

@ -1,6 +1,6 @@
{
"name": "uni2work",
"version": "23.7.0",
"version": "24.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {

View File

@ -1,6 +1,6 @@
{
"name": "uni2work",
"version": "23.7.0",
"version": "24.0.0",
"description": "",
"keywords": [],
"author": "",

View File

@ -1,5 +1,5 @@
name: uniworx
version: 23.7.0
version: 24.0.0
dependencies:
- base

View File

@ -1416,9 +1416,8 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite ->
wInitiate win rScope = selectLanguageI18n <=< $memcacheAuthHere' (Right diffDay) (evalCtx, route, mAuthId) . maybeT (unauthorizedI18n MsgUnauthorizedWorkflowInitiate) $ do -- @isWrite@ not included since it should make no difference regarding initiation (in the end that will always be a write)
scope <- MaybeT . $cachedHereBinary rScope . runMaybeT $ fromRouteWorkflowScope rScope
Entity _ WorkflowInstance{..} <- $cachedHereBinary (win, scope) . MaybeT . getBy . UniqueWorkflowInstance win $ scope ^. _DBWorkflowScope
wiGraph <- lift $ getSharedIdWorkflowGraph workflowInstanceGraph
let
wiGraph :: IdWorkflowGraph
wiGraph = _DBWorkflowGraph # workflowInstanceGraph
edges = do
WGN{..} <- wiGraph ^.. _wgNodes . folded
WorkflowGraphEdgeInitial{..} <- wgnEdges ^.. folded
@ -1434,11 +1433,9 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite ->
(wwId, edges) <- memcacheAuth' (Right diffDay) (AuthCacheWorkflowWorkflowEdgeActors cID) $ do
wwId <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decrypt cID
WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId $ get wwId
wwGraph <- lift $ getSharedIdWorkflowGraph workflowWorkflowGraph
let
wwGraph :: IdWorkflowGraph
wwGraph = _DBWorkflowGraph # workflowWorkflowGraph
wwNode = wpTo $ last workflowWorkflowState
return . (wwId, ) . (Set.fromList :: _ -> Set (WorkflowRole UserId)) . foldMap toNullable $ do
@ -1455,11 +1452,9 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite ->
(wwId, roles) <- memcacheAuth' (Right diffDay) (AuthCacheWorkflowWorkflowViewers cID) $ do
wwId <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decrypt cID
WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId $ get wwId
wwGraph <- lift $ getSharedIdWorkflowGraph workflowWorkflowGraph
let
wwGraph :: IdWorkflowGraph
wwGraph = _DBWorkflowGraph # workflowWorkflowGraph
nodeViewers = do
WorkflowAction{..} <- otoList workflowWorkflowState
(node, WGN{..}) <- itoListOf (_wgNodes . ifolded) wwGraph
@ -1483,9 +1478,7 @@ tagAccessPredicate AuthWorkflow = APDB $ \evalCtx eval' mAuthId route isWrite ->
wwId <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decrypt wwCID
WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId $ get wwId
stIx <- catchIfMaybeT (const True :: CryptoIDError -> Bool) $ decryptWorkflowStateIndex wwId stCID
let
wwGraph :: IdWorkflowGraph
wwGraph = _DBWorkflowGraph # workflowWorkflowGraph
wwGraph <- lift $ getSharedIdWorkflowGraph workflowWorkflowGraph
act <- workflowStateIndex stIx $ _DBWorkflowState # workflowWorkflowState
let
cState = wpTo act
@ -1767,8 +1760,8 @@ mayViewWorkflowAction' eval mAuthId wwId WorkflowAction{..} = hoist (withReaderT
WorkflowWorkflow{..} <- MaybeT . lift $ get wwId
rScope <- hoist lift . toRouteWorkflowScope $ _DBWorkflowScope # workflowWorkflowScope
cID <- hoist lift . catchMaybeT (Proxy @CryptoIDError) . lift $ encrypt wwId
let WorkflowGraph{..} = _DBWorkflowGraph # workflowWorkflowGraph
canonRoute = _WorkflowScopeRoute # (rScope, WorkflowWorkflowR cID WWWorkflowR)
WorkflowGraph{..} <- lift . lift $ getSharedIdWorkflowGraph workflowWorkflowGraph
let canonRoute = _WorkflowScopeRoute # (rScope, WorkflowWorkflowR cID WWWorkflowR)
evalWorkflowRole'' role = lift $ is _Authorized <$> evalWorkflowRoleFor' eval mAuthId (Just wwId) role canonRoute False
WorkflowNodeView{..} <- hoistMaybe $ Map.lookup wpTo wgNodes >>= wgnViewers
guardM $ orM

View File

@ -313,7 +313,8 @@ routeNormalizers = map (hoist (hoist liftHandler . withReaderT projectBackend) .
(_, WorkflowWorkflowR cID (WWFilesR wpl _)) <- hoistMaybe $ route ^? _WorkflowScopeRoute
wwId <- decrypt cID
WorkflowWorkflow{..} <- MaybeT . $cachedHereBinary wwId . lift $ get wwId
[wpl'] <- return . filter (== wpl) . sortOn (CI.original . unWorkflowPayloadLabel) . foldMap Map.keys $ wgnPayloadView <$> wgNodes workflowWorkflowGraph
wwGraph <- lift . lift $ getSharedDBWorkflowGraph workflowWorkflowGraph
[wpl'] <- return . filter (== wpl) . sortOn (CI.original . unWorkflowPayloadLabel) . foldMap Map.keys $ wgnPayloadView <$> wgNodes wwGraph
(caseChanged `on` unWorkflowPayloadLabel) wpl wpl'
return $ route
& typesUsing @RouteChildren @WorkflowPayloadLabel . filtered (== wpl) .~ wpl'

View File

@ -17,6 +17,8 @@ import qualified Data.Text.Lazy.Builder as Text.Builder
import qualified Data.HashSet as HashSet
import qualified Data.HashMap.Strict as HashMap
import qualified Data.UUID as UUID
deriveJSON defaultOptions
{ constructorTagModifier = camelToPathPiece' 1
@ -32,9 +34,12 @@ getAdminCrontabR = do
let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _3 . _MatchNone)
instanceId <- getsYesod appInstanceID
selectRep $ do
provideRep $ do
crontabBearer <- runMaybeT . hoist runDB $ do
guardM $ hasGlobalGetParam GetGenerateToken
uid <- MaybeT maybeAuthId
guardM . lift . existsBy $ UniqueUserGroupMember UserGroupCrontab uid
@ -49,6 +54,10 @@ getAdminCrontabR = do
<section>
<pre .token>
#{toPathPiece t}
<section>
<dl .deflist>
<dt .deflist__dt>_{MsgInstanceId}
<dd .deflist__dd .uuid>#{UUID.toText instanceId}
<section>
$maybe (genTime, crontab) <- mCrontab
<p>

View File

@ -26,6 +26,7 @@ getMetricsR = selectRep $ do
samples <- sortBy metricSort <$> collectMetrics
metricsBearer <- runMaybeT . hoist runDB $ do
guardM $ hasGlobalGetParam GetGenerateToken
uid <- MaybeT maybeAuthId
guardM . lift . existsBy $ UniqueUserGroupMember UserGroupMetrics uid

View File

@ -81,14 +81,15 @@ workflowEdgeForm mwwId mPrev = runMaybeT $ do
MsgRenderer mr <- getMsgRenderer
ctx' <- bitraverse (MaybeT . getEntity) (MaybeT . getEntity) mwwId
let (scope, graph) = case ctx of
Left WorkflowInstance{..} -> ( _DBWorkflowScope # workflowInstanceScope
, _DBWorkflowGraph # workflowInstanceGraph
)
Right WorkflowWorkflow{..} -> ( _DBWorkflowScope # workflowWorkflowScope
, _DBWorkflowGraph # workflowWorkflowGraph
)
wState = ctx ^? _Right . _workflowWorkflowState . to last . _wpTo
let (scope, sharedGraphId) = case ctx' of
Left (Entity _ WorkflowInstance{..}) -> ( _DBWorkflowScope # workflowInstanceScope
, workflowInstanceGraph
)
Right (Entity _ WorkflowWorkflow{..}) -> ( _DBWorkflowScope # workflowWorkflowScope
, workflowWorkflowGraph
)
graph <- lift $ getSharedIdWorkflowGraph sharedGraphId
let wState = ctx ^? _Right . _workflowWorkflowState . to last . _wpTo
wPayload' = ctx ^? _Right . _workflowWorkflowState . re _DBWorkflowState
ctx = bimap entityVal entityVal ctx'
mAuthId <- maybeAuthId

View File

@ -5,6 +5,7 @@ module Handler.Workflow.Definition.Edit
) where
import Import
import Utils.Workflow
import Handler.Utils
import Handler.Workflow.Definition.Form
@ -29,7 +30,7 @@ postAWDEditR wds' wdn = do
| Entity _ WorkflowDefinitionInstanceDescription{..} <- iDescs
]
wdfGraph <- toWorkflowGraphForm workflowDefinitionGraph
wdfGraph <- toWorkflowGraphForm =<< getSharedDBWorkflowGraph workflowDefinitionGraph
return WorkflowDefinitionForm
{ wdfScope = workflowDefinitionScope
@ -44,9 +45,10 @@ postAWDEditR wds' wdn = do
act <- formResultMaybe editRes $ \WorkflowDefinitionForm{..} -> do
wdfGraph' <- fromWorkflowGraphForm wdfGraph
wdfGraph'' <- insertSharedWorkflowGraph wdfGraph'
insConflict <- replaceUnique wdId WorkflowDefinition
{ workflowDefinitionGraph = wdfGraph'
{ workflowDefinitionGraph = wdfGraph''
, workflowDefinitionScope = wdfScope
, workflowDefinitionName = wdfName
, workflowDefinitionInstanceCategory = wdfInstanceCategory

View File

@ -3,6 +3,7 @@ module Handler.Workflow.Definition.Instantiate
) where
import Import
import Utils.Workflow
import Handler.Utils
import Handler.Utils.Workflow.Form
@ -22,9 +23,10 @@ postAWDInstantiateR wds' wdn = do
& over _wisTerm unTermKey
& over _wisSchool unSchoolKey
& over _wisCourse (view _SqlKey)
workflowInstanceGraph <- insertSharedWorkflowGraph wifGraph'
instId <- insertUnique WorkflowInstance
{ workflowInstanceDefinition = Just wdId
, workflowInstanceGraph = wifGraph'
, workflowInstanceGraph
, workflowInstanceScope = wifScope'
, workflowInstanceName = wifName
, workflowInstanceCategory = wifCategory

View File

@ -5,6 +5,7 @@ module Handler.Workflow.Definition.New
import Import
import Handler.Utils
import Handler.Workflow.Definition.Form
import Utils.Workflow
getAdminWorkflowDefinitionNewR, postAdminWorkflowDefinitionNewR :: Handler Html
@ -15,9 +16,10 @@ postAdminWorkflowDefinitionNewR = do
act <- formResultMaybe newRes $ \WorkflowDefinitionForm{ .. } -> do
wdfGraph' <- fromWorkflowGraphForm wdfGraph
workflowDefinitionGraph <- insertSharedWorkflowGraph wdfGraph'
insRes <- insertUnique WorkflowDefinition
{ workflowDefinitionGraph = wdfGraph'
{ workflowDefinitionGraph
, workflowDefinitionScope = wdfScope
, workflowDefinitionName = wdfName
, workflowDefinitionInstanceCategory = wdfInstanceCategory

View File

@ -64,7 +64,7 @@ workflowInstanceForm forcedDefId template = renderWForm FormStandard $ do
[ (workflowDefinitionInstanceDescriptionLanguage, (workflowDefinitionInstanceDescriptionTitle, workflowDefinitionInstanceDescriptionDescription))
| Entity _ WorkflowDefinitionInstanceDescription{..} <- descs
]
defGraph <- for defEnt $ toWorkflowGraphForm . workflowDefinitionGraph . entityVal
defGraph <- for defEnt $ toWorkflowGraphForm <=< lift . lift . getSharedDBWorkflowGraph . workflowDefinitionGraph . entityVal
wifScopeRes <- aFormToWForm . hoistAForm lift $ workflowInstanceScopeForm (workflowDefinitionScope . entityVal <$> defEnt) (fslI MsgWorkflowScope) (wifScope <$> template)
wifNameRes <- wreq ciField (fslI MsgWorkflowInstanceName) (fmap wifName template <|> fmap (workflowDefinitionName . entityVal) defEnt)

View File

@ -45,7 +45,8 @@ workflowInstanceInitiateR rScope win = do
((edgeRes, edgeView), edgeEnc) <- liftHandler . runFormPost $ renderAForm FormStandard edgeForm
edgeAct <- formResultMaybe edgeRes $ \edgeRes' -> do
workflowWorkflowState <- view _DBWorkflowState <$> followEdge (_DBWorkflowGraph # workflowInstanceGraph) edgeRes' Nothing
wGraph <- getSharedIdWorkflowGraph workflowInstanceGraph
workflowWorkflowState <- view _DBWorkflowState <$> followEdge wGraph edgeRes' Nothing
wwId <- insert WorkflowWorkflow
{ workflowWorkflowInstance = Just wiId

View File

@ -25,13 +25,14 @@ adminWorkflowInstanceNewR wdId = do
act <- formResultMaybe instRes $ \WorkflowInstanceForm{..} -> do
wifGraph' <- fromWorkflowGraphForm wifGraph
workflowInstanceGraph <- insertSharedWorkflowGraph wifGraph'
let wifScope' = wifScope
& over _wisTerm unTermKey
& over _wisSchool unSchoolKey
& over _wisCourse (view _SqlKey)
instId <- insertUnique WorkflowInstance
{ workflowInstanceDefinition = wdId
, workflowInstanceGraph = wifGraph'
, workflowInstanceGraph
, workflowInstanceScope = wifScope'
, workflowInstanceName = wifName
, workflowInstanceCategory = wifCategory

View File

@ -231,8 +231,8 @@ workflowWorkflowList (title, heading) WWListColumns{..} sqlPred = do
MaybeT $ selectWorkflowInstanceDescription wiId
cID <- encrypt wwId
rScope <- lift . runMaybeT . toRouteWorkflowScope $ _DBWorkflowScope # workflowWorkflowScope
let WorkflowGraph{..} = ww ^. _entityVal . _workflowWorkflowGraph . from _DBWorkflowGraph
hasWorkflowRole' :: WorkflowRole UserId -> DB Bool
WorkflowGraph{..} <- lift . getSharedIdWorkflowGraph $ ww ^. _entityVal . _workflowWorkflowGraph
let hasWorkflowRole' :: WorkflowRole UserId -> DB Bool
hasWorkflowRole' role = maybeT (return False) $ do
rScope' <- hoistMaybe rScope
let canonRoute = _WorkflowScopeRoute # (rScope', WorkflowWorkflowR cID WWWorkflowR)
@ -360,6 +360,8 @@ workflowWorkflowList (title, heading) WWListColumns{..} sqlPred = do
jwiScope <- toRouteWorkflowScope $ _DBWorkflowScope # workflowInstanceScope
let jwiName = workflowInstanceName
return JsonWorkflowInstance{..}
let Entity _ WorkflowWorkflow{..} = res ^. resultWorkflowWorkflow
WorkflowGraph{..} <- getSharedIdWorkflowGraph workflowWorkflowGraph
(fmap getLast -> wState) <-
let go :: forall m.
( MonadHandler m
@ -410,9 +412,7 @@ workflowWorkflowList (title, heading) WWListColumns{..} sqlPred = do
tell . Just $ Last (stCID, nTo, aUser, wpTime, payload)
Entity _ WorkflowWorkflow{..} = res ^. resultWorkflowWorkflow
wState = review _DBWorkflowState workflowWorkflowState
WorkflowGraph{..} = _DBWorkflowGraph # workflowWorkflowGraph
in runConduit $ sourceWorkflowActionInfos wwId wState .| execWriterC (C.mapM_ go)
let jwwLastAction = wState <&> \(jwaIx, jwaTo, jwaUser, jwaTime, _) -> JsonWorkflowAction{..}

View File

@ -83,8 +83,8 @@ workflowR rScope cID = do
WorkflowWorkflow{..} <- get404 wwId
maybeT notFound . void . assertM (== review _DBWorkflowScope workflowWorkflowScope) $ fromRouteWorkflowScope rScope
mEdgeForm <- workflowEdgeForm (Right wwId) Nothing
wGraph <- getSharedIdWorkflowGraph workflowWorkflowGraph
let canonRoute = _WorkflowScopeRoute # (rScope, WorkflowWorkflowR cID WWWorkflowR)
wGraph = _DBWorkflowGraph # workflowWorkflowGraph
mEdge <- for mEdgeForm $ \edgeForm -> do
((edgeRes, edgeView), edgeEnc) <- liftHandler . runFormPost $ renderAForm FormStandard edgeForm

View File

@ -10,6 +10,7 @@ module Jobs
import Import hiding (StateT)
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Offload
import Jobs.Crontab
import qualified Data.Conduit.List as C
@ -105,6 +106,7 @@ handleJobs foundation@UniWorX{..}
jobShutdown <- liftIO newEmptyTMVarIO
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
jobHeldLocks <- liftIO $ newTVarIO Set.empty
jobOffload <- liftIO newEmptyTMVarIO
registerJobHeldLocksCount jobHeldLocks
registerJobWorkerQueueDepth appJobState
atomically $ putTMVar appJobState JobState
@ -155,7 +157,9 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
atomically . asum $
[ spawnMissingWorkers
, reapDeadWorkers
] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo ++
] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo
++ maybeToList (manageOffloadHandler <$> mkJobOffloadHandler (appDatabaseConf appSettings') (appJobMode appSettings'))
++
[ terminateGracefully terminate'
]
where
@ -286,6 +290,27 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|]
manageOffloadHandler :: ReaderT UniWorX m JobOffloadHandler -> STM (ContT () m ())
manageOffloadHandler spawn = do
shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard $ not shouldTerminate'
JobContext{jobOffload} <- jobContext <$> readTMVar appJobState
cOffload <- tryReadTMVar jobOffload
let respawn = do
nOffload <- lift $ runReaderT spawn foundation
atomically $ do
putTMVar jobOffload nOffload
whenIsJust cOffload $ \pOffload -> do
pOutgoing <- readTVar $ jobOffloadOutgoing pOffload
modifyTVar (jobOffloadOutgoing nOffload) (pOutgoing <>)
respawn <$ case cOffload of
Nothing -> return ()
Just JobOffloadHandler{..} -> waitSTM jobOffloadHandler
stopJobCtl :: MonadUnliftIO m => UniWorX -> m ()
-- ^ Stop all worker threads currently running
stopJobCtl UniWorX{appJobState} = do
@ -471,46 +496,55 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
$logDebugS logIdent "JobCtlQueue..."
lift $ queueJob' job
$logInfoS logIdent "JobCtlQueue"
handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do
content <- case fromJSON queuedJobContent of
Aeson.Success c -> return c
Aeson.Error t -> do
$logErrorS logIdent $ "Aeson decoding error: " <> pack t
throwM $ JInvalid jId j
$logInfoS logIdent $ tshow content
$logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content
handleCmd (JobCtlPerform jId) = do
jMode <- getsYesod $ view _appJobMode
case jMode of
JobsLocal{} -> performLocal
JobsOffload -> performOffload
where
performOffload = hoist atomically $ do
JobOffloadHandler{..} <- lift . readTMVar =<< asks jobOffload
lift $ modifyTVar jobOffloadOutgoing (`snoc` jId)
performLocal = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do
content <- case fromJSON queuedJobContent of
Aeson.Success c -> return c
Aeson.Error t -> do
$logErrorS logIdent $ "Aeson decoding error: " <> pack t
throwM $ JInvalid jId j
instanceID' <- getsYesod $ view instanceID
now <- liftIO getCurrentTime
$logInfoS logIdent $ tshow content
$logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content
let cleanup = do
when queuedJobWriteLastExec $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
{ cronLastExecJob = queuedJobContent
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now
, CronLastExecInstance =. instanceID'
]
delete jId
instanceID' <- getsYesod $ view instanceID
now <- liftIO getCurrentTime
case performJob content of
JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do
act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializableBatch cleanup
JobHandlerAtomicWithFinalizer act fin -> do
res <- runDBJobs . setSerializableBatch $ do
res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
return res
fin res
let cleanup = do
when queuedJobWriteLastExec $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
{ cronLastExecJob = queuedJobContent
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now
, CronLastExecInstance =. instanceID'
]
delete jId
case performJob content of
JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do
act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializableBatch cleanup
JobHandlerAtomicWithFinalizer act fin -> do
res <- runDBJobs . setSerializableBatch $ do
res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
return res
fin res
handleCmd JobCtlDetermineCrontab = do
$logDebugS logIdent "DetermineCrontab..."
newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab'

View File

@ -27,17 +27,6 @@ determineCrontab :: DB (Crontab JobCtl)
determineCrontab = execWriterT $ do
UniWorX{ appSettings' = AppSettings{..} } <- getYesod
case appJobFlushInterval of
Just interval -> tell $ HashMap.singleton
JobCtlFlush
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = interval
, cronNotAfter = Right CronNotScheduled
}
Nothing -> return ()
whenIsJust appJobCronInterval $ \interval ->
tell $ HashMap.singleton
JobCtlDetermineCrontab
@ -48,77 +37,6 @@ determineCrontab = execWriterT $ do
, cronNotAfter = Right CronNotScheduled
}
oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1]
whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton
(JobCtlQueue JobPruneInvitations)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTime oldestInvUTC
, cronRepeat = CronRepeatOnChange
, cronRateLimit = nominalDay
, cronNotAfter = Right CronNotScheduled
}
oldestSessionFile <- lift $ preview (_head . _entityVal . _sessionFileTouched) <$> selectList [] [Asc SessionFileTouched, LimitTo 1]
whenIsJust oldestSessionFile $ \oldest -> tell $ HashMap.singleton
(JobCtlQueue JobPruneSessionFiles)
Cron
{ cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appSessionFilesExpire oldest
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appSessionFilesExpire / 2
, cronNotAfter = Right CronNotScheduled
}
oldestFallbackPersonalisedSheetFilesKey <- lift $ preview (_head . _entityVal . _fallbackPersonalisedSheetFilesKeyGenerated) <$> selectList [] [Asc FallbackPersonalisedSheetFilesKeyGenerated, LimitTo 1]
whenIsJust oldestFallbackPersonalisedSheetFilesKey $ \oldest -> tell $ HashMap.singleton
(JobCtlQueue JobPruneFallbackPersonalisedSheetFilesKeys)
Cron
{ cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appFallbackPersonalisedSheetFilesKeysExpire oldest
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appFallbackPersonalisedSheetFilesKeysExpire / 2
, cronNotAfter = Right CronNotScheduled
}
oldestSentMail <- lift $ preview (_head . _entityVal . _sentMailSentAt) <$> selectList [] [Asc SentMailSentAt, LimitTo 1]
whenIsJust ((,) <$> appMailRetainSent <*> oldestSentMail) $ \(retain, oldest) -> tell $ HashMap.singleton
(JobCtlQueue JobPruneOldSentMails)
Cron
{ cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime retain oldest
, cronRepeat = CronRepeatOnChange
, cronRateLimit = retain / 2
, cronNotAfter = Right CronNotScheduled
}
whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobInjectFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = iInterval
, cronNotAfter = Right CronNotScheduled
}
whenIsJust appRechunkFiles $ \rInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobRechunkFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = rInterval
, cronNotAfter = Right CronNotScheduled
}
whenIsJust appCheckMissingFiles $ \rInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobDetectMissingFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = rInterval
, cronNotAfter = Right CronNotScheduled
}
tell . flip foldMap universeF $ \kind ->
case appHealthCheckInterval kind of
Just int -> HashMap.singleton
@ -131,354 +49,437 @@ determineCrontab = execWriterT $ do
}
Nothing -> mempty
let newyear = cronCalendarAny
{ cronDayOfYear = cronMatchOne 1
}
in tell $ HashMap.singleton
(JobCtlQueue JobTruncateTransactionLog)
when (is _JobsLocal appJobMode) $ do
case appJobFlushInterval of
Just interval -> tell $ HashMap.singleton
JobCtlFlush
Cron
{ cronInitial = newyear
, cronRepeat = CronRepeatScheduled newyear
, cronRateLimit = minNominalYear
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = interval
, cronNotAfter = Right CronNotScheduled
}
Nothing -> return ()
oldestLogEntry <- fmap listToMaybe . lift . E.select . E.from $ \transactionLog -> do
E.where_ . E.not_ . E.isNothing $ transactionLog E.^. TransactionLogRemote
E.orderBy [E.asc $ transactionLog E.^. TransactionLogTime]
E.limit 1
return $ transactionLog E.^. TransactionLogTime
for_ oldestLogEntry $ \(E.Value oldestEntry) ->
tell $ HashMap.singleton
(JobCtlQueue JobDeleteTransactionLogIPs)
oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1]
whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton
(JobCtlQueue JobPruneInvitations)
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appTransactionLogIPRetentionTime oldestEntry
{ cronInitial = CronTimestamp $ utcToLocalTime oldestInvUTC
, cronRepeat = CronRepeatOnChange
, cronRateLimit = nominalDay
, cronNotAfter = Right CronNotScheduled
}
let
getNextIntervals within interval cInterval = do
now <- liftIO getPOSIXTime
return $ do
let
epochInterval = within / 2
(currEpoch, epochNow) = now `divMod'` epochInterval
currInterval = epochNow `div'` interval
numIntervals = max 1 . floor $ epochInterval / interval
n = ceiling $ 4 * cInterval / interval
i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ]
let
((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals
nextIntervalTime
= posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval
return (nextEpoch, nextInterval, nextIntervalTime, numIntervals)
if
| is _Just appLdapConf
, Just syncWithin <- appSynchroniseLdapUsersWithin
, Just cInterval <- appJobCronInterval
-> do
nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobSynchroniseLdap
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval
})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
, cronRepeat = CronRepeatNever
, cronRateLimit = appSynchroniseLdapUsersInterval
, cronNotAfter = Left syncWithin
}
| otherwise
-> return ()
whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do
nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobPruneUnreferencedFiles
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval
}
)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
, cronRepeat = CronRepeatNever
, cronRateLimit = appPruneUnreferencedFilesInterval
, cronNotAfter = Left within
}
let
sheetJobs (Entity nSheet Sheet{..}) = do
for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetActive{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
}
for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom
guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom)
(fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet])
guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet]
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetHint{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
}
for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom
guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left nominalDay
}
for_ sheetActiveTo $ \aTo -> do
whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo'
, cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo
}
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
when sheetAutoDistribute $
tell $ HashMap.singleton
(JobCtlQueue $ JobDistributeCorrections nSheet)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
, cronRepeat = CronRepeatNever
, cronRateLimit = 3600 -- Irrelevant due to `cronRepeat`
, cronNotAfter = Left nominalDay
}
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs
let
correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB ()
correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } )
oldestSessionFile <- lift $ preview (_head . _entityVal . _sessionFileTouched) <$> selectList [] [Asc SessionFileTouched, LimitTo 1]
whenIsJust oldestSessionFile $ \oldest -> tell $ HashMap.singleton
(JobCtlQueue JobPruneSessionFiles)
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay time
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
{ cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appSessionFilesExpire oldest
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appSessionFilesExpire / 2
, cronNotAfter = Right CronNotScheduled
}
submissionsByCorrector :: Entity Submission -> Map (UserId, SheetId) (Max UTCTime)
submissionsByCorrector (Entity _ sub)
| Just ratingBy <- submissionRatingBy sub
, Just assigned <- submissionRatingAssigned sub
, not $ submissionRatingDone sub
= Map.singleton (ratingBy, submissionSheet sub) $ Max assigned
| otherwise
= Map.empty
oldestFallbackPersonalisedSheetFilesKey <- lift $ preview (_head . _entityVal . _fallbackPersonalisedSheetFilesKeyGenerated) <$> selectList [] [Asc FallbackPersonalisedSheetFilesKeyGenerated, LimitTo 1]
whenIsJust oldestFallbackPersonalisedSheetFilesKey $ \oldest -> tell $ HashMap.singleton
(JobCtlQueue JobPruneFallbackPersonalisedSheetFilesKeys)
Cron
{ cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime appFallbackPersonalisedSheetFilesKeysExpire oldest
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appFallbackPersonalisedSheetFilesKeysExpire / 2
, cronNotAfter = Right CronNotScheduled
}
collateSubmissionsByCorrector acc entity = Map.unionWith (<>) acc $ submissionsByCorrector entity
correctorNotifications <=< runConduit $
transPipe lift ( selectSource [ SubmissionRatingBy !=. Nothing, SubmissionRatingAssigned !=. Nothing ] []
)
.| C.fold collateSubmissionsByCorrector Map.empty
oldestSentMail <- lift $ preview (_head . _entityVal . _sentMailSentAt) <$> selectList [] [Asc SentMailSentAt, LimitTo 1]
whenIsJust ((,) <$> appMailRetainSent <*> oldestSentMail) $ \(retain, oldest) -> tell $ HashMap.singleton
(JobCtlQueue JobPruneOldSentMails)
Cron
{ cronInitial = CronTimestamp . utcToLocalTime $ addUTCTime retain oldest
, cronRepeat = CronRepeatOnChange
, cronRateLimit = retain / 2
, cronNotAfter = Right CronNotScheduled
}
let
examSelect = E.selectSource . E.from $ \(exam `E.InnerJoin` course `E.InnerJoin` school) -> do
E.on $ school E.^. SchoolId E.==. course E.^. CourseSchool
E.on $ course E.^. CourseId E.==. exam E.^. ExamCourse
return (exam, course, school)
examJobs (Entity nExam Exam{..}, _, Entity _ School{..}) = do
newestResult <- lift . E.select . E.from $ \examResult -> do
E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam
return . E.max_ $ examResult E.^. ExamResultLastChanged
whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobInjectFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = iInterval
, cronNotAfter = Right CronNotScheduled
}
whenIsJust examVisibleFrom $ \visibleFrom -> do
case over (mapped . _Value) ((max `on` NTop) examFinished) newestResult of
[E.Value (NTop (Just ts))] ->
whenIsJust appRechunkFiles $ \rInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobRechunkFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = rInterval
, cronNotAfter = Right CronNotScheduled
}
whenIsJust appCheckMissingFiles $ \rInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobDetectMissingFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = rInterval
, cronNotAfter = Right CronNotScheduled
}
let newyear = cronCalendarAny
{ cronDayOfYear = cronMatchOne 1
}
in tell $ HashMap.singleton
(JobCtlQueue JobTruncateTransactionLog)
Cron
{ cronInitial = newyear
, cronRepeat = CronRepeatScheduled newyear
, cronRateLimit = minNominalYear
, cronNotAfter = Right CronNotScheduled
}
oldestLogEntry <- fmap listToMaybe . lift . E.select . E.from $ \transactionLog -> do
E.where_ . E.not_ . E.isNothing $ transactionLog E.^. TransactionLogRemote
E.orderBy [E.asc $ transactionLog E.^. TransactionLogTime]
E.limit 1
return $ transactionLog E.^. TransactionLogTime
for_ oldestLogEntry $ \(E.Value oldestEntry) ->
tell $ HashMap.singleton
(JobCtlQueue JobDeleteTransactionLogIPs)
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appTransactionLogIPRetentionTime oldestEntry
, cronRepeat = CronRepeatOnChange
, cronRateLimit = nominalDay
, cronNotAfter = Right CronNotScheduled
}
let
getNextIntervals within interval cInterval = do
now <- liftIO getPOSIXTime
return $ do
let
epochInterval = within / 2
(currEpoch, epochNow) = now `divMod'` epochInterval
currInterval = epochNow `div'` interval
numIntervals = max 1 . floor $ epochInterval / interval
n = ceiling $ 4 * cInterval / interval
i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ]
let
((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals
nextIntervalTime
= posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval
return (nextEpoch, nextInterval, nextIntervalTime, numIntervals)
if
| is _Just appLdapConf
, Just syncWithin <- appSynchroniseLdapUsersWithin
, Just cInterval <- appJobCronInterval
-> do
nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval cInterval
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamResult{..})
(JobCtlQueue JobSynchroniseLdap
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval
})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom ts
, cronRepeat = CronRepeatOnChange
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
, cronRepeat = CronRepeatNever
, cronRateLimit = appSynchroniseLdapUsersInterval
, cronNotAfter = Left syncWithin
}
| otherwise
-> return ()
whenIsJust ((,) <$> appPruneUnreferencedFilesWithin <*> appJobCronInterval) $ \(within, cInterval) -> do
nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval cInterval
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobPruneUnreferencedFiles
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval
}
)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
, cronRepeat = CronRepeatNever
, cronRateLimit = appPruneUnreferencedFilesInterval
, cronNotAfter = Left within
}
let
sheetJobs (Entity nSheet Sheet{..}) = do
for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetActive{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aFrom
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
}
for_ (max <$> sheetVisibleFrom <*> sheetHintFrom) $ \hFrom -> maybeT (return ()) $ do
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom hFrom) > 300) sheetActiveFrom
guardM $ or2M (return $ maybe True (\sFrom -> abs (diffUTCTime sFrom hFrom) > 300) sheetSolutionFrom)
(fmap not . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet])
guardM . lift . lift $ exists [SheetFileType ==. SheetHint, SheetFileSheet ==. nSheet]
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetHint{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ hFrom
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) sheetActiveTo
}
for_ (max <$> sheetVisibleFrom <*> sheetSolutionFrom) $ \sFrom -> maybeT (return ()) $ do
guard $ maybe True (\aFrom -> abs (diffUTCTime aFrom sFrom) > 300) sheetActiveFrom
guardM . lift . lift $ exists [SheetFileType ==. SheetSolution, SheetFileSheet ==. nSheet]
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetSolution{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sFrom
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left nominalDay
}
for_ sheetActiveTo $ \aTo -> do
whenIsJust (max aTo <$> sheetVisibleFrom) $ \aTo' -> do
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetSoonInactive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . maybe id max sheetActiveFrom $ addUTCTime (-nominalDay) aTo'
, cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but wait at least an hour
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationExpiration $ max visibleFrom ts
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ aTo
}
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
when sheetAutoDistribute $
tell $ HashMap.singleton
(JobCtlQueue $ JobDistributeCorrections nSheet)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ aTo
, cronRepeat = CronRepeatNever
, cronRateLimit = 3600 -- Irrelevant due to `cronRepeat`
, cronNotAfter = Left nominalDay
}
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs
let
correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB ()
correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } )
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay time
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
submissionsByCorrector :: Entity Submission -> Map (UserId, SheetId) (Max UTCTime)
submissionsByCorrector (Entity _ sub)
| Just ratingBy <- submissionRatingBy sub
, Just assigned <- submissionRatingAssigned sub
, not $ submissionRatingDone sub
= Map.singleton (ratingBy, submissionSheet sub) $ Max assigned
| otherwise
= Map.empty
collateSubmissionsByCorrector acc entity = Map.unionWith (<>) acc $ submissionsByCorrector entity
correctorNotifications <=< runConduit $
transPipe lift ( selectSource [ SubmissionRatingBy !=. Nothing, SubmissionRatingAssigned !=. Nothing ] []
)
.| C.fold collateSubmissionsByCorrector Map.empty
let
examSelect = E.selectSource . E.from $ \(exam `E.InnerJoin` course `E.InnerJoin` school) -> do
E.on $ school E.^. SchoolId E.==. course E.^. CourseSchool
E.on $ course E.^. CourseId E.==. exam E.^. ExamCourse
return (exam, course, school)
examJobs (Entity nExam Exam{..}, _, Entity _ School{..}) = do
newestResult <- lift . E.select . E.from $ \examResult -> do
E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam
return . E.max_ $ examResult E.^. ExamResultLastChanged
whenIsJust examVisibleFrom $ \visibleFrom -> do
case over (mapped . _Value) ((max `on` NTop) examFinished) newestResult of
[E.Value (NTop (Just ts))] ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamResult{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom ts
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationExpiration $ max visibleFrom ts
}
_other -> return ()
whenIsJust examRegisterFrom $ \registerFrom ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamRegistrationActive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom registerFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) examRegisterTo
}
whenIsJust ((,) <$> examRegisterFrom <*> examRegisterTo) $ \(registerFrom, registerTo) ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamRegistrationSoonInactive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) registerTo
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ registerTo
}
whenIsJust ((,) <$> examRegisterFrom <*> examDeregisterUntil) $ \(registerFrom, deregisterUntil) ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamDeregistrationSoonInactive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) deregisterUntil
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ deregisterUntil
}
let closeTime = case (examClosed, examFinished) of
(mClose, Just finish)
| isn't _ExamCloseSeparate schoolExamCloseMode -> Just $ maybe id min mClose finish
(Just close, _)
| is _ExamCloseSeparate schoolExamCloseMode -> Just close
_other -> Nothing
case closeTime of
Just close -> do
-- If an exam that was previously under `ExamCloseSeparate` rules transitions to `ExamCloseOnFinish`, it might suddenly have been closed an arbitrary time ago
-- If `cronNotAfter` was only `appNotificationExpiration` in that case, no notification might ever be sent
-- That's probably fine.
changedResults <- lift . E.select . E.from $ \examResult -> do
E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam
E.&&. examResult E.^. ExamResultLastChanged E.>. E.val close
return $ examResult E.^. ExamResultId
case newestResult of
[E.Value (Just lastChange)]
| not $ null changedResults
-> tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResultsChanged{ nExamResults = Set.fromList $ map E.unValue changedResults })
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
_other -> return ()
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResults{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ close
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
Nothing -> return ()
in runConduit $ transPipe lift examSelect .| C.mapM_ examJobs
let
externalExamJobs nExternalExam = do
newestResult <- lift . E.select . E.from $ \externalExamResult -> do
E.where_ $ externalExamResult E.^. ExternalExamResultExam E.==. E.val nExternalExam
return . E.max_ $ externalExamResult E.^. ExternalExamResultLastChanged
case newestResult of
[E.Value (Just lastChange)] ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamOfficeExternalExamResults{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange
, cronRepeat = CronRepeatOnChange
, cronRateLimit = nominalDay
, cronNotAfter = Left appNotificationExpiration
}
_other -> return ()
whenIsJust examRegisterFrom $ \registerFrom ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamRegistrationActive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ max visibleFrom registerFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Left appNotificationExpiration) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) examRegisterTo
}
whenIsJust ((,) <$> examRegisterFrom <*> examRegisterTo) $ \(registerFrom, registerTo) ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamRegistrationSoonInactive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) registerTo
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ registerTo
}
whenIsJust ((,) <$> examRegisterFrom <*> examDeregisterUntil) $ \(registerFrom, deregisterUntil) ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamDeregistrationSoonInactive{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max visibleFrom . max registerFrom $ addUTCTime (-nominalDay) deregisterUntil
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp $ utcToLocalTimeTZ appTZ deregisterUntil
}
let closeTime = case (examClosed, examFinished) of
(mClose, Just finish)
| isn't _ExamCloseSeparate schoolExamCloseMode -> Just $ maybe id min mClose finish
(Just close, _)
| is _ExamCloseSeparate schoolExamCloseMode -> Just close
_other -> Nothing
case closeTime of
Just close -> do
-- If an exam that was previously under `ExamCloseSeparate` rules transitions to `ExamCloseOnFinish`, it might suddenly have been closed an arbitrary time ago
-- If `cronNotAfter` was only `appNotificationExpiration` in that case, no notification might ever be sent
-- That's probably fine.
changedResults <- lift . E.select . E.from $ \examResult -> do
E.where_ $ examResult E.^. ExamResultExam E.==. E.val nExam
E.&&. examResult E.^. ExamResultLastChanged E.>. E.val close
return $ examResult E.^. ExamResultId
case newestResult of
[E.Value (Just lastChange)]
| not $ null changedResults
-> tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResultsChanged{ nExamResults = Set.fromList $ map E.unValue changedResults })
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
_other -> return ()
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamOfficeExamResults{..})
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ close
, cronRepeat = CronRepeatNever
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Left appNotificationExpiration
}
Nothing -> return ()
in runConduit $ transPipe lift examSelect .| C.mapM_ examJobs
runConduit $ transPipe lift (selectKeys [] []) .| C.mapM_ externalExamJobs
let
externalExamJobs nExternalExam = do
newestResult <- lift . E.select . E.from $ \externalExamResult -> do
E.where_ $ externalExamResult E.^. ExternalExamResultExam E.==. E.val nExternalExam
return . E.max_ $ externalExamResult E.^. ExternalExamResultLastChanged
allocations <- lift $ selectList [] []
case newestResult of
[E.Value (Just lastChange)] ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationExamOfficeExternalExamResults{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay lastChange
, cronRepeat = CronRepeatOnChange
, cronRateLimit = nominalDay
, cronNotAfter = Left appNotificationExpiration
}
_other -> return ()
let
allocationTimes :: EntityField Allocation (Maybe UTCTime) -> MergeHashMap UTCTime [Entity Allocation]
allocationTimes aField = flip foldMap allocations $ \allocEnt -> case allocEnt ^. fieldLens aField of
Nothing -> mempty
Just t -> _MergeHashMap # HashMap.singleton t (pure allocEnt)
runConduit $ transPipe lift (selectKeys [] []) .| C.mapM_ externalExamJobs
forM_ allocations $ \(Entity nAllocation _) -> do
doneSince <- lift $ fmap (E.unValue <=< listToMaybe) . E.select . E.from $ \participant -> do
E.where_ $ participant E.^. CourseParticipantAllocated E.==. E.just (E.val nAllocation)
return . E.max_ $ participant E.^. CourseParticipantRegistration
whenIsJust doneSince $ \doneSince' ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationResults{..})
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay doneSince'
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationCollateDelay $ addUTCTime appNotificationExpiration doneSince'
}
allocations <- lift $ selectList [] []
let
allocationTimes :: EntityField Allocation (Maybe UTCTime) -> MergeHashMap UTCTime [Entity Allocation]
allocationTimes aField = flip foldMap allocations $ \allocEnt -> case allocEnt ^. fieldLens aField of
Nothing -> mempty
Just t -> _MergeHashMap # HashMap.singleton t (pure allocEnt)
forM_ allocations $ \(Entity nAllocation _) -> do
doneSince <- lift $ fmap (E.unValue <=< listToMaybe) . E.select . E.from $ \participant -> do
E.where_ $ participant E.^. CourseParticipantAllocated E.==. E.just (E.val nAllocation)
return . E.max_ $ participant E.^. CourseParticipantRegistration
whenIsJust doneSince $ \doneSince' ->
iforM_ (allocationTimes AllocationStaffRegisterFrom) $ \staffRegisterFrom allocs ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationResults{..})
(JobCtlQueue $ JobQueueNotification NotificationAllocationStaffRegister{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appNotificationCollateDelay doneSince'
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffRegisterFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = Right . CronTimestamp . utcToLocalTimeTZ appTZ . addUTCTime appNotificationCollateDelay $ addUTCTime appNotificationExpiration doneSince'
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffRegisterTo . to NTop . filtered (> NTop (Just staffRegisterFrom))) allocs
}
iforM_ (allocationTimes AllocationRegisterFrom) $ \registerFrom allocs ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationRegister{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationRegisterTo . to NTop . filtered (> NTop (Just registerFrom))) allocs
}
iforM_ (allocationTimes AllocationStaffAllocationFrom) $ \staffAllocationFrom allocs ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationAllocation{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffAllocationFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just staffAllocationFrom))) allocs
}
iforM_ (allocationTimes AllocationRegisterTo) $ \registerTo allocs' -> do
let allocs = flip filter allocs' $ \(Entity _ Allocation{..}) -> maybe True (> registerTo) allocationStaffAllocationTo
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationUnratedApplications{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerTo
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs
}
iforM_ (allocationTimes AllocationStaffRegisterFrom) $ \staffRegisterFrom allocs ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationStaffRegister{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffRegisterFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffRegisterTo . to NTop . filtered (> NTop (Just staffRegisterFrom))) allocs
}
iforM_ (allocationTimes AllocationRegisterFrom) $ \registerFrom allocs ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationRegister{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationRegisterTo . to NTop . filtered (> NTop (Just registerFrom))) allocs
}
iforM_ (allocationTimes AllocationStaffAllocationFrom) $ \staffAllocationFrom allocs ->
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationAllocation{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ staffAllocationFrom
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just staffAllocationFrom))) allocs
}
iforM (allocationTimes AllocationRegisterTo) $ \registerTo allocs' -> do
let allocs = flip filter allocs' $ \(Entity _ Allocation{..}) -> maybe True (> registerTo) allocationStaffAllocationTo
tell $ HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationAllocationUnratedApplications{ nAllocations = setOf (folded . _entityKey) allocs })
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ registerTo
, cronRepeat = CronRepeatOnChange
, cronRateLimit = appNotificationRateLimit
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs
}

View File

@ -73,9 +73,7 @@ fileReferences (E.just -> fHash)
]
workflowFileReferences :: MonadResource m => ConduitT () FileContentReference (SqlPersistT m) ()
workflowFileReferences = mconcat [ E.selectSource (E.from $ pure . (E.^. WorkflowDefinitionGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)
, E.selectSource (E.from $ pure . (E.^. WorkflowInstanceGraph )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)
, E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowGraph )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)
workflowFileReferences = mconcat [ E.selectSource (E.from $ pure . (E.^. SharedWorkflowGraphGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)
, E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowState )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)
]

70
src/Jobs/Offload.hs Normal file
View File

@ -0,0 +1,70 @@
module Jobs.Offload
( mkJobOffloadHandler
) where
import Import hiding (bracket, js)
import Jobs.Types
import Jobs.Queue
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import Database.Persist.Postgresql (PostgresConf, pgConnStr)
import Data.Text.Encoding (decodeUtf8')
import UnliftIO.Exception (bracket)
jobOffloadChannel :: Text
jobOffloadChannel = "job-offload"
mkJobOffloadHandler :: forall m.
( MonadResource m
, MonadUnliftIO m
, MonadThrow m, MonadReader UniWorX m
, MonadLogger m
)
=> PostgresConf -> JobMode
-> Maybe (m JobOffloadHandler)
mkJobOffloadHandler dbConf jMode
| is _JobsLocal jMode, hasn't (_jobsAcceptOffload . only True) jMode = Nothing
| otherwise = Just $ do
jobOffloadOutgoing <- newTVarIO mempty
jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do
myPid <- liftIO $ PG.getBackendPID pgConn
let shouldListen = has (_jobsAcceptOffload . only True) jMode
when shouldListen $
void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel)
foreverBreak $ \(($ ()) -> terminate) -> do
UniWorX{appJobState} <- ask
shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
when shouldTerminate terminate
let
getInput = do
n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn
if | notificationPid == myPid || notificationChannel /= encodeUtf8 jobOffloadChannel -> getInput
| otherwise -> return n
getOutput = atomically $ do
jQueue <- readTVar jobOffloadOutgoing
case jQueue of
j :< js -> j <$ writeTVar jobOffloadOutgoing js
_other -> mzero
io <- lift $ if
| shouldListen -> getInput `race` getOutput
| otherwise -> Right <$> getOutput
case io of
Left PG.Notification{..}
| Just jId <- fromPathPiece =<< either (const Nothing) Just (decodeUtf8' notificationData)
-> writeJobCtl $ JobCtlPerform jId
| otherwise
-> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow notificationData
Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId)
return JobOffloadHandler{..}

View File

@ -9,7 +9,7 @@ module Jobs.Types
, classifyJobCtl
, YesodJobDB
, JobHandler(..), _JobHandlerAtomic, _JobHandlerException
, JobContext(..)
, JobOffloadHandler(..), JobContext(..)
, JobState(..), _jobWorkers, _jobWorkerName, _jobContext, _jobPoolManager, _jobCron, _jobShutdown, _jobCurrentCrontab
, jobWorkerNames
, JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob
@ -238,10 +238,16 @@ showWorkerId = tshow . hashUnique . jobWorkerUnique
newWorkerId :: MonadIO m => m JobWorkerId
newWorkerId = JobWorkerId <$> liftIO newUnique
data JobOffloadHandler = JobOffloadHandler
{ jobOffloadHandler :: Async ()
, jobOffloadOutgoing :: TVar (Seq QueuedJobId)
}
data JobContext = JobContext
{ jobCrontab :: TVar (Crontab JobCtl)
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
, jobHeldLocks :: TVar (Set QueuedJobId)
, jobOffload :: TMVar JobOffloadHandler
}

View File

@ -47,6 +47,8 @@ import Data.Time.Format
import qualified Data.Time.Zones as TZ
import Utils.Workflow
data ManualMigration
= Migration20180813SimplifyUserTheme
@ -97,6 +99,7 @@ data ManualMigration
| Migration20201106StoredMarkup
| Migration20201119RoomTypes
| Migration20210115ExamPartsFrom
| Migration20210201SharedWorkflowGraphs
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
deriving anyclass (Universe, Finite)
@ -968,6 +971,55 @@ customMigrations = mapF $ \case
migrateExam _ = return ()
in runConduit $ getExam .| C.mapM_ migrateExam
Migration20210201SharedWorkflowGraphs -> do
unlessM (tableExists "shared_workflow_graph")
[executeQQ|CREATE TABLE "shared_workflow_graph" ("hash" bytea primary key, "graph" jsonb not null)|]
whenM (tableExists "workflow_definition") $ do
[executeQQ|ALTER TABLE "workflow_definition" ADD COLUMN "graph_id" bytea references shared_workflow_graph(hash)|]
let getDefinitions = [queryQQ|SELECT "id", "graph" FROM "workflow_definition"|]
migrateDefinition [ fromPersistValue -> Right (wdId :: WorkflowDefinitionId), fromPersistValue -> Right (graph :: DBWorkflowGraph) ] = do
swgId <- insertSharedWorkflowGraph graph
[executeQQ|UPDATE "workflow_definition" SET "graph_id" = #{swgId} WHERE "id" = #{wdId}|]
migrateDefinition _ = return ()
in runConduit $ getDefinitions .| C.mapM_ migrateDefinition
[executeQQ|
ALTER TABLE "workflow_definition" DROP COLUMN "graph";
ALTER TABLE "workflow_definition" ALTER COLUMN "graph_id" SET not null;
ALTER TABLE "workflow_definition" RENAME COLUMN "graph_id" TO "graph";
|]
whenM (tableExists "workflow_instance") $ do
[executeQQ|ALTER TABLE "workflow_instance" ADD COLUMN "graph_id" bytea references shared_workflow_graph(hash)|]
let getInstances = [queryQQ|SELECT "id", "graph" FROM "workflow_instance"|]
migrateInstance [ fromPersistValue -> Right (wiId :: WorkflowInstanceId), fromPersistValue -> Right (graph :: DBWorkflowGraph) ] = do
swgId <- insertSharedWorkflowGraph graph
[executeQQ|UPDATE "workflow_instance" SET "graph_id" = #{swgId} WHERE "id" = #{wiId}|]
migrateInstance _ = return ()
in runConduit $ getInstances .| C.mapM_ migrateInstance
[executeQQ|
ALTER TABLE "workflow_instance" DROP COLUMN "graph";
ALTER TABLE "workflow_instance" ALTER COLUMN "graph_id" SET not null;
ALTER TABLE "workflow_instance" RENAME COLUMN "graph_id" TO "graph";
|]
whenM (tableExists "workflow_workflow") $ do
[executeQQ|ALTER TABLE "workflow_workflow" ADD COLUMN "graph_id" bytea references shared_workflow_graph(hash)|]
let getWorkflows = [queryQQ|SELECT "id", "graph" FROM "workflow_workflow"|]
migrateWorkflow [ fromPersistValue -> Right (wwId :: WorkflowWorkflowId), fromPersistValue -> Right (graph :: DBWorkflowGraph) ] = do
swgId <- insertSharedWorkflowGraph graph
[executeQQ|UPDATE "workflow_workflow" SET "graph_id" = #{swgId} WHERE "id" = #{wwId}|]
migrateWorkflow _ = return ()
in runConduit $ getWorkflows .| C.mapM_ migrateWorkflow
[executeQQ|
ALTER TABLE "workflow_workflow" DROP COLUMN "graph";
ALTER TABLE "workflow_workflow" ALTER COLUMN "graph_id" SET not null;
ALTER TABLE "workflow_workflow" RENAME COLUMN "graph_id" TO "graph";
|]
tableExists :: MonadIO m => Text -> ReaderT SqlBackend m Bool
tableExists table = do

View File

@ -1,7 +1,7 @@
{-# LANGUAGE UndecidableInstances #-}
module Model.Types.Workflow
( WorkflowGraph(..)
( WorkflowGraph(..), WorkflowGraphReference(..)
, WorkflowGraphNodeLabel
, WorkflowGraphNode(..)
, WorkflowNodeView(..)
@ -37,6 +37,8 @@ import Model.Types.Security (AuthDNF, PredDNF)
import Model.Types.File (FileContentReference, FileFieldUserOption, FileField, _fieldAdditionalFiles, FileReferenceTitleMapConvertible(..))
import Database.Persist.Sql (PersistFieldSql(..))
import Web.HttpApiData (ToHttpApiData, FromHttpApiData)
import Data.ByteArray (ByteArrayAccess)
import Data.Maybe (fromJust)
@ -77,6 +79,15 @@ deriving instance (Eq fileid, Eq userid, Typeable fileid, Typeable userid, Eq (F
deriving instance (Ord fileid, Ord userid, Typeable fileid, Typeable userid, Ord (FileField fileid)) => Ord (WorkflowGraph fileid userid)
deriving instance (Show fileid, Show userid, Show (FileField fileid)) => Show (WorkflowGraph fileid userid)
newtype WorkflowGraphReference = WorkflowGraphReference (Digest SHA3_256)
deriving (Eq, Ord, Read, Show, Lift, Generic, Typeable)
deriving newtype ( PersistField, PersistFieldSql
, PathPiece, ToHttpApiData, FromHttpApiData, ToJSON, FromJSON
, Hashable, NFData
, ByteArrayAccess
, Binary
)
----- WORKFLOW GRAPH: NODES -----
newtype WorkflowGraphNodeLabel = WorkflowGraphNodeLabel { unWorkflowGraphNodeLabel :: CI Text }
@ -1051,3 +1062,7 @@ instance Binary WorkflowScope'
instance (Binary termid, Binary schoolid, Binary courseid) => Binary (WorkflowScope termid schoolid courseid)
instance Binary userid => Binary (WorkflowRole userid)
----- TH Jail -----
makeWrapped ''WorkflowGraphReference

View File

@ -206,9 +206,16 @@ data AppSettings = AppSettings
, appInitialInstanceID :: Maybe (Either FilePath UUID)
, appRibbon :: Maybe Text
, appJobMode :: JobMode
, appMemcacheAuth :: Bool
} deriving Show
data JobMode = JobsLocal { jobsAcceptOffload :: Bool }
| JobsOffload
deriving (Eq, Ord, Read, Show, Generic, Typeable)
deriving anyclass (Hashable)
data ApprootScope = ApprootUserGenerated | ApprootDefault
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
deriving anyclass (Universe, Finite, Hashable)
@ -342,6 +349,11 @@ deriveFromJSON defaultOptions
{ fieldLabelModifier = camelToPathPiece' 2
} ''UserDefaultConf
deriveJSON defaultOptions
{ fieldLabelModifier = camelToPathPiece' 1
, constructorTagModifier = camelToPathPiece' 1
} ''JobMode
instance FromJSON LdapConf where
parseJSON = withObject "LdapConf" $ \o -> do
ldapTls <- o .:? "tls"
@ -596,6 +608,8 @@ instance FromJSON AppSettings where
appMemcacheAuth <- o .:? "memcache-auth" .!= False
appJobMode <- o .:? "job-mode" .!= JobsLocal True
return AppSettings{..}
makeClassy_ ''AppSettings

View File

@ -70,6 +70,7 @@ import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Catch
import Control.Monad.Morph (hoist)
import Control.Monad.Fail
import Control.Monad.Trans.Cont (ContT, evalContT, callCC)
import Language.Haskell.TH
import Language.Haskell.TH.Instances ()
@ -943,6 +944,11 @@ forever' :: Monad m
-> m b
forever' start cont = cont start >>= flip forever' cont
foreverBreak :: Monad m
=> ((r -> ContT r m b) -> ContT r m a)
-> m r
foreverBreak cont = evalContT . callCC $ forever . cont
--------------
-- Foldable --

View File

@ -5,6 +5,7 @@
module Utils.Lens ( module Utils.Lens ) where
import Import.NoModel
import Settings
import Model
import Model.Rating
import qualified ClassyPrelude.Yesod as Yesod (HasHttpManager(..))
@ -272,6 +273,9 @@ makePrisms ''AllocationPriority
makePrisms ''RoomReference
makeLenses_ ''RoomReference
makePrisms ''JobMode
makeLenses_ ''JobMode
-- makeClassy_ ''Load
--------------------------

View File

@ -32,6 +32,7 @@ data GlobalGetParam = GetLang
| GetDownload
| GetError
| GetSelectTable
| GetGenerateToken
deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic)
deriving anyclass (Universe, Finite)

View File

@ -10,6 +10,8 @@ module Utils.Workflow
, decryptWorkflowStateIndex, encryptWorkflowStateIndex
, isTopWorkflowScope, isTopWorkflowScopeSql
, selectWorkflowInstanceDescription
, SharedWorkflowGraphException(..), getSharedDBWorkflowGraph, getSharedIdWorkflowGraph
, insertSharedWorkflowGraph
) where
import Import.NoFoundation
@ -19,11 +21,15 @@ import qualified Crypto.MAC.KMAC as Crypto
import qualified Data.ByteArray as BA
import qualified Data.Binary as Binary
import Crypto.Hash.Algorithms (SHAKE256)
import qualified Crypto.Hash as Crypto
import Language.Haskell.TH (nameBase)
import qualified Data.Aeson as Aeson
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
{-# ANN module ("HLint: ignore Use newtype instead of data" :: String) #-}
type RouteWorkflowScope = WorkflowScope TermId SchoolId (TermId, SchoolId, CourseShorthand)
type DBWorkflowScope = WorkflowScope TermIdentifier SchoolShorthand SqlBackendKey
@ -130,3 +136,35 @@ selectWorkflowInstanceDescription wiId = withReaderT (projectBackend @SqlReadBac
return $ workflowInstanceDescription E.^. WorkflowInstanceDescriptionLanguage
descLang <- traverse selectLanguage . nonEmpty $ E.unValue <$> descLangs
fmap join . for descLang $ \descLang' -> getBy $ UniqueWorkflowInstanceDescription wiId descLang'
data SharedWorkflowGraphException
= SharedWorkflowGraphNotFound SharedWorkflowGraphId
deriving (Eq, Ord, Read, Show, Generic, Typeable)
deriving anyclass (Exception)
getSharedDBWorkflowGraph :: ( MonadHandler m
, BackendCompatible SqlReadBackend backend
)
=> SharedWorkflowGraphId
-> ReaderT backend m DBWorkflowGraph
getSharedDBWorkflowGraph swgId = $cachedHereBinary swgId . withReaderT (projectBackend @SqlReadBackend) $ do
maybe (liftHandler . throwM $ SharedWorkflowGraphNotFound swgId) (return . sharedWorkflowGraphGraph) =<< get swgId
getSharedIdWorkflowGraph :: ( MonadHandler m
, BackendCompatible SqlReadBackend backend
)
=> SharedWorkflowGraphId
-> ReaderT backend m IdWorkflowGraph
getSharedIdWorkflowGraph = fmap (review _DBWorkflowGraph) . getSharedDBWorkflowGraph
insertSharedWorkflowGraph :: ( MonadIO m
, BackendCompatible SqlBackend backend
)
=> DBWorkflowGraph
-> ReaderT backend m SharedWorkflowGraphId
insertSharedWorkflowGraph graph = withReaderT (projectBackend @SqlBackend) $
swgId' <$ repsert swgId' (SharedWorkflowGraph swgId graph)
where
swgId = WorkflowGraphReference . Crypto.hashlazy $ Aeson.encode graph
swgId' = SharedWorkflowGraphKey swgId

View File

@ -34,6 +34,7 @@ import qualified Data.Conduit.Combinators as C
import qualified Data.Yaml as Yaml
import Utils.Workflow
import Utils.Workflow.Lint
import System.Directory (getModificationTime)
@ -1330,8 +1331,9 @@ fillDb = do
displayLinterIssue = liftIO . hPutStrLn stderr . displayException
handleSql displayLinterIssue $ do
workflowDefinitionGraph <- Yaml.decodeFileThrow $ testdataDir </> "workflows" </> "theses.yaml"
for_ (lintWorkflowGraph workflowDefinitionGraph) $ mapM_ throwM
graph <- Yaml.decodeFileThrow $ testdataDir </> "workflows" </> "theses.yaml"
for_ (lintWorkflowGraph graph) $ mapM_ throwM
workflowDefinitionGraph <- insertSharedWorkflowGraph graph
let
thesesWorkflowDef = WorkflowDefinition{..}
where workflowDefinitionInstanceCategory = Just "theses"
@ -1366,8 +1368,9 @@ fillDb = do
}
handleSql displayLinterIssue $ do
workflowDefinitionGraph <- Yaml.decodeFileThrow $ testdataDir </> "workflows" </> "recognitions-ifi.yaml"
for_ (lintWorkflowGraph workflowDefinitionGraph) $ mapM_ throwM
graph <- Yaml.decodeFileThrow $ testdataDir </> "workflows" </> "recognitions-ifi.yaml"
for_ (lintWorkflowGraph graph) $ mapM_ throwM
workflowDefinitionGraph <- insertSharedWorkflowGraph graph
let
recognitionsWorkflowDef = WorkflowDefinition{..}
where workflowDefinitionInstanceCategory = Just "recognitions-ifi"

2
testdata/workflows vendored

@ -1 +1 @@
Subproject commit b05854beedd35e2d5ffe43628b747efa86e92ffb
Subproject commit cf7dcf58c524176bbdd27ff279d68a5ab90cd06e