-- | Worker threads processing 'JobCtl' commands and the cron thread that
-- schedules them.
module Jobs
  ( module Types
  , module Jobs.Queue
  , handleJobs
  , stopJobCtl
  ) where

import Import
import Utils.Lens
import Handler.Utils

import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab

import Data.Conduit.TMChan
import qualified Data.Conduit.List as C

import qualified Data.Text.Lazy as LT

import Data.Aeson (fromJSON, toJSON)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import Database.Persist.Sql (fromSqlKey)

import Data.Semigroup (Max(..))

import Utils.Sql

import Control.Monad.Random (evalRand, mkStdGen, getRandomR)

import Cron

import qualified Data.HashMap.Strict as HashMap
import Data.HashMap.Strict (HashMap)
import qualified Data.Set as Set

import qualified Data.List.NonEmpty as NonEmpty

import qualified Data.Map.Strict as Map

import Data.Foldable (foldrM)

import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.State (evalStateT, mapStateT)
import qualified Control.Monad.State.Class as State
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT, allocate, release)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Control.Monad.Logger

import Data.Time.Zones

import Control.Concurrent.STM (retry)

import Jobs.Handler.SendNotification
import Jobs.Handler.SendTestEmail
import Jobs.Handler.QueueNotification
import Jobs.Handler.HelpRequest
import Jobs.Handler.SetLogSettings
import Jobs.Handler.DistributeCorrections
import Jobs.Handler.SendCourseCommunication
import Jobs.Handler.Invitation

import Jobs.HealthReport


-- | Exceptions raised while locking and decoding a queued job; they are
-- caught and logged by the worker (see @handleQueueException@ in 'handleJobs'').
data JobQueueException
  = JInvalid QueuedJobId QueuedJob          -- ^ The stored job content could not be decoded from JSON
  | JLocked QueuedJobId InstanceId UTCTime  -- ^ The job carries a lock (instance, time) that may not be ignored
  | JNonexistant QueuedJobId                -- ^ No job with this id was found in the database
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException


-- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
--
-- The number of workers is taken from `_appJobWorkers`.  Every worker thread
-- is registered with the surrounding resource scope (releasing it closes the
-- worker's channel) and recorded in `appJobCtl` under its thread id; a
-- worker removes its own entry from `appJobCtl` when its thread finishes.
--
-- If at least one worker is configured, a cron thread running 'execCrontab'
-- is forked as well.  It only starts operating if it could be registered in
-- `appCronThread` (i.e. that variable was empty); in that case an initial
-- `JobCtlDetermineCrontab` is issued.
handleJobs :: ( MonadResource m
              , MonadIO m
              )
           => UniWorX -> m ()
handleJobs foundation@UniWorX{..} = do
  let num = foundation ^. _appJobWorkers

  -- Shared state captured into `JobContext{..}` below
  jobCrontab <- liftIO $ newTMVarIO HashMap.empty
  jobConfirm <- liftIO $ newTVarIO HashMap.empty

  forM_ [1..num] $ \n -> do
    -- `bChan` is the write end stored in `appJobCtl`, `chan` is the duplicate the worker reads from
    (bChan, chan) <- atomically $ newBroadcastTMChan >>= (\c -> (c, ) <$> dupTMChan c)
    let
      logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
      logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
      -- Runs in the worker thread once it terminates: drop own entry from `appJobCtl`
      removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId
      doFork = flip forkFinally (\_ -> removeChan)
             . runAppLoggingT foundation
             . bracket_ logStart logStop
             . flip runReaderT JobContext{..}
             . runConduit
             $ sourceTMChan chan .| handleJobs' foundation n
    (_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan)
    atomically . modifyTVar' appJobCtl $ Map.insert tId bChan

  -- Start cron operation
  when (num > 0) $ do
    registeredCron <- liftIO newEmptyTMVarIO
    let
      -- Blocks until `registeredCron` is filled below; only runs the crontab if registration succeeded
      execCrontab' = whenM (atomically $ readTMVar registeredCron) $
        runReaderT (execCrontab foundation) JobContext{..}
      -- On thread exit: if we had registered ourselves, empty `appCronThread` again
      unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread
    -- Releasing the resource empties `jobCrontab`, which makes `execCrontab` stop looping
    cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab)
    registeredCron' <- atomically $ do
      registeredCron' <- tryPutTMVar appCronThread cData
      registeredCron' <$ putTMVar registeredCron registeredCron'
    when registeredCron' $
      liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $
        writeJobCtlBlock JobCtlDetermineCrontab

-- | Stop all worker threads currently running
--
-- If a cron thread is registered in `appCronThread`, its resource is
-- released and we block until `appCronThread` is empty.  Afterwards all
-- worker channels found in `appJobCtl` are closed and we block until none of
-- those workers is listed in `appJobCtl` anymore.
stopJobCtl :: MonadIO m => UniWorX -> m ()
stopJobCtl UniWorX{appJobCtl, appCronThread} = do
  mcData <- atomically $ tryReadTMVar appCronThread
  whenIsJust mcData $ \(rKey, _) -> do
    liftIO $ release rKey
    -- STM `guard` retries until the cron thread has unregistered itself
    atomically . guardM $ isEmptyTMVar appCronThread

  wMap <- liftIO $ readTVarIO appJobCtl
  atomically $ forM_ wMap closeTMChan
  atomically $ do
    wMap' <- readTVar appJobCtl
    -- Retry until every worker we saw has removed itself from the map
    guard . none (`Map.member` wMap') $ Map.keysSet wMap

-- | Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
--
-- Loops until `jobCrontab` is found empty.
execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m ()
execCrontab foundation = evalStateT go HashMap.empty
  where
    go = do
        cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do
          -- Merge the execution times stored in the database into our state;
          -- rows whose job cannot be parsed are deleted
          mapStateT (liftHandlerT . runDB . setSerializable) $ do
            let
              merge (Entity leId CronLastExec{..})
                | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
                = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
                | otherwise
                = lift $ delete leId
            runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge

          refT <- liftIO getCurrentTime
          settings <- getsYesod appSettings'
          -- `Nothing` iff `jobCrontab` is empty; blocks (STM `retry`) while no job is due to be scheduled
          currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do
            crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab
            case crontab' of
              Nothing -> return Nothing
              Just crontab -> Just <$> do
                -- Forget execution times of jobs no longer in the crontab
                State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
                prevExec <- State.get
                case earliestJob settings prevExec crontab refT of
                  Nothing -> liftBase retry
                  Just (_, MatchNone) -> liftBase retry
                  Just x -> return (crontab, x)

          case currentState of
            Nothing -> return False
            Just (currentCrontab, (jobCtl, nextMatch)) -> do
              let
                -- Recompute the crontab; execute `jobCtl` only if its entry is
                -- unchanged, otherwise just publish the new crontab
                doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
                  newCrontab <- lift . lift . hoist lift $ determineCrontab'
                  if
                    | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab -> do
                        now <- liftIO $ getCurrentTime
                        instanceID' <- getsYesod appInstanceID
                        State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
                        case jobCtl of
                          JobCtlQueue job -> do
                            -- NOTE(review): on conflict only the time is updated, not the instance — confirm this is intended
                            void . lift . lift $ upsertBy
                              (UniqueCronLastExec $ toJSON job)
                              CronLastExec
                                { cronLastExecJob = toJSON job
                                , cronLastExecTime = now
                                , cronLastExecInstance = instanceID'
                                }
                              [ CronLastExecTime =. now ]
                            lift . lift $ queueDBJob job
                          other -> writeJobCtl other
                    | otherwise ->
                        lift . mapReaderT (liftIO . atomically) $
                          lift . void . flip swapTMVar newCrontab =<< asks jobCrontab

              case nextMatch of
                MatchAsap -> doJob
                MatchNone -> return ()
                MatchAt nextTime -> do
                  JobContext{jobCrontab} <- ask
                  nextTime' <- applyJitter jobCtl nextTime
                  $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
                  logFunc <- askLoggerIO
                  -- `waitUntil` yields `False` if the crontab changed while waiting; then we skip the job
                  whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
                    doJob

              return True
        when cont go
      where
        -- Time resolution (in seconds) used for jitter and waiting
        acc :: NominalDiffTime
        acc = 1e-3

        -- Accuracy handed to `nextCronMatch`: the notification rate limit for
        -- `JobQueueNotification` jobs, `acc` for everything else
        debouncingAcc :: AppSettings -> JobCtl -> NominalDiffTime
        debouncingAcc AppSettings{appNotificationRateLimit} = \case
          JobCtlQueue (JobQueueNotification _) -> appNotificationRateLimit
          _ -> acc

        -- Shift `t` by a pseudo-random multiple of `acc` within +-0.5 seconds;
        -- the generator is seeded from the instance id and `seed`, so the
        -- offset is deterministic for a given pair
        applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
        applyJitter seed t = do
          appInstance <- getsYesod appInstanceID
          let
            halfRange = truncate $ 0.5 / acc
            diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
          return $ addUTCTime diff t

        -- Pick the crontab entry with the smallest next match (as ordered by
        -- `CronNextMatch`'s `Ord` instance); `Nothing` for an empty crontab
        earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
        earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
          where
            go' (jobCtl, cron) mbPrev
              | Just (_, t') <- mbPrev
              , t' < t
              = mbPrev
              | otherwise
              = Just (jobCtl, t)
              where
                t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron

        -- Wait until `nextTime` (to within `acc`) or until the content of
        -- `crontabTV` differs from `crontab`, whichever happens first.
        -- Returns `True` if the time was reached, `False` if the crontab changed.
        waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TMVar a -> a -> UTCTime -> m Bool
        waitUntil crontabTV crontab nextTime = runResourceT $ do
          diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
          let
            -- Remaining time, truncated to a multiple of `acc`
            waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
            waitTime'
              | diffT < acc = "Done"
              | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
          $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
          if
            | diffT < acc -> return True
            | otherwise -> do
                retVar <- liftIO newEmptyTMVarIO
                -- The delay runs in its own thread and signals completion through `retVar`
                void . liftIO . forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar
                let
                  awaitDelayThread = False <$ takeTMVar retVar
                  awaitCrontabChange = do
                    crontab' <- tryReadTMVar crontabTV
                    True <$ guard (Just crontab /= crontab')
                crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
                -- After the delay, re-check the clock by recursing
                bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged


-- | Worker loop: handle every 'JobCtl' arriving on the conduit.
--
-- Exceptions thrown while handling a command are caught.  The outcome
-- (@Just@ the exception or @Nothing@) is offered to the confirmation
-- variables registered for the command in `jobConfirm`; an exception that
-- could not be handed to any of them is logged as an error.
--
-- The 'Natural' argument is the worker number, used only in log sources.
handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) ()
handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
  $logDebugS logIdent $ tshow jctl
  resVars <- mapReaderT (liftIO . atomically) $
    HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
  res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl
  -- Stop trying further variables as soon as one put succeeded
  sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
  case res of
    Just err
      | not sentRes -> $logErrorS logIdent $ tshow err
    _other -> return ()
  where
    logIdent = "Jobs #" <> tshow wNum

    -- Log queue-related exceptions instead of propagating them
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    -- Issue a `JobCtlPerform` for every queued job, ordered by creation time
    handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
    handleCmd (JobCtlQueue job) = lift $ queueJob' job
    -- Lock the job, decode and perform it, then delete it from the queue
    handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
      content <- case fromJSON queuedJobContent of
        Aeson.Success c -> return c
        Aeson.Error t -> do
          $logErrorS logIdent $ "Aeson decoding error: " <> pack t
          throwM $ JInvalid jId j

      $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

      performJob content

      -- `performJob` is expected to throw an exception if it detects that the job was not done
      runDB $ delete jId
    -- Recompute the crontab and publish it in `jobCrontab`
    handleCmd JobCtlDetermineCrontab = do
      newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab'
      -- logDebugS logIdent $ tshow newCTab
      mapReaderT (liftIO . atomically) $
        lift . void . flip swapTMVar newCTab =<< asks jobCrontab
    -- Generate a health report, log its status, and store it, replacing
    -- stored reports with the same `classifyHealthReport` classification
    handleCmd (JobCtlGenerateHealthReport kind) = do
      hrStorage <- getsYesod appHealthReport
      newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind

      $logInfoS (tshow kind) $ toPathPiece newStatus
      unless (newStatus == HealthSuccess) $ do
        $logErrorS (tshow kind) $ tshow newReport

      liftIO $ do
        now <- getCurrentTime
        let
          updateReports = Set.insert (now, newReport)
                        . Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
        atomically . modifyTVar' hrStorage $ force . updateReports

-- | Run an action on a queued job while holding its database lock.
--
-- Acquiring the lock throws 'JNonexistant' if the job does not exist and
-- 'JLocked' if it is already locked, unless the existing lock belongs to
-- this instance and is at least `_appJobStaleThreshold` old, in which case
-- it is ignored (with a warning).  The lock (instance and time) is cleared
-- again after the action finishes, whether it succeeded or threw.
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
  -- Records whether we actually wrote our lock, so `unlock` only clears locks we set
  hasLock <- liftIO $ newTVarIO False

  let
    lock = runDB . setSerializable $ do
      qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
      instanceID' <- getsYesod $ view instanceID
      threshold <- getsYesod $ view _appJobStaleThreshold
      now <- liftIO getCurrentTime
      -- `False` if either lock field is unset, i.e. the job is not locked
      hadStale <- maybeT (return False) $ do
        lockTime <- MaybeT $ return queuedJobLockTime
        lockInstance <- MaybeT $ return queuedJobLockInstance
        if
          | lockInstance == instanceID', diffUTCTime now lockTime >= threshold -> return True
          | otherwise -> throwM $ JLocked jId lockInstance lockTime
      when hadStale . $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow qj
      val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID'
                           , QueuedJobLockTime =. Just now
                           ]
      liftIO . atomically $ writeTVar hasLock True
      return val

    unlock = whenM (liftIO . atomically $ readTVar hasLock) $
      runDB . setSerializable $
        update jId [ QueuedJobLockInstance =. Nothing
                   , QueuedJobLockTime =. Nothing
                   ]

  bracket lock (const unlock) act


-- | Delete all `CronLastExec` rows whose job either cannot be parsed or is
-- not (as a `JobCtlQueue`) part of the given crontab.
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCrontab
  where
    ensureCrontab (Entity leId CronLastExec{..})
      | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
      , HashMap.member (JobCtlQueue job) crontab
      = return ()
      | otherwise
      = delete leId

-- | Determine the current crontab and prune stored execution times
-- accordingly (see 'pruneLastExecs'); returns the crontab.
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab


-- | Execute a job; the dispatch over the constructors of 'Job' is generated
-- by `dispatchTH`.
performJob :: Job -> HandlerT UniWorX IO ()
performJob = $(dispatchTH ''Job)