diff --git a/config/settings.yml b/config/settings.yml index ca8484522..d34af3ad9 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -16,7 +16,9 @@ mail-verp: separator: "+" at-replacement: "=" -job-workers: "_env:JOB_WORKERS:10" +job-workers: "_env:JOB_WORKERS:10" +job-flush-interval: "_env:JOB_FLUSH:30" +job-cron-interval: "_env:CRON_INTERVAL:60" detailed-logging: "_env:DETAILED_LOGGING:false" should-log-all: "_env:LOG_ALL:false" diff --git a/src/Cron.hs b/src/Cron.hs index c37267249..ceb86510e 100644 --- a/src/Cron.hs +++ b/src/Cron.hs @@ -11,6 +11,7 @@ module Cron ( matchesCron , CronNextMatch(..) , nextCronMatch + , module Cron.Types ) where import ClassyPrelude diff --git a/src/Cron/Types.hs b/src/Cron/Types.hs index b1172bde0..d9cfbf67f 100644 --- a/src/Cron/Types.hs +++ b/src/Cron/Types.hs @@ -19,6 +19,8 @@ import Control.Lens import Data.Time import Numeric.Natural + +import Data.HashMap.Strict (HashMap) -- | When the scheduled time for a job falls between two wakeups of the timing @@ -37,7 +39,7 @@ data CronMatch | CronMatchContiguous Natural Natural | CronMatchIntersect CronMatch CronMatch | CronMatchUnion CronMatch CronMatch - deriving (Show, Read) + deriving (Eq, Show, Read) data CronAbsolute = CronAsap @@ -50,7 +52,7 @@ data CronAbsolute , cronDayOfWeek , cronHour, cronMinute, cronSecond :: CronMatch } - deriving (Show, Read) + deriving (Eq, Show, Read) makeLenses_ ''CronAbsolute @@ -58,7 +60,7 @@ data CronPeriod = CronPeriod { cronMinInterval :: NominalDiffTime , cronNext :: CronAbsolute } - deriving (Show) + deriving (Eq, Show) makeLenses_ ''CronPeriod @@ -67,8 +69,8 @@ data Cron = Cron , cronRepeat :: Maybe CronPeriod , cronOffset :: CronScheduleOffset } - deriving (Show) + deriving (Eq, Show) makeLenses_ ''Cron -type Crontab a = Map a Cron +type Crontab a = HashMap a Cron diff --git a/src/Jobs.hs b/src/Jobs.hs index e3cd8f31b..089a4fcab 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -9,6 +9,7 @@ , DeriveDataTypeable , QuasiQuotes , NamedFieldPuns + , MultiWayIf #-} module Jobs @@ -18,7 +19,7 @@ module Jobs , handleJobs ) where -import Import hiding ((.=)) +import Import hiding ((.=), Proxy) import Handler.Utils.Mail import Handler.Utils.DateTime @@ -34,7 +35,7 @@ import qualified Data.Aeson as Aeson import Database.Persist.Sql (executeQQ, fromSqlKey, transactionSave) import Data.Monoid (Last(..)) -import Control.Monad.Trans.Writer (WriterT(..), execWriterT) +import Data.Semigroup (Max(..)) import Utils.Lens @@ -47,6 +48,29 @@ import qualified Data.CaseInsensitive as CI import Text.Shakespeare.Text import Text.Hamlet +import Cron +import qualified Data.HashMap.Strict as HashMap +import Data.HashMap.Strict (HashMap) + +import Data.List.NonEmpty (NonEmpty, nonEmpty) +import qualified Data.List.NonEmpty as NonEmpty + +import Data.Foldable (foldrM) + +import Control.Monad.Trans.Reader (mapReaderT) +import Control.Monad.Trans.Writer (WriterT, execWriterT) +import Control.Monad.Trans.State (StateT, evalStateT, mapStateT) +import qualified Control.Monad.State.Class as State +import Control.Monad.Writer.Class (MonadWriter(..)) +import Control.Monad.Reader.Class (MonadReader(..)) + +import Control.Monad.Random (MonadRandom(..), evalRand) + +import Data.Time.Clock +import Data.Time.Zones + +import Control.Concurrent.STM (retry) + data JobQueueException = JInvalid QueuedJobId QueuedJob | JLocked QueuedJobId InstanceId UTCTime @@ -54,21 +78,134 @@ data JobQueueException = JInvalid QueuedJobId QueuedJob deriving (Read, Show, Eq, Generic, Typeable) instance Exception JobQueueException - + handleJobs :: MonadIO m => [TMChan JobCtl] -> UniWorX -> m () -- | Read control commands from `appJobCtl` and address them as they come in -- -- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders. -- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ... -handleJobs recvChans foundation@UniWorX{..} = liftIO . forM_ (zip [1..] recvChans) $ \(n, chan) -> let - logStart = $logDebugS ("Jobs #" <> tshow n) "Starting" - logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping" - in void . fork . unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan chan .| handleJobs' n - +handleJobs recvChans foundation@UniWorX{..} = liftIO $ do + jobCrontab <- newTVarIO HashMap.empty + jobConfirm <- newTVarIO HashMap.empty -handleJobs' :: Int -> Sink JobCtl Handler () -handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . handleCmd + forM_ (zip [1..] recvChans) $ \(n, chan) -> + let + logStart = $logDebugS ("Jobs #" <> tshow n) "Starting" + logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping" + in void . fork . unsafeHandler foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' n + + -- Start cron operation + void . fork . unsafeHandler foundation $ runReaderT execCrontab JobContext{..} + unsafeHandler foundation . flip runReaderT JobContext{..} $ + writeJobCtlBlock JobCtlDetermineCrontab + + +execCrontab :: ReaderT JobContext (HandlerT UniWorX IO) () +-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have +-- seen, wait for the time of the next job and fire it +execCrontab = flip evalStateT HashMap.empty . forever $ do + now <- liftIO getCurrentTime + (currentCrontab, (jobCtl, nextMatch)) <- mapStateT (mapReaderT $ liftIO . atomically) $ do + crontab <- liftBase . readTVar =<< asks jobCrontab + prevExec <- State.get + case earliestJob prevExec crontab now of + Nothing -> liftBase retry + Just (_, MatchNone) -> liftBase retry + Just x -> return (crontab, x) + + let doJob = do + now <- liftIO $ getCurrentTime + State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl + writeJobCtl jobCtl + + case nextMatch of + MatchAsap -> doJob + MatchNone -> return () + MatchAt nextTime -> do + JobContext{jobCrontab} <- ask + nextTime' <- applyJitter jobCtl nextTime + whenM (liftIO $ waitUntil jobCrontab currentCrontab nextTime') + doJob + where + acc :: NominalDiffTime + acc = 1e-3 + + applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime + applyJitter seed t = do + appInstance <- getsYesod appInstanceID + let + halfRange = floor $ 0.5 / acc + diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed) + return $ addUTCTime diff t + + earliestJob :: HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime) + earliestJob lastTimes crontab now = foldr go Nothing $ HashMap.toList crontab + where + go (jobCtl, cron) mbPrev + | Just (_, t') <- mbPrev + , t' < t + = mbPrev + | otherwise + = Just (jobCtl, t) + where + t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) now cron + + waitUntil :: (Eq a, MonadIO m) => TVar a -> a -> UTCTime -> m Bool + waitUntil crontabTV crontab nextTime = liftIO $ do + diffT <- diffUTCTime nextTime <$> getCurrentTime + if + | diffT < acc -> return True + | otherwise -> do + retVar <- newEmptyTMVarIO + delayThread <- forkFinally (threadDelay . floor $ toRational acc * 1e6) (atomically . putTMVar retVar) + let + awaitDelayThread = False <$ takeTMVar retVar + awaitCrontabChange = do + crontab' <- readTVar crontabTV + True <$ guard (crontab /= crontab') + crontabChanged <- atomically $ awaitCrontabChange <|> awaitDelayThread + bool (waitUntil crontabTV crontab nextTime) (False <$ killThread delayThread) crontabChanged + + +determineCrontab :: Handler (Crontab JobCtl) +determineCrontab = execWriterT $ do + AppSettings{..} <- getsYesod appSettings + + case appJobFlushInterval of + Just interval -> tell $ HashMap.singleton + JobCtlFlush + Cron + { cronInitial = CronAsap + , cronRepeat = Just CronPeriod + { cronMinInterval = interval + , cronNext = CronAsap + } + , cronOffset = CronScheduleBefore + } + Nothing -> return () + + now <- liftIO getCurrentTime + tell $ HashMap.singleton + JobCtlDetermineCrontab + Cron + { cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appJobCronInterval now + , cronRepeat = Nothing + , cronOffset = CronScheduleBefore + } + + +handleJobs' :: Int -> Sink JobCtl (ReaderT JobContext Handler) () +handleJobs' wNum = C.mapM_ $ \jctl -> do + $logDebugS logIdent $ tshow jctl + resVars <- mapReaderT (liftIO . atomically) $ + HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) + res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl + sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) + case res of + Just err + | not sentRes -> $logErrorS logIdent $ tshow err + _other -> return () where logIdent = "Jobs #" <> tshow wNum @@ -76,9 +213,10 @@ handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . ha handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) - - handleCmd JobCtlFlush = void . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform) - handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do + + handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform) + handleCmd (JobCtlQueue job) = lift $ queueJob' job + handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do content <- case fromJSON queuedJobContent of Aeson.Success c -> return c Aeson.Error t -> do @@ -91,6 +229,11 @@ handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . ha -- `performJob` is expected to throw a notification if it detects that the job was not done runDB $ delete jId + handleCmd JobCtlDetermineCrontab = do + newCTab <- lift determineCrontab + $logDebugS logIdent $ tshow newCTab + mapReaderT (liftIO . atomically) $ + lift . flip writeTVar newCTab =<< asks jobCrontab jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a jLocked jId act = do @@ -122,6 +265,19 @@ writeJobCtl cmd = do chan <- flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) . uniform <$> getsYesod appJobCtl liftIO . atomically $ writeTMChan chan cmd +writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m () +writeJobCtlBlock cmd = do + getResVar <- asks jobConfirm + resVar <- liftIO . atomically $ do + var <- newEmptyTMVar + modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) + return var + lift $ writeJobCtl cmd + let + removeResVar = HashMap.update (nonEmpty . NonEmpty.filter (/= resVar)) cmd + mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar + maybe (return ()) throwM mExc + queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId queueJobUnsafe job = do now <- liftIO getCurrentTime diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 099a3d67d..99cfee4cd 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -7,6 +7,7 @@ module Jobs.Types ( Job(..), Notification(..) , JobCtl(..) + , JobContext(..) ) where import Import.NoFoundation @@ -14,6 +15,8 @@ import Import.NoFoundation import Data.Aeson (defaultOptions, Options(..), SumEncoding(..)) import Data.Aeson.TH (deriveJSON) +import Data.List.NonEmpty (NonEmpty) + data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } | JobSendTestEmail { jEmail :: Text, jMailContext :: MailContext } @@ -42,6 +45,14 @@ deriveJSON defaultOptions data JobCtl = JobCtlFlush | JobCtlPerform QueuedJobId + | JobCtlDetermineCrontab + | JobCtlQueue Job deriving (Eq, Ord, Read, Show, Generic, Typeable) instance Hashable JobCtl + + +data JobContext = JobContext + { jobCrontab :: TVar (Crontab JobCtl) + , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException)))) + } diff --git a/src/Settings.hs b/src/Settings.hs index 56efc87a6..2d48e2643 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -83,6 +83,8 @@ data AppSettings = AppSettings , appMailObjectDomain :: Text , appMailVerp :: VerpMode , appJobWorkers :: Int + , appJobFlushInterval :: Maybe NominalDiffTime + , appJobCronInterval :: NominalDiffTime , appDetailedRequestLogging :: Bool -- ^ Use detailed request logging system @@ -271,7 +273,9 @@ instance FromJSON AppSettings where appMailObjectDomain <- o .: "mail-object-domain" appMailVerp <- o .: "mail-verp" - appJobWorkers <- o .: "job-workers" + appJobWorkers <- o .: "job-workers" + appJobFlushInterval <- o .:? "job-flush-interval" + appJobCronInterval <- o .: "job-cron-interval" appDetailedRequestLogging <- o .:? "detailed-logging" .!= defaultDev appShouldLogAll <- o .:? "should-log-all" .!= defaultDev