module Jobs
  ( module Types
  , module Jobs.Queue
  , handleJobs
  , stopJobCtl
  ) where

import Import

import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab

import qualified Data.Conduit.List as C

import qualified Data.Text.Lazy as LT

import Data.Aeson (fromJSON)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson

import Database.Persist.Sql (fromSqlKey)

import Data.Semigroup (Max(..))

import Utils.Sql

import Control.Monad.Random (evalRand, mkStdGen, getRandomR, uniformMay)

import Cron
import qualified Data.HashMap.Strict as HashMap
import Data.HashMap.Strict (HashMap)

import qualified Data.Set as Set

import qualified Data.List.NonEmpty as NonEmpty

import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!))

import Data.Foldable (foldrM)

import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.Writer (execWriterT)
import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Resource (runResourceT, allocate)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Control.Monad.Trans.Cont (ContT(..), callCC)
import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
import Control.Monad.Logger

import Data.Time.Zones

import Control.Concurrent.STM (retry)
import Control.Concurrent.STM.Delay

import UnliftIO.Concurrent (forkIO)

import Jobs.Handler.SendNotification
import Jobs.Handler.SendTestEmail
import Jobs.Handler.QueueNotification
import Jobs.Handler.HelpRequest
import Jobs.Handler.SetLogSettings
import Jobs.Handler.DistributeCorrections
import Jobs.Handler.SendCourseCommunication
import Jobs.Handler.Invitation
import Jobs.Handler.SendPasswordReset
import Jobs.Handler.TransactionLog
import Jobs.Handler.SynchroniseLdap
import Jobs.Handler.PruneInvitations
import Jobs.Handler.ChangeUserDisplayEmail

import Jobs.HealthReport


-- | Reasons why a 'QueuedJob' could not be executed by a worker
--   (thrown by 'jLocked' and the 'JobCtlPerform' handler, caught and logged
--   in 'handleJobs'').
data JobQueueException = JInvalid QueuedJobId QueuedJob
                         -- ^ Stored job content could not be decoded
                       | JLocked QueuedJobId InstanceId UTCTime
                         -- ^ Job is locked (instance holding the lock, lock time)
                       | JNonexistant QueuedJobId
                         -- ^ No queued job with this id exists
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException


-- | Start the job-control machinery and publish its 'JobState' in 'appJobState'.
--
-- Does nothing if '_appJobWorkers' is @0@. Otherwise allocates the job pool
-- manager ('manageJobPool', via 'allocateLinkedAsyncWithUnmask'), which spawns
-- and supervises up to '_appJobWorkers' workers, and the crontab manager
-- ('manageCrontab', via 'allocateLinkedAsync'), then fills 'appJobState' with
-- an initially worker-less 'JobState'. Both managers block on reading
-- 'appJobState' until it has been filled.
--
-- Workers and the crontab loop run via 'unsafeHandler'; as per its
-- documentation all HTTP-related fields of state/environment are meaningless
-- placeholders there. Running in 'HandlerFor' provides us with the facilities
-- to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs :: ( MonadResource m
              , MonadLogger m
              , MonadUnliftIO m
              , MonadMask m
              )
           => UniWorX -> m ()
handleJobs foundation@UniWorX{..}
  | foundation ^. _appJobWorkers == 0 = return ()
  | otherwise = do
      UnliftIO{..} <- askUnliftIO

      jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> unliftIO $ manageJobPool foundation unmask

      jobCron <- allocateLinkedAsync . unliftIO $ manageCrontab foundation

      let jobWorkers = Map.empty
          -- Only consulted for asyncs registered in 'jobWorkers'; those are
          -- layered on top of this default by 'manageJobPool'.
          jobWorkerName = const $ error "Unknown worker"
      jobCrontab <- liftIO $ newTVarIO HashMap.empty
      jobConfirm <- liftIO $ newTVarIO HashMap.empty
      jobShutdown <- liftIO newEmptyTMVarIO
      atomically $ putTMVar appJobState JobState
        { jobContext = JobContext{..}
        , ..
        }

-- | Run the crontab loop ('execCrontab') until shutdown is requested.
--
-- Waits until at least one worker is registered in 'appJobState', requests an
-- initial 'JobCtlDetermineCrontab' (via 'writeJobCtlBlock') and then loops
-- 'execCrontab' forever; the loop is raced against the 'jobShutdown' signal.
manageCrontab :: forall m.
                 MonadResource m
              => UniWorX -> m ()
manageCrontab foundation@UniWorX{..} = do
  -- Capture the shutdown signal once: 'jobShutdown' is never replaced by the
  -- state updates in this module, whereas 'appJobState' itself is left empty
  -- by the pool manager once it terminates. Re-reading 'appJobState' here
  -- could therefore block forever if the pool manager empties it first.
  JobState{ jobContext = context, jobShutdown } <- atomically $ readTMVar appJobState

  let awaitTermination = atomically $ readTMVar jobShutdown

  liftIO . race_ awaitTermination . unsafeHandler foundation . void $ do
    atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
    runReaderT ?? foundation $
      writeJobCtlBlock JobCtlDetermineCrontab
    evalRWST (forever execCrontab) context HashMap.empty

-- | Supervise the worker pool.
--
-- Repeatedly waits (in 'STM') for the first applicable of:
--
--   * spawning workers until '_appJobWorkers' are registered (unless shutdown
--     has been requested),
--   * reaping workers that have finished, logging their outcome and moving
--     jobs still queued for them to randomly chosen live workers,
--   * terminating once shutdown has been requested and no workers are left.
--
-- Returns after graceful termination, leaving 'appJobState' empty. If the loop
-- throws, shutdown is signalled via 'jobShutdown' before the exception is
-- propagated.
manageJobPool :: forall m.
                 ( MonadResource m
                 , MonadLogger m
                 , MonadUnliftIO m
                 , MonadMask m
                 )
              => UniWorX -> (forall a. IO a -> IO a) -> m ()
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
    -- The escape continuation must enclose 'forever'; capturing it inside a
    -- single iteration would merely end that iteration and keep looping.
    flip runContT return . callCC $ \terminate ->
      forever . join . atomically $ asum
        [ spawnMissingWorkers
        , reapDeadWorkers
        , terminateGracefully terminate
        ]
  where
    -- | Run @act@ in a masked linked async; on any exception signal shutdown,
    --   wait for @act@ to finish and rethrow.
    shutdownOnException :: m a -> m a
    shutdownOnException act = do
      UnliftIO{..} <- askUnliftIO
      actAsync <- allocateLinkedAsyncMasked $ unliftIO act

      let handleExc e = do
            atomically $ do
              jState <- tryReadTMVar appJobState
              for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown ()
            void $ wait actAsync
            throwM e

      unmask (wait actAsync) `catchAll` handleExc

    -- | Target number of workers
    num :: Int
    num = fromIntegral $ foundation ^. _appJobWorkers

    spawnMissingWorkers, reapDeadWorkers :: STM (ContT () m ())
    terminateGracefully :: (() -> ContT () m ()) -> STM (ContT () m ())

    spawnMissingWorkers = do
      shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
      guard $ not shouldTerminate'

      -- 'appJobState' stays taken until the new workers have been registered
      oldState <- takeTMVar appJobState
      let missing = num - Map.size (jobWorkers oldState)
      guard $ missing > 0
      return $ do
        $logDebugS "manageJobPool" [st|Spawning #{missing} workers|]
        endo <- execWriterT . replicateM_ missing $ do
          workerId <- newWorkerId
          let logIdent = mkLogIdent workerId

          chan <- liftIO $ newTVarIO mempty
          let
            -- Yield jobs from this worker's queue until shutdown is requested
            streamChan = join . atomically $ do
              shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
              if
                | shouldTerminate ->
                  return $ return ()
                | otherwise -> do
                  queue <- readTVar chan
                  nextVal <- case jqDequeue queue of
                    Nothing -> retry
                    Just (j, q) -> j <$ writeTVar chan q
                  return $ yield nextVal >> streamChan
            runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
              $logInfoS logIdent "Started"
              runConduit $ streamChan .| handleJobs' workerId
              $logInfoS logIdent "Stopped"

          worker <- lift . lift $ allocateLinkedAsync runWorker

          tell . Endo $ \cSt -> cSt
            { jobWorkers = Map.insert worker chan $ jobWorkers cSt
            , jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
            }
        atomically . putTMVar appJobState $ endo `appEndo` oldState

    reapDeadWorkers = do
      oldState <- takeTMVar appJobState
      deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a
      putTMVar appJobState oldState
        { jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers
        }
      guard . not $ Map.null deadWorkers
      return . forM_ (Map.toList deadWorkers) $ \(jobAsync, result) -> do
        case result of
          Right () -> $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|]
          Left e -> $logErrorS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|]
        -- Hand jobs still queued for the dead worker to random live workers,
        -- retrying until at least one live worker exists.
        void . lift . allocateLinkedAsync $
          let go = do
                next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
                  let chan = jobWorkers oldState ! jobAsync
                  (nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan
                  lift . lift $ writeTVar chan newQueue
                  jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
                  receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
                  return (nextVal, receiver)
                whenIsJust next $ \(nextVal, receiver) -> do
                  atomically . modifyTVar' receiver $ jqInsert nextVal
                  go
          in go

    -- | Once shutdown is requested and all workers are gone, take
    --   'appJobState' (it is not put back) and leave the supervision loop via
    --   the supplied escape continuation.
    terminateGracefully terminate = do
      shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
      guard shouldTerminate

      oldState <- takeTMVar appJobState
      guard $ 0 == Map.size (jobWorkers oldState)
      return $ do
        $logInfoS "JobPoolManager" "Shutting down"
        terminate ()

-- | Request shutdown of the job-control machinery.
--
-- Fills 'jobShutdown' and then, in a forked thread, waits for the pool
-- manager, the crontab manager and any workers still registered in
-- 'appJobState'. Returns without waiting for them.
--
-- NOTE(review): if 'appJobState' is empty at call time (e.g. while the pool
-- manager is spawning workers) this does nothing.
stopJobCtl :: MonadUnliftIO m => UniWorX -> m ()
stopJobCtl UniWorX{appJobState} = do
  didStop <- atomically $ do
    jState <- tryReadTMVar appJobState
    for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
  whenIsJust didStop $ \jSt' -> void . forkIO . atomically $ do
    workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
    mapM_ (void . waitCatchSTM) $
      [ jobPoolManager jSt'
      , jobCron jSt'
      ] ++ workers

-- | Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
--   seen, wait for the time of the next job and fire it.
--
-- The map is first merged with the execution times recorded in the database
-- ('CronLastExec' rows and creation times of 'QueuedJob's). Before a job is
-- fired the crontab is recomputed; if the entry for the job changed, the new
-- crontab is stored instead and nothing is fired.
execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWorX) ()
execCrontab = do
  mapRWST (liftHandler . runDB . setSerializable) $ do
    let
      mergeLastExec (Entity _leId CronLastExec{..})
        | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
        = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
        | otherwise = return ()

      mergeQueued (Entity _qjId QueuedJob{..})
        | Just job <- Aeson.parseMaybe parseJSON queuedJobContent
        = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
        | otherwise = return ()
    runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
    runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued

  refT <- liftIO getCurrentTime
  settings <- getsYesod appSettings'
  -- Block until the crontab contains an entry that will match at some point
  (currentCrontab, (jobCtl, nextMatch)) <- mapRWST (liftIO . atomically) $ do
    crontab <- liftBase . readTVar =<< asks jobCrontab

    State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
    prevExec <- State.get
    case earliestJob settings prevExec crontab refT of
      Nothing -> liftBase retry
      Just (_, MatchNone) -> liftBase retry
      Just x -> return (crontab, x)

  -- do
  --   lastTimes <- State.get
  --   now <- liftIO getCurrentTime
  --   $logDebugS "Crontab" . intercalate "\n" . map tshow . sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron

  let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do
        newCrontab <- lift . hoist lift $ determineCrontab'
        if
          | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab -> do
              now <- liftIO getCurrentTime
              foundation <- getYesod
              State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
              case jobCtl of
                JobCtlQueue job -> lift $ queueDBJobCron job
                other -> runReaderT ?? foundation $ writeJobCtl other
          | otherwise ->
              mapRWST (liftIO . atomically) $
                liftBase . void . flip swapTVar newCrontab =<< asks jobCrontab

  case nextMatch of
    MatchAsap -> doJob
    MatchNone -> return ()
    MatchAt nextTime -> do
      JobContext{jobCrontab} <- ask
      nextTime' <- applyJitter jobCtl nextTime
      $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
      logFunc <- askLoggerIO
      whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
        doJob
  where
    -- | Time resolution (1ms) used for debouncing, jitter steps and waiting
    acc :: NominalDiffTime
    acc = 1e-3

    -- | Debouncing interval passed to 'nextCronMatch': notification-queueing
    --   jobs use the configured notification rate limit, everything else 'acc'
    debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
    debouncingAcc AppSettings{appNotificationRateLimit} = \case
      JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
      _ -> acc

    -- | Shift @t@ by a deterministic offset in [-0.5s, 0.5s] (in steps of
    --   'acc'), seeded by the instance id and @seed@
    applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
    applyJitter seed t = do
      appInstance <- getsYesod appInstanceID
      let
        halfRange = truncate $ 0.5 / acc
        diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
      return $ addUTCTime diff t

    -- | The crontab entry with the smallest next match, given the last
    --   execution times; 'Nothing' iff the crontab is empty
    earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
    earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
      where
        go' (jobCtl, cron) mbPrev
          | Just (_, t') <- mbPrev
          , t' < t
          = mbPrev
          | otherwise
          = Just (jobCtl, t)
          where
            t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron

    -- | Wait until @nextTime@ (returns 'True') or until the value in
    --   @crontabTV@ differs from @crontab@ (returns 'False'), whichever
    --   happens first.
    waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
    waitUntil crontabTV crontab nextTime = runResourceT $ do
      diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
      let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
          waitTime'
            | diffT < acc = "Done"
            | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
      $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
      if
        | diffT < acc -> return True
        | otherwise -> do
            -- Registered with the surrounding 'ResourceT' so the pending timer
            -- is cancelled when we return early (crontab changed) or are
            -- interrupted, instead of lingering until it would have fired.
            (_, delay) <- allocate (newDelay . round $ waitTime * 1e6) cancelDelay
            let awaitDelayThread = False <$ waitDelay delay
                awaitCrontabChange = do
                  crontab' <- readTVar crontabTV
                  True <$ guard (crontab /= crontab')
            crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
            bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged

-- | Log source used by (and for) the worker with the given id
mkLogIdent :: JobWorkerId -> Text
mkLogIdent wId = "Job-Executor " <> showWorkerId wId

-- | Worker body: handle every incoming 'JobCtl'.
--
-- Exceptions thrown while handling a command (as caught by 'try') are offered
-- to the first empty confirmation 'TMVar' registered for that command in
-- 'jobConfirm' ('Nothing' is offered on success); the exception is only logged
-- here if no confirmation variable accepted it.
handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
  $logDebugS logIdent $ tshow jctl
  res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
  sentRes <- mapReaderT (liftIO . atomically) $ do
    resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
    lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
  case res of
    Just err
      | not sentRes -> $logErrorS logIdent $ tshow err
    _other -> return ()
  where
    logIdent = mkLogIdent wNum

    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j) =
      $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId) =
      $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime) =
      $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    handleCmd JobCtlTest = return ()
    -- Request execution of every queued job, oldest first
    handleCmd JobCtlFlush = void . lift . runDB . runConduit $
      selectKeys [] [ Asc QueuedJobCreationTime ]
        .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
    handleCmd (JobCtlQueue job) = lift $ queueJob' job
    handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
      content <- case fromJSON queuedJobContent of
        Aeson.Success c -> return c
        Aeson.Error t -> do
          $logErrorS logIdent $ "Aeson decoding error: " <> pack t
          throwM $ JInvalid jId j

      $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

      instanceID' <- getsYesod $ view instanceID
      now <- liftIO getCurrentTime

      performJob content

      -- `performJob` is expected to throw an exception if it detects that the job was not done
      runDB . setSerializable $ do
        when queuedJobWriteLastExec $
          void $ upsertBy
            (UniqueCronLastExec queuedJobContent)
            CronLastExec
              { cronLastExecJob      = queuedJobContent
              , cronLastExecTime     = now
              , cronLastExecInstance = instanceID'
              }
            [ CronLastExecTime =. now
            , CronLastExecInstance =. instanceID'
            ]
        delete jId
    handleCmd JobCtlDetermineCrontab = do
      newCTab <- liftHandler . runDB $ setSerializable determineCrontab'
      -- logDebugS logIdent $ tshow newCTab
      mapReaderT (liftIO . atomically) $
        lift . void . flip swapTVar newCTab =<< asks jobCrontab
    handleCmd (JobCtlGenerateHealthReport kind) = do
      hrStorage <- getsYesod appHealthReport
      newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind

      $logInfoS (tshow kind) $ toPathPiece newStatus
      unless (newStatus == HealthSuccess) $ do
        $logErrorS (tshow kind) $ tshow newReport

      -- Keep only the newest report per classification
      liftIO $ do
        now <- getCurrentTime
        let updateReports = Set.insert (now, newReport)
                          . Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
        atomically . modifyTVar' hrStorage $ force . updateReports

-- | Run @act@ on the queued job @jId@ while holding its database lock.
--
-- The lock (lock instance and lock time on the 'QueuedJob' row) is taken in a
-- serializable transaction and released after @act@ (via 'bracket').
-- Throws 'JNonexistant' if the row does not exist and 'JLocked' if it is
-- locked, unless the lock belongs to this instance and is at least
-- '_appJobStaleThreshold' old, in which case it is taken over with a warning.
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
  hasLock <- liftIO $ newTVarIO False

  let
    lock = runDB . setSerializable $ do
      qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
      instanceID' <- getsYesod $ view instanceID
      threshold <- getsYesod $ view _appJobStaleThreshold
      now <- liftIO getCurrentTime
      hadStale <- maybeT (return False) $ do
        lockTime <- MaybeT $ return queuedJobLockTime
        lockInstance <- MaybeT $ return queuedJobLockInstance
        if
          | lockInstance == instanceID'
          , diffUTCTime now lockTime >= threshold
          -> return True
          | otherwise
          -> throwM $ JLocked jId lockInstance lockTime
      when hadStale .
        $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow qj
      val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID'
                           , QueuedJobLockTime =. Just now
                           ]
      liftIO . atomically $ writeTVar hasLock True
      return val

    unlock = whenM (liftIO . atomically $ readTVar hasLock) $
      runDB . setSerializable $
        update jId [ QueuedJobLockInstance =. Nothing
                   , QueuedJobLockTime =. Nothing
                   ]

  bracket lock (const unlock) act

-- | Delete 'CronLastExec' rows whose job is not part of @crontab@.
--
-- Rows whose execution time is more than twice '_appJobFlushInterval' away
-- from now, and rows whose job cannot be parsed, are kept. Does nothing if no
-- flush interval is configured.
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCrontab
  where
    ensureCrontab (Entity leId CronLastExec{..}) = void . runMaybeT $ do
      now <- liftIO getCurrentTime
      flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval

      if
        | abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2
        -> return ()
        | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
        , not $ HashMap.member (JobCtlQueue job) crontab
        -> lift $ delete leId
        | otherwise
        -> return ()

-- | 'determineCrontab', pruning stale 'CronLastExec' rows against the result
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab

-- | Execute a job; dispatches on the 'Job' constructor (generated by 'dispatchTH')
performJob :: Job -> HandlerFor UniWorX ()
performJob = $(dispatchTH ''Job)