This repository has been archived on 2024-10-24. You can view files and clone it, but cannot push or open issues or pull requests.
fradrive-old/src/Jobs.hs
2018-10-14 15:00:01 +02:00

517 lines
22 KiB
Haskell

{-# LANGUAGE NoImplicitPrelude
, RecordWildCards
, TemplateHaskell
, OverloadedStrings
, FlexibleContexts
, ViewPatterns
, TypeFamilies
, DeriveGeneric
, DeriveDataTypeable
, QuasiQuotes
, NamedFieldPuns
, MultiWayIf
, NumDecimals
#-}
module Jobs
( module Types
, writeJobCtl
, queueJob, queueJob'
, handleJobs
) where
import Import hiding ((.=), Proxy)
import Handler.Utils.Mail
import Handler.Utils.DateTime
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Data.Conduit.TMChan
import qualified Data.Conduit.List as C
import qualified Data.Text.Lazy as LT
import Data.Aeson (fromJSON, toJSON)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import Database.Persist.Sql (executeQQ, fromSqlKey, transactionSave, transactionUndo)
import Data.Monoid (Last(..))
import Data.Semigroup (Max(..))
import Utils.Lens
import Control.Monad.Random (evalRand, uniform, mkStdGen)
import qualified Database.Esqueleto as E
import qualified Data.CaseInsensitive as CI
import Text.Shakespeare.Text
import Text.Hamlet
import Cron
import qualified Data.HashMap.Strict as HashMap
import Data.HashMap.Strict (HashMap)
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Foldable (foldrM)
import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.Writer (WriterT, execWriterT)
import Control.Monad.Trans.State (StateT, evalStateT, mapStateT)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Resource (MonadResourceBase, ResourceT, runResourceT, allocate)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Control.Monad.Logger
import Control.Monad.Random (MonadRandom(..), evalRand)
import Data.Time.Clock
import Data.Time.Zones
import Control.Concurrent.STM (retry)
import Database.PostgreSQL.Simple (sqlErrorHint)
import Control.Monad.Catch (handleIf)
-- | Reasons why a `QueuedJob` could not be acquired for execution
--
-- These are thrown while locking (`jLocked`) or decoding a queued job and are
-- caught and logged by the job workers (`handleJobs'`).
data JobQueueException = JInvalid QueuedJobId QueuedJob
                         -- ^ The job's stored content could not be decoded as a `Job`
                       | JLocked QueuedJobId InstanceId UTCTime
                         -- ^ The job is locked by the given instance (since the given time) and the lock was not considered stale
                       | JNonexistant QueuedJobId
                         -- ^ There is no `QueuedJob` with the given id
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException
-- | Read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
--
-- Forks one worker thread (running `handleJobs'`) per channel in @recvChans@ and
-- one thread running `execCrontab`. All threads are registered via `allocate`, so
-- they are killed when the enclosing `ResourceT` scope releases its resources.
-- Finally, sends `JobCtlDetermineCrontab` and blocks until a worker has handled it
-- (see `writeJobCtlBlock`).
handleJobs :: (MonadResource m, MonadIO m) => [TMChan JobCtl] -> UniWorX -> m ()
handleJobs recvChans foundation@UniWorX{..} = do
  -- Shared state of all workers and the cron thread (captured by `JobContext{..}` below)
  jobCrontab <- liftIO $ newTVarIO HashMap.empty
  jobConfirm <- liftIO $ newTVarIO HashMap.empty
  forM_ (zip [1..] recvChans) $ \(n, chan) ->
    let
      logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
      logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
      doFork = fork . unsafeHandler foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' n
    in void $ allocate (liftIO doFork) (liftIO . killThread)
  -- Start cron operation
  void $ allocate (liftIO . fork . unsafeHandler foundation $ runReaderT execCrontab JobContext{..}) (liftIO . killThread)
  liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $
    writeJobCtlBlock JobCtlDetermineCrontab
execCrontab :: ReaderT JobContext (HandlerT UniWorX IO) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
--
-- Every iteration of the (infinite) loop:
--
--   1. merges the execution times recorded in the database (`CronLastExec`)
--      into the in-memory state, deleting rows whose job cannot be decoded;
--   2. atomically reads the crontab from `jobCrontab`, drops state for jobs no
--      longer contained in it and picks the job with the earliest next match,
--      retrying the STM transaction while there is no such match;
--   3. waits for that match (shifted by `applyJitter`) unless the crontab
--      changes in the meantime, and then runs @doJob@.
execCrontab = flip evalStateT HashMap.empty . forever $ do
    -- Merge last execution times persisted in the database into our state
    mapStateT (liftHandlerT . runDB . setSerializable) $ do
      let
        merge (Entity leId CronLastExec{..})
          | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
          = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
          | otherwise = lift $ delete leId
      runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge

    now <- liftIO getCurrentTime
    -- Block (via STM `retry`) until the crontab contains a job with a next match
    (currentCrontab, (jobCtl, nextMatch)) <- mapStateT (mapReaderT $ liftIO . atomically) $ do
      crontab <- liftBase . readTVar =<< asks jobCrontab

      -- Forget execution times of jobs that are no longer part of the crontab
      State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
      prevExec <- State.get
      case earliestJob prevExec crontab now of
        Nothing -> liftBase retry
        Just (_, MatchNone) -> liftBase retry
        Just x -> return (crontab, x)

    -- Re-determine the crontab within a serializable transaction; `jobCtl` is only
    -- executed if its crontab entry did not change, otherwise the new crontab is stored
    let doJob = do
          mJid <- mapStateT (mapReaderT $ liftHandlerT . runDB . setSerializable) $ do
            newCrontab <- lift . lift $ determineCrontab
            if
              | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
                -> do
                  now <- liftIO $ getCurrentTime
                  instanceID <- getsYesod appInstanceID
                  -- Record this execution in the in-memory state
                  State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
                  case jobCtl of
                    JobCtlQueue job -> do
                      -- Persist the execution time and queue the job
                      lift . lift $ upsertBy
                        (UniqueCronLastExec $ toJSON job)
                        CronLastExec
                          { cronLastExecJob = toJSON job
                          , cronLastExecTime = now
                          , cronLastExecInstance = instanceID
                          }
                        [ CronLastExecTime =. now ]
                      Just <$> lift (lift $ queueJobUnsafe job)
                    other -> Nothing <$ writeJobCtl other
              | otherwise
                -> lift . fmap (const Nothing) . mapReaderT (liftIO . atomically) $
                  lift . flip writeTVar newCrontab =<< asks jobCrontab
          -- A newly queued job is only announced to the workers after the transaction above has finished
          maybe (return ()) (writeJobCtl . JobCtlPerform) mJid

    case nextMatch of
      MatchAsap -> doJob
      MatchNone -> return ()
      MatchAt nextTime -> do
        JobContext{jobCrontab} <- ask
        nextTime' <- applyJitter jobCtl nextTime
        $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
        logFunc <- askLoggerIO
        -- `waitUntil` returns `False` if the crontab changed before `nextTime'` was reached
        whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
          doJob
  where
    -- Granularity (in seconds) of waiting and of the jitter applied to scheduled times
    acc :: NominalDiffTime
    acc = 1e-3

    -- Shift @t@ by a pseudo-random offset in [-0.5s, 0.5s] (a multiple of `acc`);
    -- the offset is determined by the hash of this instance's id and @seed@
    applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
    applyJitter seed t = do
      appInstance <- getsYesod appInstanceID
      let
        halfRange = truncate $ 0.5 / acc
        diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
      return $ addUTCTime diff t

    -- The crontab entry with the smallest next match (as computed by `nextCronMatch`,
    -- given the last execution time recorded in @lastTimes@), w.r.t. the `Ord`
    -- instance of `CronNextMatch`; `Nothing` iff the crontab is empty
    earliestJob :: HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
    earliestJob lastTimes crontab now = foldr go Nothing $ HashMap.toList crontab
      where
        go (jobCtl, cron) mbPrev
          | Just (_, t') <- mbPrev
          , t' < t
          = mbPrev
          | otherwise
          = Just (jobCtl, t)
          where
            t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) now cron

    -- Wait until @nextTime@ (to within `acc`); returns `True` once it is reached,
    -- or `False` as soon as the value of @crontabTV@ differs from @crontab@
    waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
    waitUntil crontabTV crontab nextTime = runResourceT $ do
      diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
      let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
          waitTime'
            | diffT < acc = "Done"
            | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
      $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
      if
        | diffT < acc -> return True
        | otherwise -> do
            -- Sleep in a separate thread (killed when the `ResourceT` scope ends) so we
            -- can simultaneously watch for changes of the crontab
            retVar <- liftIO newEmptyTMVarIO
            void $ allocate (liftIO $ forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar) (liftIO . killThread)
            let
              awaitDelayThread = False <$ takeTMVar retVar
              awaitCrontabChange = do
                crontab' <- readTVar crontabTV
                True <$ guard (crontab /= crontab')
            crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
            -- After the delay has elapsed, re-check the remaining time
            bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
handleJobs' :: Int -> Sink JobCtl (ReaderT JobContext Handler) ()
-- ^ Worker @wNum@: handle every `JobCtl` received, one after another
--
-- Exceptions thrown while handling a command are caught (`try`). The result
-- (`Nothing` on success, `Just` the exception otherwise) is handed to at most one
-- of the `TMVar`s registered for the command in `jobConfirm` (by `writeJobCtlBlock`).
-- If an exception could not be handed to any waiter, it is logged instead.
handleJobs' wNum = C.mapM_ $ \jctl -> do
    $logDebugS logIdent $ tshow jctl
    resVars <- mapReaderT (liftIO . atomically) $
      HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
    res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
    -- Put the result into the first (in fold order) waiter's `TMVar` that is still empty
    sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
    case res of
      Just err
        | not sentRes -> $logErrorS logIdent $ tshow err
      _other -> return ()
  where
    logIdent = "Jobs #" <> tshow wNum

    -- Log (instead of propagating) failures to acquire a queued job
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    -- Announce all queued jobs (oldest first) to the workers
    handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
    handleCmd (JobCtlQueue job) = lift $ queueJob' job
    -- Decode and perform the job while holding its lock; delete it afterwards
    handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
      content <- case fromJSON queuedJobContent of
        Aeson.Success c -> return c
        Aeson.Error t -> do
          $logErrorS logIdent $ "Aeson decoding error: " <> pack t
          throwM $ JInvalid jId j

      $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

      performJob content

      -- `performJob` is expected to throw an exception if it detects that the job was not done
      runDB $ delete jId
    -- Recompute the crontab and publish it to `jobCrontab`
    handleCmd JobCtlDetermineCrontab = do
      newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab
      -- $logDebugS logIdent $ tshow newCTab
      mapReaderT (liftIO . atomically) $
        lift . flip writeTVar newCTab =<< asks jobCrontab
-- | Run @act@ on the `QueuedJob` @jId@ while holding its lock
--
-- The lock is taken by setting `queuedJobLockInstance`/`queuedJobLockTime` in a
-- serializable transaction and released (once @act@ finishes or throws) by
-- resetting both fields.
--
-- Throws `JNonexistant` if there is no such job, and `JLocked` if the job is
-- already locked, unless the lock is this instance's own and at least
-- `appJobStaleThreshold` old (such stale locks are ignored with a warning).
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
    lockAcquired <- liftIO $ newTVarIO False
    bracket (acquireLock lockAcquired) (const $ releaseLock lockAcquired) act
  where
    acquireLock lockFlag = runDB . setSerializable $ do
      qj@QueuedJob{..} <- get jId >>= maybe (throwM $ JNonexistant jId) return
      instanceID <- getsYesod appInstanceID
      threshold <- getsYesod $ appJobStaleThreshold . appSettings
      now <- liftIO getCurrentTime
      -- `True` iff an existing lock is ours and old enough to be ignored
      hadStale <- maybeT (return False) $ do
        lockTime <- MaybeT $ return queuedJobLockTime
        lockInstance <- MaybeT $ return queuedJobLockInstance
        if lockInstance == instanceID && diffUTCTime now lockTime >= threshold
          then return True
          else throwM $ JLocked jId lockInstance lockTime
      when hadStale $
        $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow qj
      lockedJob <- updateGet jId
        [ QueuedJobLockInstance =. Just instanceID
        , QueuedJobLockTime =. Just now
        ]
      liftIO . atomically $ writeTVar lockFlag True
      return lockedJob

    releaseLock lockFlag = do
      held <- liftIO . atomically $ readTVar lockFlag
      when held . runDB . setSerializable $
        update jId
          [ QueuedJobLockInstance =. Nothing
          , QueuedJobLockTime =. Nothing
          ]
-- | Send @cmd@ to one of the worker channels in `appJobCtl` without waiting for it to be handled
--
-- The channel is picked pseudo-randomly, seeded by the hash of the calling
-- thread's id combined with the command.
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = do
  caller <- liftIO myThreadId
  chans <- getsYesod appJobCtl
  let seed = hash caller `hashWithSalt` cmd
      target = evalRand (uniform chans) (mkStdGen seed)
  liftIO . atomically $ writeTMChan target cmd
-- | Like `writeJobCtl`, but block until a worker reports back on @cmd@
--
-- A fresh `TMVar` is registered for @cmd@ in `jobConfirm` before the command is
-- sent; a worker puts its result there. The `TMVar` is deregistered once the
-- result was taken. An exception reported by the worker is rethrown.
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
writeJobCtlBlock cmd = do
  confirmations <- asks jobConfirm
  resVar <- liftIO . atomically $ do
    var <- newEmptyTMVar
    modifyTVar' confirmations $ HashMap.insertWith (<>) cmd (pure var)
    return var
  lift $ writeJobCtl cmd
  mExc <- liftIO . atomically $ do
    result <- takeTMVar resVar
    modifyTVar' confirmations $ HashMap.update (nonEmpty . NonEmpty.filter (/= resVar)) cmd
    return result
  case mExc of
    Nothing -> return ()
    Just exc -> throwM exc
-- | Insert an unlocked `QueuedJob` for @job@, created now by this instance
--
-- No worker is notified; the callers in this module send `JobCtlPerform` for the
-- returned id once the surrounding transaction has finished.
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
  creationTime <- liftIO getCurrentTime
  creator <- getsYesod appInstanceID
  insert $ QueuedJob
    { queuedJobContent = toJSON job
    , queuedJobCreationInstance = creator
    , queuedJobCreationTime = creationTime
    , queuedJobLockInstance = Nothing
    , queuedJobLockTime = Nothing
    }
  -- We should not immediately notify a worker; instead wait for the transaction to finish first
  -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
  -- return jId
-- | Queue @job@ in its own (serializable) transaction and return the new `QueuedJob`'s id
--
-- Workers are not notified; use `queueJob'` for that.
queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
queueJob job = liftHandlerT . runDB . setSerializable $ queueJobUnsafe job
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `JobCtlPerform`, i.e. queue the job (the transaction
-- is finished when `queueJob` returns) and then announce it to a worker
queueJob' job = do
  jId <- queueJob job
  writeJobCtl $ JobCtlPerform jId
setSerializable :: DB a -> DB a
-- ^ Run @act@ with transaction isolation level @SERIALIZABLE@
--
-- PostgreSQL only accepts @SET TRANSACTION@ before the first query of a
-- transaction, so this should wrap everything that is run within `runDB`.
--
-- If @act@ fails with a serialization failure (recognised by PostgreSQL's
-- "might succeed if retried" hint), the failed transaction is rolled back and a
-- new one is started (`transactionUndo`). Afterwards @act@ is retried after a
-- delay of 1ms * 2^n (n = number of previous retries, capped at 10). The number
-- of retries is not limited.
--
-- Note that the rollback also discards anything done within the current
-- transaction before `setSerializable` was entered.
setSerializable act = setSerializable' (0 :: Integer)
  where
    act' = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act
    setSerializable' (min 10 -> logBackoff) =
      handleIf
        (\e -> sqlErrorHint e == "The transaction might succeed if retried.")
        (\e -> $logWarnS "SQL" (tshow e)
            -- After an error PostgreSQL rejects every further command of the aborted
            -- transaction; roll back and begin a fresh transaction before retrying
            *> transactionUndo
            *> threadDelay (1e3 * 2 ^ logBackoff)
            *> setSerializable' (succ logBackoff))
        act'
-- | Delete every `CronLastExec` row whose job can not be decoded, or whose job
-- (as a `JobCtlQueue`) is not part of @crontab@
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ pruneEntry
  where
    pruneEntry (Entity lastExecId CronLastExec{cronLastExecJob}) =
      case Aeson.parseMaybe parseJSON cronLastExecJob of
        Just job
          | JobCtlQueue job `HashMap.member` crontab -> return ()
        _ -> delete lastExecId
determineCrontab :: DB (Crontab JobCtl)
-- ^ Extract all future jobs from the database (sheet deadlines, ...)
--
-- The resulting crontab contains:
--
--   * `JobCtlFlush`, if `appJobFlushInterval` is set (rate limited to that interval);
--   * `JobCtlDetermineCrontab` (rate limited to `appJobCronInterval`);
--   * per sheet, a notification once the sheet becomes active and one a day before
--     `sheetActiveTo` (but not before `sheetActiveFrom`).
--
-- `CronLastExec` entries of jobs that are not part of the result are pruned.
determineCrontab = (\ct -> ct <$ pruneLastExecs ct) <=< execWriterT $ do
    AppSettings{..} <- getsYesod appSettings

    case appJobFlushInterval of
      Just interval -> tell $ HashMap.singleton
        JobCtlFlush
        Cron
          { cronInitial = CronAsap
          , cronRepeat = CronRepeatScheduled CronAsap
          , cronRateLimit = interval
          }
      Nothing -> return ()

    tell $ HashMap.singleton
      JobCtlDetermineCrontab
      Cron
        { cronInitial = CronAsap
        , cronRepeat = CronRepeatScheduled CronAsap
        , cronRateLimit = appJobCronInterval
        }

    let
      sheetJobs (Entity nSheet Sheet{..}) = do
        tell $ HashMap.singleton
          (JobCtlQueue $ JobQueueNotification NotificationSheetActive{..})
          Cron
            { cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ sheetActiveFrom
            , cronRepeat = CronRepeatOnChange -- Allow repetition of the notification (if something changes), but no more often than `appNotificationRateLimit`
            , cronRateLimit = appNotificationRateLimit
            }
        tell $ HashMap.singleton
          (JobCtlQueue $ JobQueueNotification NotificationSheetInactive{..})
          Cron
            { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ . max sheetActiveFrom $ addUTCTime (-nominalDay) sheetActiveTo
            , cronRepeat = CronRepeatOnChange
            , cronRateLimit = appNotificationRateLimit
            }
    runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ sheetJobs
determineNotificationCandidates :: Notification -> DB [Entity User]
-- ^ Users who may receive the given notification
--
--   * `NotificationSubmissionRated`: the users of the submission (`SubmissionUser`)
--   * `NotificationSheetActive`, `NotificationSheetInactive`: the participants of
--     the course the sheet belongs to
--
-- Whether a candidate actually receives it is decided separately (by their
-- notification settings, see `performJob`).
determineNotificationCandidates notification = case notification of
    NotificationSubmissionRated{nSubmission} -> E.select . E.from $ \(user `E.InnerJoin` submissionUser) -> do
      E.on $ user E.^. UserId E.==. submissionUser E.^. SubmissionUserUser
      E.where_ $ submissionUser E.^. SubmissionUserSubmission E.==. E.val nSubmission
      return user
    NotificationSheetActive{nSheet} -> sheetCourseParticipants nSheet
    NotificationSheetInactive{nSheet} -> sheetCourseParticipants nSheet
  where
    -- Participants of the course that the given sheet belongs to
    -- (shared by all sheet-related notifications)
    sheetCourseParticipants :: SheetId -> DB [Entity User]
    sheetCourseParticipants sheetId = E.select . E.from $ \(user `E.InnerJoin` courseParticipant `E.InnerJoin` sheet) -> do
      E.on $ sheet E.^. SheetCourse E.==. courseParticipant E.^. CourseParticipantCourse
      E.on $ user E.^. UserId E.==. courseParticipant E.^. CourseParticipantUser
      E.where_ $ sheet E.^. SheetId E.==. E.val sheetId
      return user
classifyNotification :: Notification -> DB NotificationTrigger
-- ^ The `NotificationTrigger` (as checked against users' notification settings)
-- a notification corresponds to
--
-- Rated submissions are distinguished by the type of their sheet: `NotGraded`
-- sheets yield `NTSubmissionRated`, all other sheet types `NTSubmissionRatedGraded`.
classifyNotification notification = case notification of
  NotificationSubmissionRated{nSubmission} -> do
    submission <- getJust nSubmission
    Sheet{sheetType} <- belongsToJust submissionSheet submission
    pure $ case sheetType of
      NotGraded -> NTSubmissionRated
      _ -> NTSubmissionRatedGraded
  NotificationSheetActive{} -> pure NTSheetActive
  NotificationSheetInactive{} -> pure NTSheetInactive
performJob :: Job -> HandlerT UniWorX IO ()
-- ^ Execute a single `Job`
--
-- `JobQueueNotification` queues one `JobSendNotification` per candidate (see
-- `determineNotificationCandidates`) whose notification settings allow the
-- notification's class (see `classifyNotification`), and announces the new jobs
-- to the workers once the transaction has finished. `JobSendNotification` and
-- `JobSendTestEmail` compose and send the corresponding mail.
--
-- NOTE(review): local bindings such as @termDesc@, @submissionRatingTime'@, @tid@,
-- @ssh@, @csh@ and @shn@ are otherwise unused here and presumably referenced by name
-- from the @templates/mail/*.hamlet@ templates; do not rename them.
performJob JobQueueNotification{jNotification} = do
    jIds <- runDB. setSerializable $ do
      candidates <- determineNotificationCandidates jNotification
      nClass <- classifyNotification jNotification
      mapM (queueJobUnsafe . flip JobSendNotification jNotification) $ do
        Entity uid User{userNotificationSettings} <- candidates
        guard $ notificationAllowed userNotificationSettings nClass
        return uid
    -- Only announce the new jobs after the transaction above has finished
    forM_ jIds $ writeJobCtl . JobCtlPerform
performJob JobSendNotification{ jNotification = NotificationSubmissionRated{..}, jRecipient } = userMailT jRecipient $ do
    (Course{..}, Sheet{..}, Submission{..}, corrector) <- liftHandlerT . runDB $ do
      submission@Submission{submissionRatingBy} <- getJust nSubmission
      sheet <- belongsToJust submissionSheet submission
      course <- belongsToJust sheetCourse sheet
      corrector <- traverse getJust submissionRatingBy
      return (course, sheet, submission, corrector)
    setSubjectI $ MsgMailSubjectSubmissionRated courseShorthand

    csid <- encrypt nSubmission
    MsgRenderer mr <- getMailMsgRenderer
    let termDesc = mr . ShortTermIdentifier $ unTermKey courseTerm
    submissionRatingTime' <- traverse (formatTimeMail SelFormatDateTime) submissionRatingTime
    let tid = courseTerm
        ssh = courseSchool
        csh = courseShorthand
        shn = sheetName

    -- TODO: provide convenience template-haskell for `addAlternatives`
    addAlternatives $ do
      provideAlternative $ Aeson.object
        [ "submission" Aeson..= ciphertext csid
        , "submission-rating-points" Aeson..= submissionRatingPoints
        , "submission-rating-comment" Aeson..= submissionRatingComment
        , "submission-rating-time" Aeson..= submissionRatingTime
        , "submission-rating-by" Aeson..= (userDisplayName <$> corrector)
        , "submission-rating-passed" Aeson..= ((>=) <$> submissionRatingPoints <*> preview _passingPoints sheetType)
        , "sheet-name" Aeson..= sheetName
        , "sheet-type" Aeson..= sheetType
        , "course-name" Aeson..= courseName
        , "course-shorthand" Aeson..= courseShorthand
        , "course-term" Aeson..= courseTerm
        , "course-school" Aeson..= courseSchool
        ]
      -- provideAlternative $ \(MsgRenderer mr) -> ($(textFile "templates/mail/submissionRated.txt") :: TextUrl (Route UniWorX)) -- textFile does not support control statements
      providePreferredAlternative ($(ihamletFile "templates/mail/submissionRated.hamlet") :: HtmlUrlI18n UniWorXMessage (Route UniWorX))
performJob JobSendNotification{ jNotification = NotificationSheetActive{..}, jRecipient } = userMailT jRecipient $ do
    (Course{..}, Sheet{..}) <- liftHandlerT . runDB $ do
      sheet <- getJust nSheet
      course <- belongsToJust sheetCourse sheet
      return (course, sheet)
    setSubjectI $ MsgMailSubjectSheetActive courseShorthand sheetName

    MsgRenderer mr <- getMailMsgRenderer
    let termDesc = mr . ShortTermIdentifier $ unTermKey courseTerm
        tid = courseTerm
        ssh = courseSchool
        csh = courseShorthand
        shn = sheetName

    addAlternatives $ do
      providePreferredAlternative ($(ihamletFile "templates/mail/sheetActive.hamlet") :: HtmlUrlI18n UniWorXMessage (Route UniWorX))
performJob JobSendNotification{ jNotification = NotificationSheetInactive{..}, jRecipient } = userMailT jRecipient $ do
    (Course{..}, Sheet{..}) <- liftHandlerT . runDB $ do
      sheet <- getJust nSheet
      course <- belongsToJust sheetCourse sheet
      return (course, sheet)
    setSubjectI $ MsgMailSubjectSheetInactive courseShorthand sheetName

    MsgRenderer mr <- getMailMsgRenderer
    let termDesc = mr . ShortTermIdentifier $ unTermKey courseTerm
        tid = courseTerm
        ssh = courseSchool
        csh = courseShorthand
        shn = sheetName

    addAlternatives $ do
      providePreferredAlternative ($(ihamletFile "templates/mail/sheetInactive.hamlet") :: HtmlUrlI18n UniWorXMessage (Route UniWorX))
performJob JobSendTestEmail{..} = mailT jMailContext $ do
    _mailTo .= [Address Nothing jEmail]
    setSubjectI MsgMailTestSubject
    now <- liftIO getCurrentTime
    nDT <- formatTimeMail SelFormatDateTime now
    nD <- formatTimeMail SelFormatDate now
    nT <- formatTimeMail SelFormatTime now
    -- NOTE: the quasi-quoted text below is presumably emitted verbatim, including its whitespace
    addPart $ \(MsgRenderer mr) -> ([text|
#{mr MsgMailTestContent}
#{mr MsgMailTestDateTime}
* #{nDT}
* #{nD}
* #{nT}
|] :: TextUrl (Route UniWorX))