fradrive/src/Jobs.hs

739 lines
32 KiB
Haskell

-- SPDX-FileCopyrightText: 2022 Gregor Kleen <gregor.kleen@ifi.lmu.de>,Sarah Vaupel <sarah.vaupel@ifi.lmu.de>,Sarah Vaupel <vaupel.sarah@campus.lmu.de>,Steffen Jost <jost@tcs.ifi.lmu.de>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
{-# OPTIONS_GHC -fprof-auto #-}
module Jobs
( module Types
, module Jobs.Queue
, handleJobs
, stopJobCtl
) where
import Import hiding (StateT)
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Offload
import Jobs.Crontab
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe)
import qualified Data.Text.Lazy as LT
import Data.Aeson (fromJSON)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import Control.Monad.Random (evalRand, mkStdGen, uniformMay)
import Cron
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!))
import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import Control.Monad.Trans.State.Strict (StateT, evalStateT)
import qualified Control.Monad.State.Class as State
import Control.Monad.Trans.Cont (ContT(..), callCC)
import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
import Control.Monad.Logger
import qualified Control.Monad.Catch as Exc
import Data.Time.Zones
import Control.Concurrent.STM (stateTVar, retry)
import Control.Concurrent.STM.Delay
import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay)
import qualified Database.Esqueleto.Legacy as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.ByteString as ByteString
import Handler.Utils.Files (sourceFileChunks, _SourceFilesContentUnavailable)
import qualified Data.IntervalMap.Strict as IntervalMap
import Jobs.Handler.SendNotification
import Jobs.Handler.SendTestEmail
import Jobs.Handler.QueueNotification
import Jobs.Handler.HelpRequest
import Jobs.Handler.SetLogSettings
import Jobs.Handler.DistributeCorrections
import Jobs.Handler.SendCourseCommunication
import Jobs.Handler.Invitation
import Jobs.Handler.SendPasswordReset
import Jobs.Handler.TransactionLog
import Jobs.Handler.SynchroniseUserdb
import Jobs.Handler.SynchroniseAvs
import Jobs.Handler.PruneInvitations
import Jobs.Handler.ChangeUserDisplayEmail
import Jobs.Handler.Files
import Jobs.Handler.ExternalApis
import Jobs.Handler.PersonalisedSheetFiles
import Jobs.Handler.PruneOldSentMails
import Jobs.Handler.StudyFeatures
import Jobs.Handler.LMS
import Jobs.Handler.Print
import Jobs.HealthReport
import Control.Exception.Base (AsyncException)
import Type.Reflection (typeOf)
import System.Clock
data JobQueueException = JInvalid QueuedJobId QueuedJob
| JLocked QueuedJobId InstanceId UTCTime
| JNonexistant QueuedJobId
deriving (Read, Show, Eq, Generic)
instance Exception JobQueueException
handleJobs :: ( MonadResource m
, MonadLogger m
, MonadUnliftIO m
, MonadMask m
)
=> UniWorX -> m ()
-- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs foundation@UniWorX{..}
| foundation ^. _appJobWorkers == 0 = return ()
| otherwise = do
jobPoolManager <- allocateLinkedAsyncWithUnmask $ manageJobPool foundation
jobCron <- allocateLinkedAsyncWithUnmask $ manageCrontab foundation
let jobWorkers = Map.empty
jobWorkerName = const $ error "Unknown worker"
jobCrontab <- liftIO $ newTVarIO HashMap.empty
jobConfirm <- liftIO $ newTVarIO HashMap.empty
jobShutdown <- liftIO newEmptyTMVarIO
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
jobHeldLocks <- liftIO $ newTVarIO Set.empty
jobOffload <- liftIO newEmptyTMVarIO
jobLastFlush <- liftIO $ newTVarIO Nothing
registerJobHeldLocksCount jobHeldLocks
registerJobWorkerQueueDepth appJobState
atomically $ putTMVar appJobState JobState
{ jobContext = JobContext{..}
, ..
}
manageCrontab :: forall m.
( MonadResource m
, MonadUnliftIO m
)
=> UniWorX -> (forall a. m a -> m a) -> m ()
manageCrontab foundation@UniWorX{..} unmask = do
ch <- allocateLinkedAsync $ do
jState <- atomically $ readTMVar appJobState
liftIO . unsafeHandler foundation . void $ do
atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
when (has (_appJobCronInterval . _Just) foundation) $
runReaderT ?? foundation $
writeJobCtlBlock JobCtlDetermineCrontab
void $ evalRWST (forever execCrontab) jState HashMap.empty
let awaitTermination = guardM $
readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
handleAny (throwTo $ asyncThreadId ch) . unmask $ do
doCancel <- atomically $ asum
[ False <$ waitSTM ch
, True <$ awaitTermination
]
when doCancel $
cancel ch
manageJobPool :: forall m.
( MonadResource m
, MonadLogger m
, MonadUnliftIO m
, MonadMask m
)
=> UniWorX -> (forall a. m a -> m a) -> m ()
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
flip runContT return . callCC $ \terminate' ->
forever . join . lift . routeExc $ do
transferInfo <- runMaybeT $ do
moveThreshold <- hoistMaybe $ appJobMoveThreshold appSettings'
let MkFixed (fromInteger -> delayTime) = realToFrac moveThreshold / 2 :: Micro
liftIO $ (,) <$> getTime Monotonic <*> newDelay delayTime
atomically . asum $
[ spawnMissingWorkers
, reapDeadWorkers
] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo
++ maybeToList (manageOffloadHandler <$> mkJobOffloadHandler (appDatabaseConf appSettings') (appJobMode appSettings'))
++
[ terminateGracefully terminate'
]
where
shutdownOnException :: ((forall m'. Monad m' => m (m' ()) -> m (m' ())) -> m a) -> m a
shutdownOnException act = do
me <- myThreadId
let
routeExc :: forall m'. Monad m' => (forall b. m b -> m b) -> m (m' ()) -> m (m' ())
routeExc unmask' = handleAll (\exc -> return () <$ throwTo me exc) . unmask'
actAsync <- allocateLinkedAsyncWithUnmask $ \unmask' -> act (routeExc unmask')
let handleExc e = do
$logWarnS "manageJobPool" [st|Received exception: #{displayException e}|]
atomically $ do
jState <- tryReadTMVar appJobState
for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown ()
void $ wait actAsync
throwM e
unmask (wait actAsync) `catchAll` handleExc
num :: Int
num = fromIntegral $ foundation ^. _appJobWorkers
spawnMissingWorkers, reapDeadWorkers :: STM (ContT () m ())
spawnMissingWorkers = do
shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard $ not shouldTerminate'
oldState <- takeTMVar appJobState
let missing = num - Map.size (jobWorkers oldState)
guard $ missing > 0
return $ do
$logDebugS "manageJobPool" [st|Spawning #{missing} workers|]
endo <- execWriterT . replicateM_ missing $ do
workerId <- newWorkerId
let logIdent = mkLogIdent workerId
chan <- liftIO $ newTVarIO mempty
let
streamChan = join . atomically $ do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
if
| shouldTerminate ->
return $ return ()
| otherwise -> do
mNext <- stateTVar chan $ \q -> maybe (Nothing, q) (over _1 Just) $ jqDequeue q
nextVal <- hoistMaybe mNext
return $ yield nextVal >> streamChan
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
$logInfoS logIdent "Started"
runConduit $ streamChan .| handleJobs' workerId
$logInfoS logIdent "Stopping"
worker <- lift . lift . allocateAsync $ liftIO runWorker
tell . Endo $ \cSt -> cSt
{ jobWorkers = Map.insert worker chan $ jobWorkers cSt
, jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
}
atomically . putTMVar appJobState $ endo `appEndo` oldState
reapDeadWorkers = do
oldState <- takeTMVar appJobState
deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a
putTMVar appJobState oldState
{ jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers
}
guard . not $ Map.null deadWorkers
return . forM_ (Map.toList deadWorkers) $ \(jobAsync, _result) -> do
-- TOO MUCH LOGGING
-- case result of
-- Right () -> $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|]
-- Left e -> $logErrorS "JobPoolManager" [st|Job-Executer #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|]
void . lift . allocateLinkedAsync $
let go = do
next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
let chan = jobWorkers oldState ! jobAsync
(nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan
lift . lift $ writeTVar chan newQueue
jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
return (nextVal, receiver)
whenIsJust next $ \(nextVal, receiver) -> do
atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!)
go
in go
terminateGracefully :: (() -> ContT () m ()) -> STM (ContT () m ())
terminateGracefully terminate = do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard shouldTerminate
oldState <- takeTMVar appJobState
guard $ 0 == Map.size (jobWorkers oldState)
return $ do
$logInfoS "JobPoolManager" "Shutting down"
terminate ()
transferJobs :: TimeSpec -> STM (ContT () m ())
transferJobs oldTime = do
moveThreshold <- hoistMaybe $ appJobMoveThreshold appSettings'
let isOld ts = oldTime - ts >= realToFrac moveThreshold
oldState <- readTMVar appJobState
wState <- mapM readTVar $ jobWorkers oldState
let receivers = Map.keysSet $ Map.filter ((== 0) . jqDepth) wState
senders' = Map.keysSet $ Map.filter (ianyOf jqContents $ \(_, Down qTime) _ -> isOld qTime) wState
senders = senders' `Set.difference` receivers
sendJobs = Map.restrictKeys wState senders ^.. folded . backwards jqContents . filtered jobMovable
guard $ not (null receivers)
&& not (null senders)
&& not (null sendJobs)
let movePairs = flip zip sendJobs . evalRand (uniforms receivers) . mkStdGen $ hash oldTime
iforMOf_ (_jobWorkers .> itraversed) oldState $ \w tv -> if
| w `elem` senders
-> writeTVar tv mempty
| w `elem` receivers
-> forM_ movePairs $ \(recv, j) -> if
| recv == w -> readTVar tv >>= jqInsert j >>= (writeTVar tv $!)
| otherwise -> return ()
| otherwise
-> return ()
return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|]
manageOffloadHandler :: ReaderT UniWorX m JobOffloadHandler -> STM (ContT () m ())
manageOffloadHandler spawn = do
shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
guard $ not shouldTerminate'
JobContext{jobOffload} <- jobContext <$> readTMVar appJobState
cOffload <- tryReadTMVar jobOffload
let respawn = do
nOffload <- lift $ runReaderT spawn foundation
atomically $ do
putTMVar jobOffload nOffload
whenIsJust cOffload $ \pOffload -> do
pOutgoing <- readTVar $ jobOffloadOutgoing pOffload
modifyTVar (jobOffloadOutgoing nOffload) (pOutgoing <>)
respawn <$ case cOffload of
Nothing -> return ()
Just JobOffloadHandler{..} -> waitSTM jobOffloadHandler
stopJobCtl :: MonadUnliftIO m => UniWorX -> m ()
-- ^ Stop all worker threads currently running
stopJobCtl UniWorX{appJobState} = do
didStop <- atomically $ do
jState <- tryReadTMVar appJobState
for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
whenIsJust didStop $ \jSt' -> void . forkIO . atomically $ do
workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
mapM_ (void . waitCatchSTM) $
[ jobPoolManager jSt'
, jobCron jSt'
] ++ workers
execCrontab :: RWST JobState () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWorX) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab = do
let
mergeState :: (MonadResource m, BackendCompatible SqlReadBackend backend) => RWST _ () _ (ReaderT backend m) ()
mergeState = mapRWST (withReaderT $ projectBackend @SqlReadBackend) $ do
let
mergeLastExec (Entity _leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = return ()
mergeQueued (Entity _qjId QueuedJob{..})
| Just job <- Aeson.parseMaybe parseJSON queuedJobContent
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
| otherwise = return ()
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
mapRWST (liftHandler . runDBRead . setSerializableReadOnlyBatch) mergeState
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
(currentCrontab, (jobCtl, nextMatch), currentState) <- mapRWST (liftIO . atomically) $ do
crontab <- liftBase . readTVar =<< asks (jobCrontab . jobContext)
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x, prevExec)
do
lastTimes <- State.get
now <- liftIO getCurrentTime
let currentCrontab' = sortOn cmpProj . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (job, getMax <$> HashMap.lookup job lastTimes, ) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
where cmpProj (j, lT, qT) = (qT, Down $ prioritiseJob j, lT, j)
crontabTVar <- asks jobCurrentCrontab
atomically . writeTVar crontabTVar $ Just (now, currentCrontab')
$logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab'
let doJob = mapRWST (liftHandler . runDBJobs) $ do
-- newCrontab <- lift $ hoist lift determineCrontab'
-- when (newCrontab /= currentCrontab) $
-- mapRWST (liftIO . atomically) $
-- liftBase . flip writeTVar newCrontab =<< asks (jobCrontab . jobContext)
newCrontab <- liftIO . readTVarIO =<< asks (jobCrontab . jobContext)
mergeState
newState <- State.get
let upToDate = and
[ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
, ((==) `on` HashMap.lookup jobCtl) newState currentState
]
when upToDate $ do
now <- liftIO getCurrentTime
foundation <- getYesod
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> lift $ queueDBJobCron job
other -> runReaderT ?? foundation $ writeJobCtl other
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
crontab <- asks $ jobCrontab . jobContext
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil crontab currentCrontab nextTime')
doJob
where
acc :: NominalDiffTime
acc = 1e-3
debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
debouncingAcc AppSettings{appNotificationRateLimit} = \case
JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
_ -> acc
applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
applyJitter seed t = do
appInstance <- getsYesod appInstanceID
let
halfRange = truncate $ 0.5 / acc
diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
return $ addUTCTime diff t
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
go' (jobCtl, cron) = Just . ($ (jobCtl, t)) . maybe id (minOn cmpProj)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
cmpProj (j, qT) = ( qT
, Down $ prioritiseJob j
, getMax <$> HashMap.lookup j lastTimes
, j
)
waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = do
diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
waitTime'
| diffT < acc = "Done"
| otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
$logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
if
| diffT < acc -> return True
| otherwise -> do
delay <- liftIO . newDelay . round $ waitTime / 2 * 1e6
let
awaitDelayThread = False <$ waitDelay delay
awaitCrontabChange = do
crontab' <- readTVar crontabTV
True <$ guard (crontab /= crontab')
crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
mkLogIdent :: JobWorkerId -> Text
mkLogIdent wId = "Job-Executor " <> showWorkerId wId
handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorkerState wNum JobWorkerBusy $ do
$logDebugS logIdent $ tshow jctl
res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try' $ handleCmd jctl
sentRes <- mapReaderT (liftIO . atomically) $ do
resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> case err of
SomeException err' -> $logErrorS logIdent $ tshow (typeOf err') <> ": " <> pack (displayException err') <> "JOB: " <> tshow jctl
_other -> return ()
where
logIdent = mkLogIdent wNum
try' = flip catches
[ Exc.Handler $ \(e :: AsyncException) -> throwM e
, Exc.Handler $ \(e :: SomeAsyncException) -> throwM e
#ifdef DEVELOPMENT
, Exc.Handler $ \case
MailNotAvailable -> return $ Right ()
e -> return . Left $ SomeException e
, Exc.Handler $ \SynchroniseUserdbNoUserdb -> return $ Right () -- TODO
#endif
, Exc.Handler $ \(e :: SomeException) -> return $ Left e
] . fmap Right
handleQueueException :: MonadLogger m => JobQueueException -> m ()
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleCmd :: JobCtl -> ReaderT JobContext Handler ()
handleCmd JobCtlTest = $logDebugS logIdent "JobCtlTest"
handleCmd JobCtlFlush = do
$logDebugS logIdent "JobCtlFlush..."
maxFlush <- getsYesod $ view _appJobMaxFlush
let selectOpts = [ Asc QueuedJobCreationTime ]
& maybe id (\maxCount -> (:) (LimitTo $ fromIntegral maxCount)) maxFlush
heldLocks <- asks jobHeldLocks >>= readTVarIO
void . lift . runDB . runConduit
$ selectKeys [ QueuedJobId /<-. Set.toList heldLocks ] selectOpts
.| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
lFlushTVar <- asks jobLastFlush
atomically . modifyTVar' lFlushTVar . max . Just =<< liftIO getCurrentTime
$logInfoS logIdent "JobCtlFlush"
handleCmd (JobCtlQueue job) = do
$logDebugS logIdent "JobCtlQueue..."
lift $ queueJob' job
$logInfoS logIdent "JobCtlQueue"
handleCmd (JobCtlPerform jId) = do
jMode <- getsYesod $ view _appJobMode
case jMode of
JobsOffload -> performOffload
_otherwise -> performLocal
where
performOffload = hoist atomically $ do
JobOffloadHandler{..} <- lift . readTMVar =<< asks jobOffload
lift $ modifyTVar jobOffloadOutgoing (`snoc` jId)
performLocal = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do
content <- case fromJSON queuedJobContent of
Aeson.Success c -> return c
Aeson.Error t -> do
$logErrorS logIdent $ "Aeson decoding error: " <> pack t
throwM $ JInvalid jId j
$logInfoS logIdent $ tshow content
$logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content
instanceID' <- getsYesod $ view instanceID
now <- liftIO getCurrentTime
jMode <- getsYesod $ view _appJobMode
let cleanup = do
when (queuedJobWriteLastExec && modeWriteLastExec) $
void $ upsertBy
(UniqueCronLastExec queuedJobContent)
CronLastExec
{ cronLastExecJob = queuedJobContent
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now
, CronLastExecInstance =. instanceID'
]
delete jId
modeWriteLastExec = case jMode of
JobsDrop{..} -> jobsWriteFakeLastExec
_otherwise -> True
case jMode of
JobsDrop{} -> runDB $ setSerializableBatch cleanup
_otherwise -> case performJob content of
JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do
act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializableBatch cleanup
JobHandlerAtomicWithFinalizer act fin ->
withJobWorkerState wNum (JobWorkerExecJob content) $ do
fin <=< runDBJobs . setSerializableBatch $
act <* hoist lift cleanup
JobHandlerAtomicDeferrableWithFinalizer act fin -> do
withJobWorkerState wNum (JobWorkerExecJob content) $
fin =<< runDBRead (setSerializableDeferrableBatch act)
runDB $ setSerializableBatch cleanup
handleCmd JobCtlDetermineCrontab = do
$logDebugS logIdent "DetermineCrontab..."
newCTab <- liftHandler . runDBRead $ setSerializableReadOnlyBatch determineCrontab
$logInfoS logIdent "DetermineCrontab"
$logDebugS logIdent "PruneLastExecs..."
liftHandler . runDB $ pruneLastExecs newCTab
$logInfoS logIdent "PruneLastExecs"
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . flip writeTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
$logDebugS logIdent [st|#{tshow kind}...|]
newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind
$logInfoS logIdent [st|#{tshow kind}: #{toPathPiece newStatus}|]
unless (newStatus > HealthFailure) $ do
$logErrorS logIdent [st|#{tshow kind}: #{tshow newReport}|]
liftIO $ do
now <- getCurrentTime
let updateReports = Set.insert (now, newReport)
. Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
atomically . modifyTVar' hrStorage $ force . updateReports
handleCmd (JobCtlSleep secs@(MkFixed (fromIntegral -> msecs))) = do
$logInfoS logIdent [st|Sleeping #{tshow secs}s...|]
threadDelay msecs
$logInfoS logIdent [st|Slept #{tshow secs}s.|]
handleCmd JobCtlPrewarmCache{..} = do
prewarm <- getsYesod appFileSourcePrewarm
for_ prewarm $ \lh -> lift . runDBRead $
runConduit $ sourceFileChunkIds .| C.map E.unValue
.| awaitForever (\cRef -> handleC handleUnavailable $ sourceFileChunks (withLRU lh cRef) cRef .| C.map (cRef, ))
.| C.mapM_ (sinkChunkCache lh)
where
handleUnavailable e
| is _SourceFilesContentUnavailable e = return ()
| otherwise = throwM e
withLRU lh cRef range getChunk = do
touched <- touchLRUHandle lh (cRef, range) jcTargetTime
case touched of
Just (bs, _) -> return $ Just (bs, Nothing)
Nothing -> over (mapped . _2) Just <$> getChunk
(minBoundDgst, maxBoundDgst) = jcChunkInterval
sourceFileChunkIds = E.selectSource . E.from $ \fileContentEntry -> do
let cRef = fileContentEntry E.^. FileContentEntryChunkHash
eRef = fileContentEntry E.^. FileContentEntryHash
E.where_ . E.and $ catMaybes
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
, maxBoundDgst <&> \b -> cRef E.<. E.val b
]
E.where_ $ matchesPrewarmSource eRef jcPrewarmSource
return cRef
sinkChunkCache lh (cRef, (c, range)) = insertLRUHandle lh (cRef, range) jcTargetTime (c, ByteString.length c)
handleCmd JobCtlInhibitInject{..} = maybeT_ $ do
PrewarmCacheConf{..} <- MaybeT . getsYesod $ view _appFileSourcePrewarmConf
let inhibitInterval = IntervalMap.ClosedInterval
(addUTCTime (-precStart) jcTargetTime)
(addUTCTime (precInhibit - precStart) jcTargetTime)
sourceFileReferences = prewarmSourceReferences jcPrewarmSource
refs <- lift . lift . runDBRead . runConduit $ sourceFileReferences .| C.foldl (flip Set.insert) Set.empty
guard . not $ null refs
inhibitTVar <- getsYesod appFileInjectInhibit
atomically . modifyTVar' inhibitTVar $ force . IntervalMap.insertWith Set.union inhibitInterval refs
matchesPrewarmSource :: E.SqlExpr (E.Value FileContentReference) -> JobCtlPrewarmSource -> E.SqlExpr (E.Value Bool)
matchesPrewarmSource eRef = \case
JobCtlPrewarmSheetFile{..} -> E.or
[ E.exists . E.from $ \sheetFile ->
E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet
E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType
E.&&. sheetFile E.^. SheetFileContent E.==. E.just eRef
, E.exists . E.from $ \personalisedSheetFile ->
E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileContent E.==. E.just eRef
]
prewarmSourceReferences :: JobCtlPrewarmSource -> ConduitT () FileContentReference (ReaderT SqlReadBackend (HandlerFor UniWorX)) ()
prewarmSourceReferences = \case
JobCtlPrewarmSheetFile{..} -> (.| C.mapMaybe E.unValue) $ do
E.selectSource . E.from $ \sheetFile -> do
E.where_ $ sheetFile E.^. SheetFileSheet E.==. E.val jcpsSheet
E.&&. sheetFile E.^. SheetFileType E.==. E.val jcpsSheetFileType
E.where_ . E.isJust $ sheetFile E.^. SheetFileContent
return $ sheetFile E.^. SheetFileContent
E.selectSource . E.from $ \personalisedSheetFile -> do
E.where_ $ personalisedSheetFile E.^. PersonalisedSheetFileSheet E.==. E.val jcpsSheet
E.&&. personalisedSheetFile E.^. PersonalisedSheetFileType E.==. E.val jcpsSheetFileType
E.where_ . E.isJust $ personalisedSheetFile E.^. PersonalisedSheetFileContent
return $ personalisedSheetFile E.^. PersonalisedSheetFileContent
jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a
jLocked jId act = flip evalStateT False $ do
let
lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob)
lock = hoist (hoist $ runDB . setSerializableBatch) $ do
qj@QueuedJob{..} <- lift . lift $ maybe (throwM $ JNonexistant jId) return =<< get jId
instanceID' <- getsYesod $ view instanceID
threshold <- getsYesod $ view _appJobStaleThreshold
now <- liftIO getCurrentTime
heldLocks <- asks jobHeldLocks
isHeld <- (jId `Set.member`) <$> readTVarIO heldLocks
hadStale <- maybeT (return False) $ do
lockTime <- MaybeT $ return queuedJobLockTime
lockInstance <- MaybeT $ return queuedJobLockInstance
if
| lockInstance == instanceID'
, diffUTCTime now lockTime >= threshold
, not isHeld
-> return True
| otherwise
-> throwM $ JLocked jId lockInstance lockTime
when hadStale .
$logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj)
State.put True
val <- lift . lift $ updateGet jId [ QueuedJobLockInstance =. Just instanceID'
, QueuedJobLockTime =. Just now
]
atomically . modifyTVar' heldLocks $ Set.insert jId
return $ Entity jId val
unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) ()
unlock (Entity jId' _) = whenM State.get $ do
atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks
lift . lift . runDB . setSerializableBatch $
update jId' [ QueuedJobLockInstance =. Nothing
, QueuedJobLockTime =. Nothing
]
bracket lock unlock $ lift . act
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = do
jMode <- getsYesod $ view _appJobMode
when (is _JobsLocal jMode) $ do
Sum deleted <- runConduit $ selectSource [] [] .| C.foldMapM ensureCrontab
when (deleted > 0) $
$logInfoS "pruneLastExeces" [st|Deleted #{deleted} entries|]
where
ensureCrontab :: Entity CronLastExec -> DB (Sum Natural)
ensureCrontab (Entity leId CronLastExec{..}) = maybeT (return mempty) $ do
now <- liftIO getCurrentTime
flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval
if
| abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2
-> return mempty
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
, not $ HashMap.member (JobCtlQueue job) crontab
-> Sum 1 <$ lift (delete leId)
| otherwise
-> return mempty
performJob :: Job -> JobHandler UniWorX
performJob = $(dispatchTH ''Job)