module Jobs
  ( module Types
  , module Jobs.Queue
  , handleJobs
  ) where

import Import

import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
import Jobs.Queue
import Jobs.Crontab

import Data.Conduit.TMChan

import qualified Data.Conduit.List as C

import qualified Data.Text.Lazy as LT

import Data.Aeson (fromJSON, toJSON)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import Database.Persist.Sql (fromSqlKey)

import Data.Semigroup (Max(..))

import Utils.Sql

import Control.Monad.Random (evalRand, mkStdGen)

import Cron

import qualified Data.HashMap.Strict as HashMap
import Data.HashMap.Strict (HashMap)

import qualified Data.List.NonEmpty as NonEmpty

import Data.Foldable (foldrM)

import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.State (StateT, evalStateT, mapStateT)
import qualified Control.Monad.State.Class as State
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Resource (MonadResourceBase, ResourceT, runResourceT, allocate)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Control.Monad.Logger

import Control.Monad.Random (MonadRandom(..), evalRand)

import Data.Time.Clock
import Data.Time.Zones

import Control.Concurrent.STM (retry)


import Jobs.Handler.SendNotification
import Jobs.Handler.SendTestEmail
import Jobs.Handler.QueueNotification
import Jobs.Handler.HelpRequest
import Jobs.Handler.SetLogSettings


-- | Failures that can occur while trying to pick up a queued job.
data JobQueueException
  = JInvalid QueuedJobId QueuedJob
    -- ^ The stored job content could not be decoded from JSON
  | JLocked QueuedJobId InstanceId UTCTime
    -- ^ The job carries a lock (instance and lock time given) that was not
    --   treated as stale
  | JNonexistant QueuedJobId
    -- ^ No queued job exists under the given id
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException


-- | Read control commands from the given channels and address them as they come in
--
-- One worker thread is forked per channel and one additional thread runs
-- 'execCrontab'; all of them are registered with the surrounding resource
-- scope.  Finally an initial `JobCtlDetermineCrontab` is written via
-- `writeJobCtlBlock`.
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs :: (MonadResource m, MonadIO m) => [TMChan JobCtl] -> UniWorX -> m ()
handleJobs recvChans foundation@UniWorX{..} = do
  jobCrontab <- liftIO $ newTVarIO HashMap.empty
  jobConfirm <- liftIO $ newTVarIO HashMap.empty

  forM_ (zip [1..] recvChans) $ \(n, chan) -> do
    let
      workerTag = "Jobs #" <> tshow n
      logStart = $logDebugS workerTag "Starting"
      logStop = $logDebugS workerTag "Stopping"
      worker = runConduit $ sourceTMChan chan .| handleJobs' n
      doFork = fork . unsafeHandler foundation . bracket_ logStart logStop $ runReaderT worker JobContext{..}
    -- Releasing the resource closes the channel; the worker's source is fed from that channel
    void $ allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan)

  -- Start cron operation
  void $ allocate
    (liftIO . fork . unsafeHandler foundation $ runReaderT execCrontab JobContext{..})
    (liftIO . killThread)

  liftIO . unsafeHandler foundation . flip runReaderT JobContext{..} $
    writeJobCtlBlock JobCtlDetermineCrontab


-- | Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
--   seen, wait for the time of the next job and fire it
execCrontab :: ReaderT JobContext (HandlerT UniWorX IO) ()
execCrontab = flip evalStateT HashMap.empty . forever $ do
  -- Merge the execution times recorded in the database into our state;
  -- rows whose job no longer parses are deleted
  mapStateT (liftHandlerT . runDB . setSerializable) $ do
    let
      recordLastExec (Entity leId CronLastExec{..})
        | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
        = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
        | otherwise
        = lift $ delete leId
    runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ recordLastExec

  now <- liftIO getCurrentTime
  -- Block (via STM `retry`) until the crontab contains a job with a next match
  (currentCrontab, (jobCtl, nextMatch)) <- mapStateT (mapReaderT $ liftIO . atomically) $ do
    crontab <- liftBase . readTVar =<< asks jobCrontab
    -- Forget execution times of jobs that have left the crontab
    State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
    lastExecs <- State.get
    case earliestJob lastExecs crontab now of
      Just (_, MatchNone) -> liftBase retry
      Just next -> return (crontab, next)
      Nothing -> liftBase retry

  let
    doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
      newCrontab <- lift . lift . hoist lift $ determineCrontab'
      -- Only fire if the entry for this job is unchanged in the freshly
      -- determined crontab; otherwise just publish the new crontab
      if HashMap.lookup jobCtl newCrontab == HashMap.lookup jobCtl currentCrontab
        then do
          execTime <- liftIO getCurrentTime
          instanceID <- getsYesod appInstanceID
          State.modify $ HashMap.alter (\prev -> Just $ maybe (Max execTime) (<> Max execTime) prev) jobCtl
          case jobCtl of
            JobCtlQueue job -> do
              lift . lift $ upsertBy
                (UniqueCronLastExec $ toJSON job)
                CronLastExec
                  { cronLastExecJob = toJSON job
                  , cronLastExecTime = execTime
                  , cronLastExecInstance = instanceID
                  }
                [ CronLastExecTime =. execTime ]
              lift . lift $ queueDBJob job
            other -> writeJobCtl other
        else
          lift . mapReaderT (liftIO . atomically) $
            lift . flip writeTVar newCrontab =<< asks jobCrontab

  case nextMatch of
    MatchAsap -> doJob
    MatchNone -> return ()
    MatchAt nextTime -> do
      JobContext{jobCrontab} <- ask
      nextTime' <- applyJitter jobCtl nextTime
      $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
      logFunc <- askLoggerIO
      -- `waitUntil` yields `False` when the crontab changed while waiting;
      -- in that case the job is skipped and the loop starts over
      whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
        doJob
  where
    -- | Resolution (in seconds) used for jitter and for waiting
    acc :: NominalDiffTime
    acc = 1e-3

    -- | Shift a point in time by a pseudo-random offset of at most half a
    --   second in either direction (in steps of `acc`); the generator is
    --   seeded from the instance id and the given seed
    applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
    applyJitter seed t = do
      appInstance <- getsYesod appInstanceID
      let
        halfRange = truncate $ 0.5 / acc
        gen = mkStdGen (hash appInstance `hashWithSalt` seed)
        diff = evalRand ((* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) gen
      return $ addUTCTime diff t

    -- | Pick the crontab entry with the smallest next match, given the
    --   last known execution times; `Nothing` for an empty crontab
    earliestJob :: HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
    earliestJob lastTimes crontab now = foldr pick Nothing (HashMap.toList crontab)
      where
        pick (jobCtl, cron) best
          | Just (_, bestMatch) <- best
          , bestMatch < candidate
          = best
          | otherwise
          = Just (jobCtl, candidate)
          where
            candidate = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) now cron

    -- | Wait until the given time is less than `acc` away (result `True`)
    --   or until the content of the `TVar` differs from the given value
    --   (result `False`)
    waitUntil :: (Eq a, MonadResourceBase m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
    waitUntil crontabTV crontab nextTime = runResourceT $ do
      diffT <- diffUTCTime nextTime <$> liftIO getCurrentTime
      let
        waitTime = fromInteger (truncate $ diffT / acc) * toRational acc
        waitTime'
          | diffT < acc = "Done"
          | otherwise = tshow (realToFrac waitTime :: NominalDiffTime)
      $logDebugS "waitUntil" [st|#{tshow diffT} (#{waitTime'})|]
      if diffT < acc
        then return True
        else do
          retVar <- liftIO newEmptyTMVarIO
          -- The delay thread fills `retVar` when it terminates (for whatever
          -- reason) and is killed when the enclosing `runResourceT` exits
          void $ allocate
            (liftIO $ forkFinally (threadDelay . round $ waitTime * 1e6) $ atomically . putTMVar retVar)
            (liftIO . killThread)
          let
            awaitDelayThread = False <$ takeTMVar retVar
            awaitCrontabChange = do
              crontab' <- readTVar crontabTV
              True <$ guard (crontab /= crontab')
          crontabChanged <- liftIO . atomically $ awaitCrontabChange <|> awaitDelayThread
          if crontabChanged
            then return False
            else waitUntil crontabTV crontab nextTime


-- | Worker loop: handle every `JobCtl` arriving on the stream
--
-- The outcome of each command (`Nothing` on success, `Just` the exception
-- otherwise) is offered to the confirmation variables registered for that
-- command in `jobConfirm`; an exception is logged only if no variable
-- accepted it.
handleJobs' :: Int -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
  $logDebugS logIdent $ tshow jctl
  resVars <- mapReaderT (liftIO . atomically) $
    HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
  res <- either Just (const Nothing) <$> try (handleCmd jctl)
  let
    -- Stop offering the result as soon as one variable has taken it
    offer resVar alreadySent
      | alreadySent = return True
      | otherwise = tryPutTMVar resVar res
  sentRes <- liftIO . atomically $ foldrM offer False (maybe [] NonEmpty.toList resVars)
  case res of
    Just err
      | not sentRes -> $logErrorS logIdent $ tshow err
    _other -> return ()
  where
    logIdent = "Jobs #" <> tshow wNum

    -- Queue exceptions are only logged, at a severity depending on the kind
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j)
      = $logWarnS logIdent $ "Invalid QueuedJob (#" <> tshow (fromSqlKey jId) <> "): " <> tshow j
    handleQueueException (JNonexistant jId)
      = $logInfoS logIdent $ "Saw nonexistant queue id: " <> tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime)
      = $logDebugS logIdent $ "Saw locked QueuedJob: " <> tshow (fromSqlKey jId, lInstance, lTime)

    -- Request execution of every queued job, ordered by creation time
    handleCmd JobCtlFlush
      = void . lift . runDB . runConduit $
          selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
    handleCmd (JobCtlQueue job)
      = lift $ queueJob' job
    -- Lock the job, decode and perform it, then remove it from the queue
    handleCmd (JobCtlPerform jId)
      = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
          content <- case fromJSON queuedJobContent of
            Aeson.Error t -> do
              $logErrorS logIdent $ "Aeson decoding error: " <> pack t
              throwM $ JInvalid jId j
            Aeson.Success c -> return c

          $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

          performJob content

          -- `performJob` is expected to throw an exception if it detects that the job was not done
          runDB $ delete jId
    -- Recompute the crontab and publish it in `jobCrontab`
    handleCmd JobCtlDetermineCrontab = do
      newCTab <- liftHandlerT . runDB $ setSerializable determineCrontab'
      -- $logDebugS logIdent $ tshow newCTab
      mapReaderT (liftIO . atomically) $
        lift . flip writeTVar newCTab =<< asks jobCrontab


-- | Run an action on a queued job while holding its lock
--
-- Throws `JNonexistant` if the job does not exist.  An existing lock is
-- overridden only if it belongs to this instance and is at least
-- `appJobStaleThreshold` old; any other existing lock results in `JLocked`.
-- The lock is cleared again after the action, whether or not it succeeded.
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
  hasLock <- liftIO $ newTVarIO False

  let
    lock = runDB . setSerializable $ do
      qj@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
      instanceID <- getsYesod appInstanceID
      threshold <- getsYesod $ appJobStaleThreshold . appSettings
      now <- liftIO getCurrentTime
      -- `False` if either lock field is unset, i.e. the job is not locked
      hadStale <- maybeT (return False) $ do
        lockTime <- MaybeT $ return queuedJobLockTime
        lockInstance <- MaybeT $ return queuedJobLockInstance
        if lockInstance == instanceID && diffUTCTime now lockTime >= threshold
          then return True
          else throwM $ JLocked jId lockInstance lockTime
      when hadStale $
        $logWarnS "Jobs" ("Ignored stale lock: " <> tshow qj)
      val <- updateGet jId
        [ QueuedJobLockInstance =. Just instanceID
        , QueuedJobLockTime =. Just now
        ]
      liftIO . atomically $ writeTVar hasLock True
      return val

    unlock = whenM (liftIO . atomically $ readTVar hasLock) $
      runDB . setSerializable $
        update jId
          [ QueuedJobLockInstance =. Nothing
          , QueuedJobLockTime =. Nothing
          ]

  bracket lock (const unlock) act


-- | Delete all `CronLastExec` rows whose job does not parse or is not
--   part of the given crontab
pruneLastExecs :: Crontab JobCtl -> DB ()
pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCrontab
  where
    ensureCrontab (Entity leId CronLastExec{..})
      | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
      , HashMap.member (JobCtlQueue job) crontab
      = return ()
      | otherwise
      = delete leId

-- | Determine the current crontab and prune stale `CronLastExec` rows
--   against it
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = do
  ct <- determineCrontab
  pruneLastExecs ct
  return ct


-- | Dispatch a job to its handler; the dispatch is generated by
--   `dispatchTH` from the `Job` type
performJob :: Job -> HandlerT UniWorX IO ()
performJob = $(dispatchTH ''Job)