-- | Background job processing: a pool of worker threads executing 'JobCtl'
-- commands, a crontab thread firing recurring jobs, and orderly shutdown.
module Jobs
  ( module Types
  , module Jobs.Queue
  , handleJobs
  , stopJobCtl
  ) where

import Import
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab

import qualified Data.Conduit.List as C

import qualified Data.Text.Lazy as LT

import Data.Aeson (fromJSON)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson

import Control.Monad.Random (evalRand, mkStdGen, uniformMay)

import Cron
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!))

import Control.Monad.Trans.RWS.Lazy (RWST, mapRWST, evalRWST)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Trans.Cont (ContT(..), callCC)
import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
import Control.Monad.Logger
import qualified Control.Monad.Catch as Exc

import Data.Time.Zones

import Control.Concurrent.STM (retry)
import Control.Concurrent.STM.Delay

import UnliftIO.Concurrent (forkIO, myThreadId)

import Jobs.Handler.SendNotification
import Jobs.Handler.SendTestEmail
import Jobs.Handler.QueueNotification
import Jobs.Handler.HelpRequest
import Jobs.Handler.SetLogSettings
import Jobs.Handler.DistributeCorrections
import Jobs.Handler.SendCourseCommunication
import Jobs.Handler.Invitation
import Jobs.Handler.SendPasswordReset
import Jobs.Handler.TransactionLog
import Jobs.Handler.SynchroniseLdap
import Jobs.Handler.PruneInvitations
import Jobs.Handler.ChangeUserDisplayEmail
import Jobs.Handler.Files
import Jobs.Handler.PersonalisedSheetFiles

import Jobs.HealthReport

import Control.Exception.Base (AsyncException)

import Type.Reflection (typeOf)


-- | Reasons why a 'QueuedJob' could not be picked up for execution.
--
-- Thrown while locking/decoding a queued job and handled (logged only) by the
-- job workers, so none of these abort a worker.
--
-- NOTE(review): every type is 'Typeable' automatically since GHC 7.10, so the
-- explicit @Typeable@ in the deriving clause is redundant.
data JobQueueException
  = JInvalid QueuedJobId QueuedJob
    -- ^ The stored job content could not be decoded from JSON.
  | JLocked QueuedJobId InstanceId UTCTime
    -- ^ The job row carries a lock (holding instance, lock time) that must not
    --   be overridden.
  | JNonexistant QueuedJobId
    -- ^ No row exists for the given queued-job id.
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException
-- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
--
-- Does nothing when '_appJobWorkers' is 0; in that case 'appJobState' is never
-- filled. Otherwise it starts the pool manager ('manageJobPool') and the crontab
-- manager ('manageCrontab') as linked asyncs. It then publishes an initial
-- 'JobState' without any workers into 'appJobState'. The pool manager spawns
-- the actual workers from there.
--
-- NOTE(review): the workers spawned here read from per-worker queues stored in
-- 'jobWorkers' (see 'manageJobPool'). Confirm that the mention of `appJobCtl`
-- above is still accurate.
handleJobs :: ( MonadResource m
              , MonadLogger m
              , MonadUnliftIO m
              , MonadMask m
              )
           => UniWorX
           -> m ()
handleJobs foundation@UniWorX{..}
  | foundation ^. _appJobWorkers == 0 = return ()
  | otherwise = do
      jobPoolManager <- allocateLinkedAsyncWithUnmask $ manageJobPool foundation
      jobCron <- allocateLinkedAsyncWithUnmask $ manageCrontab foundation

      -- The pool starts empty. 'manageJobPool' extends 'jobWorkerName' for
      -- every worker it spawns, so this fallback is only reached for an
      -- 'Async' that was never registered as a worker.
      let jobWorkers = Map.empty
          jobWorkerName = const $ error "Unknown worker"
      jobCrontab <- liftIO $ newTVarIO HashMap.empty
      jobConfirm <- liftIO $ newTVarIO HashMap.empty
      -- Filled (with ()) to request shutdown; see 'stopJobCtl'.
      jobShutdown <- liftIO newEmptyTMVarIO
      jobCurrentCrontab <- liftIO $ newTVarIO Nothing

      atomically $ putTMVar appJobState JobState
        { jobContext = JobContext{..}
        , ..
        }

-- Runs the crontab loop ('execCrontab') in a linked async and supervises it.
--
-- The loop thread first waits until at least one worker exists. It then
-- issues 'JobCtlDetermineCrontab' via 'writeJobCtlBlock' and runs
-- 'execCrontab' forever, starting from an empty map of last execution times.
--
-- The supervising thread waits until either the loop thread finishes or
-- 'jobShutdown' is filled. In the latter case it cancels the loop. Exceptions
-- caught by 'handleAny' here are re-thrown to the loop thread.
manageCrontab :: forall m.
                 ( MonadResource m
                 , MonadUnliftIO m
                 )
              => UniWorX -> (forall a. m a -> m a) -> m ()
manageCrontab foundation@UniWorX{..} unmask = do
  ch <- allocateLinkedAsync $ do
    -- Snapshot used as the RWST environment of 'execCrontab'. That code only
    -- reads TVars from it ('jobCrontab', 'jobCurrentCrontab'), which are shared
    -- with the live state.
    jState <- atomically $ readTMVar appJobState
    liftIO . unsafeHandler foundation . void $ do
      -- Block until the pool manager has spawned at least one worker.
      atomically . assertM_ (not . Map.null . jobWorkers) $ readTMVar appJobState
      runReaderT ?? foundation $ writeJobCtlBlock JobCtlDetermineCrontab
      void $ evalRWST (forever execCrontab) jState HashMap.empty

  -- Succeeds once 'jobShutdown' has been filled.
  let awaitTermination = guardM $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown

  handleAny (throwTo $ asyncThreadId ch) . unmask $ do
    -- False: the loop thread ended by itself. True: shutdown was requested.
    doCancel <- atomically $ asum
      [ False <$ waitSTM ch
      , True <$ awaitTermination
      ]
    when doCancel $
      cancel ch

-- Supervises the pool of worker threads.
--
-- Each iteration blocks in a single STM transaction until one of the
-- following applies, then runs the chosen action outside of STM:
--
-- * workers are missing ('spawnMissingWorkers');
-- * some worker has finished ('reapDeadWorkers');
-- * shutdown was requested and no workers remain ('terminateGracefully').
--   This escapes the loop.
manageJobPool :: forall m.
                 ( MonadResource m
                 , MonadLogger m
                 , MonadUnliftIO m
                 , MonadMask m
                 )
              => UniWorX -> (forall a. m a -> m a) -> m ()
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
    flip runContT return . callCC $ \terminate' ->
      forever . join . lift . routeExc . atomically $ asum
        [ spawnMissingWorkers
        , reapDeadWorkers
        , terminateGracefully terminate'
        ]
  where
    -- Runs 'act' in a linked async, with this thread as supervisor.
    --
    -- 'routeExc' wraps only the STM selection step. It runs that step unmasked
    -- and forwards any exception it raises to this thread ('me'), yielding a
    -- no-op action in its place.
    --
    -- When an exception reaches this thread (while waiting on the async), it:
    -- 1. logs the exception;
    -- 2. fills 'jobShutdown' (if a state is present);
    -- 3. waits for the async;
    -- 4. re-throws the exception.
    shutdownOnException :: ((forall m'. Monad m' => m (m' ()) -> m (m' ())) -> m a) -> m a
    shutdownOnException act = do
      me <- myThreadId
      let routeExc :: forall m'. Monad m' => (forall b. m b -> m b) -> m (m' ()) -> m (m' ())
          routeExc unmask' = handleAll (\exc -> return () <$ throwTo me exc) . unmask'
      actAsync <- allocateLinkedAsyncWithUnmask $ \unmask' -> act (routeExc unmask')

      let handleExc e = do
            $logWarnS "manageJobPool" [st|Received exception: #{displayException e}|]
            atomically $ do
              jState <- tryReadTMVar appJobState
              for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown ()
            void $ wait actAsync
            throwM e

      unmask (wait actAsync) `catchAll` handleExc

    -- Target pool size, taken from '_appJobWorkers'.
    num :: Int
    num = fromIntegral $ foundation ^. _appJobWorkers

    spawnMissingWorkers, reapDeadWorkers :: STM (ContT () m ())

    -- Applies only while shutdown has not been requested and fewer than 'num'
    -- workers exist (otherwise 'guard' retries).
    --
    -- 'appJobState' is taken in the transaction and stays empty until the
    -- returned action puts back the state, extended with the new workers.
    --
    -- NOTE(review): if the returned action throws before its final
    -- 'putTMVar' (e.g. 'allocateAsync' fails), 'appJobState' stays empty.
    -- Every reader of it would then block. Confirm this cannot happen, or
    -- guard the action.
    spawnMissingWorkers = do
      shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
      guard $ not shouldTerminate'

      oldState <- takeTMVar appJobState
      let missing = num - Map.size (jobWorkers oldState)
      guard $ missing > 0
      return $ do
        $logDebugS "manageJobPool" [st|Spawning #{missing} workers|]
        -- Each spawned worker contributes an 'Endo' that registers it in the
        -- state. The composed 'Endo' is applied once at the end.
        endo <- execWriterT . replicateM_ missing $ do
          workerId <- newWorkerId
          let logIdent = mkLogIdent workerId

          -- This worker's private job queue.
          chan <- liftIO $ newTVarIO mempty
          -- Streams jobs from 'chan', blocking while it is empty. The stream
          -- ends as soon as 'jobShutdown' is filled, even if 'chan' still
          -- holds jobs. Leftovers are redistributed by 'reapDeadWorkers'
          -- after this worker finishes.
          let streamChan = join . atomically $ do
                shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
                if | shouldTerminate -> return $ return ()
                   | otherwise -> do
                       queue <- readTVar chan
                       nextVal <- case jqDequeue queue of
                         Nothing -> retry
                         Just (j, q) -> j <$ writeTVar chan q
                       return $ yield nextVal >> streamChan
              runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
                $logInfoS logIdent "Started"
                runConduit $ streamChan .| handleJobs' workerId
                $logInfoS logIdent "Stopping"

          worker <- lift . lift . allocateAsync $ liftIO runWorker

          -- Register the worker's queue, and extend the name lookup so that
          -- 'worker' maps to 'workerId' (falling back to the previous lookup).
          tell . Endo $ \cSt -> cSt
            { jobWorkers = Map.insert worker chan $ jobWorkers cSt
            , jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
            }
        atomically . putTMVar appJobState $ endo `appEndo` oldState

    -- Removes every finished worker ('pollSTM' returns 'Just') from
    -- 'jobWorkers', and retries if none has finished.
    --
    -- For each finished worker, the returned action:
    -- * logs how the worker ended;
    -- * starts a linked async that moves the worker's remaining queued jobs,
    --   one per transaction, onto a randomly chosen live worker. It retries
    --   while no worker exists, and stops once the old queue is empty.
    reapDeadWorkers = do
      oldState <- takeTMVar appJobState
      deadWorkers <- fmap (Map.fromList . catMaybes) . forM (Map.keys $ jobWorkers oldState) $ \a -> fmap (a,) <$> pollSTM a
      putTMVar appJobState oldState
        { jobWorkers = jobWorkers oldState `Map.withoutKeys` Map.keysSet deadWorkers
        }
      guard . not $ Map.null deadWorkers
      return . forM_ (Map.toList deadWorkers) $ \(jobAsync, result) -> do
        -- NOTE(review): the crash message spells "Job-Executer", while
        -- 'mkLogIdent' and the message below use "Job-Executor".
        case result of
          Right () ->
            $logInfoS "JobPoolManager" [st|Job-Executor #{showWorkerId (jobWorkerName oldState jobAsync)} terminated|]
          Left e ->
            $logErrorS "JobPoolManager" [st|Job-Executer #{showWorkerId (jobWorkerName oldState jobAsync)} crashed: #{tshow e}|]

        void . lift . allocateLinkedAsync $
          let go = do
                -- The dequeue from the dead worker's queue and the choice of
                -- receiver happen in one transaction, so a retry (no live
                -- worker yet) also rolls back the dequeue.
                next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
                  let chan = jobWorkers oldState ! jobAsync
                  (nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan
                  lift . lift $ writeTVar chan newQueue
                  jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
                  receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
                  return (nextVal, receiver)
                whenIsJust next $ \(nextVal, receiver) -> do
                  atomically . modifyTVar' receiver $ jqInsert nextVal
                  go
          in go

    -- Applies once shutdown has been requested and no workers remain. It takes
    -- 'appJobState' without putting it back, then escapes the pool loop
    -- through the continuation.
    terminateGracefully terminate = do
      shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
      guard shouldTerminate

      oldState <- takeTMVar appJobState
      guard $ 0 == Map.size (jobWorkers oldState)
      return $ do
        $logInfoS "JobPoolManager" "Shutting down"
        terminate ()

-- | Request shutdown of the job system.
--
-- Fills 'jobShutdown' if a 'JobState' is present, then returns immediately.
-- A forked thread afterwards waits, in a single STM transaction, until the
-- pool manager, the crontab thread and all still-registered workers have
-- finished. Their results are ignored. That thread also takes 'appJobState'
-- if it is still filled.
stopJobCtl :: MonadUnliftIO m => UniWorX -> m ()
-- ^ Stop all worker threads currently running
stopJobCtl UniWorX{appJobState} = do
  didStop <- atomically $ do
    jState <- tryReadTMVar appJobState
    for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
  whenIsJust didStop $ \jSt' -> void . forkIO . atomically $ do
    workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
    mapM_ (void . waitCatchSTM) $
      [ jobPoolManager jSt'
      , jobCron jSt'
      ] ++ workers

-- | One iteration of the crontab loop.
execCrontab :: RWST JobState () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWorX) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
--   seen, wait for the time of the next job and fire it
execCrontab = do
  let
    -- Folds the times stored in the database into the state map, keeping the
    -- maximum per 'JobCtlQueue' job. It uses the 'CronLastExec' times and the
    -- creation times of jobs still in 'QueuedJob'. Rows whose JSON does not
    -- parse are skipped.
    mergeState :: MonadResource m => RWST _ () _ (ReaderT SqlBackend m) ()
    mergeState = do
      let
        mergeLastExec (Entity _leId CronLastExec{..})
          | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
          = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
          | otherwise = return ()

        mergeQueued (Entity _qjId QueuedJob{..})
          | Just job <- Aeson.parseMaybe parseJSON queuedJobContent
          = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
          | otherwise = return ()
      runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
      runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
  mapRWST (liftHandler . runDB . setSerializable) mergeState

  refT <- liftIO getCurrentTime
  settings <- getsYesod appSettings'
  -- Atomically drops state entries whose job is no longer in the crontab. It
  -- then blocks (retries) until 'earliestJob' yields a job with a match other
  -- than 'MatchNone'.
  (currentCrontab, (jobCtl, nextMatch), currentState) <- mapRWST (liftIO . atomically) $ do
    crontab <- liftBase . readTVar =<< asks (jobCrontab . jobContext)

    State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
    prevExec <- State.get
    case earliestJob settings prevExec crontab refT of
      Nothing -> liftBase retry
      Just (_, MatchNone) -> liftBase retry
      Just x -> return (crontab, x, prevExec)

  -- Publish the crontab with each job's next match, sorted by that match,
  -- into 'jobCurrentCrontab', and log it.
  do
    lastTimes <- State.get
    now <- liftIO getCurrentTime
    let currentCrontab' = sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
    crontabTVar <- asks jobCurrentCrontab
    atomically . writeTVar crontabTVar $ Just (now, currentCrontab')
    $logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab'

  -- Within one serializable DB transaction:
  -- 1. Recompute the crontab and store it if it changed.
  -- 2. Re-merge the DB times.
  -- 3. Fire 'jobCtl' only if both its crontab entry and its recorded last
  --    execution are unchanged since it was selected above. Otherwise nothing
  --    fires and the next iteration selects again.
  --
  -- Firing records 'now' as the job's last execution, then either queues the
  -- job in the database ('JobCtlQueue') or writes the control command.
  let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do
        newCrontab <- lift $ hoist lift determineCrontab'
        when (newCrontab /= currentCrontab) $
          mapRWST (liftIO . atomically) $
            liftBase . void . flip swapTVar newCrontab =<< asks (jobCrontab . jobContext)

        mergeState
        newState <- State.get

        let upToDate = and
              [ ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
              , ((==) `on` HashMap.lookup jobCtl) newState currentState
              ]
        when upToDate $ do
          now <- liftIO getCurrentTime
          foundation <- getYesod
          State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
          case jobCtl of
            JobCtlQueue job -> lift $ queueDBJobCron job
            other -> runReaderT ?? foundation $ writeJobCtl other

  case nextMatch of
    MatchAsap -> doJob
    MatchNone -> return ()
    MatchAt nextTime -> do
      crontab <- asks $ jobCrontab . jobContext
      nextTime' <- applyJitter jobCtl nextTime
      $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
      logFunc <- askLoggerIO
      -- Fire only if the wait completed without the crontab TVar changing.
      whenM (liftIO . flip runLoggingT logFunc $ waitUntil crontab currentCrontab nextTime')
        doJob
  where
    -- Time resolution used throughout: 1 ms.
    acc :: NominalDiffTime
    acc = 1e-3

    -- Accuracy passed to 'nextCronMatch'. Queued notifications use the
    -- configured notification rate limit; everything else uses 'acc'.
    debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
    debouncingAcc AppSettings{appNotificationRateLimit} = \case
      JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
      _ -> acc

    -- Shifts 't' by an offset in [-0.5 s, 0.5 s], in steps of 'acc'. The
    -- offset is deterministic for a given instance ID and seed.
    applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
    applyJitter seed t = do
      appInstance <- getsYesod appInstanceID
      let
        halfRange = truncate $ 0.5 / acc
        diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
      return $ addUTCTime diff t

    -- Picks the crontab entry with the smallest next match, according to the
    -- 'Ord' instance of 'CronNextMatch'. On ties the entry that comes earlier
    -- in 'HashMap.toList' order wins. Returns 'Nothing' for an empty crontab.
    earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
    earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
      where
        go' (jobCtl, cron) mbPrev
          | Just (_, t') <- mbPrev
          , t' < t
          = mbPrev
          | otherwise
          = Just (jobCtl, t)
          where
            t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron

    -- Returns True once 'nextTime' is less than 'acc' away.
    --
    -- Otherwise it sleeps for the remaining time, truncated to a multiple of
    -- 'acc'. It returns False as soon as the TVar's value differs from the
    -- given one; if the delay expires first, it checks again recursively.
    --
    -- NOTE(review): the 'Delay' is not cancelled ('cancelDelay') when
    -- returning early because the crontab changed.
    waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
    waitUntil crontabTV crontab nextTime = do
      diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
      let waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
          waitTime'
            | diffT < acc = "Done"
            | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
      $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
      if | diffT < acc -> return True
         | otherwise -> do
             -- 'newDelay' takes microseconds.
             delay <- liftIO . newDelay . round $ waitTime * 1e6
             let awaitDelayThread = False <$ waitDelay delay
                 awaitCrontabChange = do
                   crontab' <- readTVar crontabTV
                   True <$ guard (crontab /= crontab')
             crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
             bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged

-- | Log source used by a job worker.
mkLogIdent :: JobWorkerId -> Text
mkLogIdent wId = "Job-Executor " <> showWorkerId wId

-- | Worker body: executes each incoming 'JobCtl' in turn.
--
-- Each command is run with the worker state set to busy. Synchronous
-- exceptions from the command are captured, while async exceptions are
-- re-thrown.
--
-- The outcome ('Nothing' on success, 'Just' the exception on failure) is
-- offered to the TMVars registered for this command in 'jobConfirm'. It is
-- delivered to at most one of them: the first one that is still empty,
-- scanning from the end of the list. An error is logged only if nobody
-- received it.
handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
  $logDebugS logIdent $ tshow jctl
  res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try' $ handleCmd jctl

  sentRes <- mapReaderT (liftIO . atomically) $ do
    resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
    lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
  case res of
    Just err
      | not sentRes -> case err of
          SomeException err' -> $logErrorS logIdent $ tshow (typeOf err') <> ": " <> pack (displayException err')
    _other -> return ()
  where
    logIdent = mkLogIdent wNum

    -- Wraps the result in 'Right'. Async exceptions are re-thrown; any other
    -- exception becomes 'Left'. In DEVELOPMENT builds, 'MailNotAvailable' and
    -- 'SynchroniseLdapNoLdap' are treated as success.
    try' = flip catches
      [ Exc.Handler $ \(e :: AsyncException) -> throwM e
      , Exc.Handler $ \(e :: SomeAsyncException) -> throwM e
#ifdef DEVELOPMENT
      , Exc.Handler $ \case
          MailNotAvailable -> return $ Right ()
          e -> return . Left $ SomeException e
      , Exc.Handler $ \SynchroniseLdapNoLdap -> return $ Right ()
#endif
      , Exc.Handler $ \(e :: SomeException) -> return $ Left e
      ] . fmap Right

    -- Queue problems are only logged; they do not count as command failures.
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    handleCmd JobCtlTest = return ()
    -- Issue a 'JobCtlPerform' for every queued job, oldest first.
    handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
    handleCmd (JobCtlQueue job) = lift $ queueJob' job
    -- Lock the queued job, decode it and run its handler. Afterwards, record
    -- the last execution (if 'queuedJobWriteLastExec' is set) and delete the
    -- queue row.
    --
    -- For atomic handlers, the cleanup runs in the same serializable
    -- transaction as the job. Otherwise it runs in a separate serializable
    -- transaction after the job.
    handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
      content <- case fromJSON queuedJobContent of
        Aeson.Success c -> return c
        Aeson.Error t -> do
          $logErrorS logIdent $ "Aeson decoding error: " <> pack t
          throwM $ JInvalid jId j

      $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

      instanceID' <- getsYesod $ view instanceID
      now <- liftIO getCurrentTime

      let cleanup = do
            when queuedJobWriteLastExec $
              void $ upsertBy
                (UniqueCronLastExec queuedJobContent)
                CronLastExec
                  { cronLastExecJob = queuedJobContent
                  , cronLastExecTime = now
                  , cronLastExecInstance = instanceID'
                  }
                [ CronLastExecTime =. now
                , CronLastExecInstance =. instanceID'
                ]
            delete jId

      case performJob content of
        JobHandlerAtomic act -> runDBJobs . setSerializable $ do
          act & withJobWorkerState wNum (JobWorkerExecJob content)
          hoist lift cleanup
        JobHandlerException act -> do
          act & withJobWorkerState wNum (JobWorkerExecJob content)
          runDB $ setSerializable cleanup
    -- Recompute the crontab (pruning stale last-exec rows) and publish it
    -- to 'jobCrontab'.
    handleCmd JobCtlDetermineCrontab = do
      newCTab <- liftHandler . runDB $ setSerializable determineCrontab'
      -- logDebugS logIdent $ tshow newCTab
      mapReaderT (liftIO . atomically) $
        lift . void . flip swapTVar newCTab =<< asks jobCrontab
    -- Generate a health report and log its status (the full report on
    -- failure). Store it in 'appHealthReport', replacing earlier reports of
    -- the same classification.
    handleCmd (JobCtlGenerateHealthReport kind) = do
      hrStorage <- getsYesod appHealthReport
      newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind

      $logInfoS (tshow kind) $ toPathPiece newStatus
      unless (newStatus == HealthSuccess) $ do
        $logErrorS (tshow kind) $ tshow newReport

      liftIO $ do
        now <- getCurrentTime
        let updateReports = Set.insert (now, newReport)
              . Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
        atomically . modifyTVar' hrStorage $ force . updateReports

-- | Run 'act' on a queued job while holding its lock.
--
-- The lock is taken in a serializable transaction by writing this instance's
-- ID and the current time into the row. After 'act' (via 'bracket'), the lock
-- fields are cleared, but only if the lock transaction got as far as setting
-- 'hasLock'.
--
-- Throws 'JNonexistant' if the row does not exist. Throws 'JLocked' if the row
-- is locked by another instance, or by this instance less than
-- '_appJobStaleThreshold' ago. A stale lock of this instance is overridden,
-- with a warning.
--
-- NOTE(review): locks held by other instances are never treated as stale
-- here. Confirm that this is the intended policy.
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
  hasLock <- liftIO $ newTVarIO False

  let lock = runDB . setSerializable $ do
        qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
        instanceID' <- getsYesod $ view instanceID
        threshold <- getsYesod $ view _appJobStaleThreshold
        now <- liftIO getCurrentTime
        -- False when the row is unlocked; True when it holds a stale lock of
        -- this instance. Any other lock throws 'JLocked'.
        hadStale <- maybeT (return False) $ do
          lockTime <- MaybeT $ return queuedJobLockTime
          lockInstance <- MaybeT $ return queuedJobLockInstance
          if | lockInstance == instanceID', diffUTCTime now lockTime >= threshold -> return True
             | otherwise -> throwM $ JLocked jId lockInstance lockTime
        when hadStale . $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj)
        val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID'
                             , QueuedJobLockTime =. Just now
                             ]
        liftIO . atomically $ writeTVar hasLock True
        return val

      unlock = whenM (liftIO . atomically $ readTVar hasLock) $
        runDB . setSerializable $
          update jId [ QueuedJobLockInstance =. Nothing
                     , QueuedJobLockTime =. Nothing
                     ]

  bracket lock (const unlock) act

-- | Delete 'CronLastExec' rows whose job is no longer part of the given crontab.
--
-- Does nothing if '_appJobFlushInterval' is unset. Only rows whose last
-- execution lies within twice the flush interval of now are considered. Rows
-- whose job JSON does not parse are kept.
--
-- NOTE(review): rows older than twice the flush interval are never deleted
-- here. Confirm that this age condition is not inverted.
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCrontab
  where
    ensureCrontab (Entity leId CronLastExec{..}) = void . runMaybeT $ do
      now <- liftIO getCurrentTime
      flushInterval <- MaybeT . getsYesod . view $ appSettings . _appJobFlushInterval

      if | abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2 -> return ()
         | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
         , not $ HashMap.member (JobCtlQueue job) crontab -> lift $ delete leId
         | otherwise -> return ()

-- | 'determineCrontab', followed by 'pruneLastExecs' against its result.
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab

-- | Map a 'Job' to its handler.
--
-- The code is generated by the Template Haskell function 'dispatchTH' over
-- the constructors of 'Job'. It presumably dispatches to per-constructor
-- handlers from the @Jobs.Handler.*@ modules (TODO confirm).
performJob :: Job -> JobHandler UniWorX
performJob = $(dispatchTH ''Job)