Implement Cron

This commit is contained in:
Gregor Kleen 2018-10-12 23:37:16 +02:00
parent 9183ff9aa4
commit 7bdf015560
6 changed files with 196 additions and 20 deletions

View File

@ -16,7 +16,9 @@ mail-verp:
separator: "+"
at-replacement: "="
job-workers: "_env:JOB_WORKERS:10"
job-workers: "_env:JOB_WORKERS:10"
job-flush-interval: "_env:JOB_FLUSH:30"
job-cron-interval: "_env:CRON_INTERVAL:60"
detailed-logging: "_env:DETAILED_LOGGING:false"
should-log-all: "_env:LOG_ALL:false"

View File

@ -11,6 +11,7 @@ module Cron
( matchesCron
, CronNextMatch(..)
, nextCronMatch
, module Cron.Types
) where
import ClassyPrelude

View File

@ -19,6 +19,8 @@ import Control.Lens
import Data.Time
import Numeric.Natural
import Data.HashMap.Strict (HashMap)
-- | When the scheduled time for a job falls between two wakeups of the timing
@ -37,7 +39,7 @@ data CronMatch
| CronMatchContiguous Natural Natural
| CronMatchIntersect CronMatch CronMatch
| CronMatchUnion CronMatch CronMatch
deriving (Show, Read)
deriving (Eq, Show, Read)
data CronAbsolute
= CronAsap
@ -50,7 +52,7 @@ data CronAbsolute
, cronDayOfWeek
, cronHour, cronMinute, cronSecond :: CronMatch
}
deriving (Show, Read)
deriving (Eq, Show, Read)
makeLenses_ ''CronAbsolute
@ -58,7 +60,7 @@ data CronPeriod = CronPeriod
{ cronMinInterval :: NominalDiffTime
, cronNext :: CronAbsolute
}
deriving (Show)
deriving (Eq, Show)
makeLenses_ ''CronPeriod
@ -67,8 +69,8 @@ data Cron = Cron
, cronRepeat :: Maybe CronPeriod
, cronOffset :: CronScheduleOffset
}
deriving (Show)
deriving (Eq, Show)
makeLenses_ ''Cron
type Crontab a = Map a Cron
type Crontab a = HashMap a Cron

View File

@ -9,6 +9,7 @@
, DeriveDataTypeable
, QuasiQuotes
, NamedFieldPuns
, MultiWayIf
#-}
module Jobs
@ -18,7 +19,7 @@ module Jobs
, handleJobs
) where
import Import hiding ((.=))
import Import hiding ((.=), Proxy)
import Handler.Utils.Mail
import Handler.Utils.DateTime
@ -34,7 +35,7 @@ import qualified Data.Aeson as Aeson
import Database.Persist.Sql (executeQQ, fromSqlKey, transactionSave)
import Data.Monoid (Last(..))
import Control.Monad.Trans.Writer (WriterT(..), execWriterT)
import Data.Semigroup (Max(..))
import Utils.Lens
@ -47,6 +48,29 @@ import qualified Data.CaseInsensitive as CI
import Text.Shakespeare.Text
import Text.Hamlet
import Cron
import qualified Data.HashMap.Strict as HashMap
import Data.HashMap.Strict (HashMap)
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Foldable (foldrM)
import Control.Monad.Trans.Reader (mapReaderT)
import Control.Monad.Trans.Writer (WriterT, execWriterT)
import Control.Monad.Trans.State (StateT, evalStateT, mapStateT)
import qualified Control.Monad.State.Class as State
import Control.Monad.Writer.Class (MonadWriter(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Random (MonadRandom(..), evalRand)
import Data.Time.Clock
import Data.Time.Zones
import Control.Concurrent.STM (retry)
data JobQueueException = JInvalid QueuedJobId QueuedJob
| JLocked QueuedJobId InstanceId UTCTime
@ -54,21 +78,134 @@ data JobQueueException = JInvalid QueuedJobId QueuedJob
deriving (Read, Show, Eq, Generic, Typeable)
instance Exception JobQueueException
handleJobs :: MonadIO m => [TMChan JobCtl] -> UniWorX -> m ()
-- | Read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs recvChans foundation@UniWorX{..} = liftIO . forM_ (zip [1..] recvChans) $ \(n, chan) -> let
logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
in void . fork . unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan chan .| handleJobs' n
handleJobs recvChans foundation@UniWorX{..} = liftIO $ do
jobCrontab <- newTVarIO HashMap.empty
jobConfirm <- newTVarIO HashMap.empty
handleJobs' :: Int -> Sink JobCtl Handler ()
handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . handleCmd
forM_ (zip [1..] recvChans) $ \(n, chan) ->
let
logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
in void . fork . unsafeHandler foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' n
-- Start cron operation
void . fork . unsafeHandler foundation $ runReaderT execCrontab JobContext{..}
unsafeHandler foundation . flip runReaderT JobContext{..} $
writeJobCtlBlock JobCtlDetermineCrontab
execCrontab :: ReaderT JobContext (HandlerT UniWorX IO) ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab = flip evalStateT HashMap.empty . forever $ do
now <- liftIO getCurrentTime
(currentCrontab, (jobCtl, nextMatch)) <- mapStateT (mapReaderT $ liftIO . atomically) $ do
crontab <- liftBase . readTVar =<< asks jobCrontab
prevExec <- State.get
case earliestJob prevExec crontab now of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
let doJob = do
now <- liftIO $ getCurrentTime
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
writeJobCtl jobCtl
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
whenM (liftIO $ waitUntil jobCrontab currentCrontab nextTime')
doJob
where
acc :: NominalDiffTime
acc = 1e-3
applyJitter :: (MonadHandler m, HandlerSite m ~ UniWorX, Hashable seed) => seed -> UTCTime -> m UTCTime
applyJitter seed t = do
appInstance <- getsYesod appInstanceID
let
halfRange = floor $ 0.5 / acc
diff = evalRand ( (* acc) . fromInteger <$> getRandomR (- halfRange, halfRange)) $ mkStdGen (hash appInstance `hashWithSalt` seed)
return $ addUTCTime diff t
earliestJob :: HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob lastTimes crontab now = foldr go Nothing $ HashMap.toList crontab
where
go (jobCtl, cron) mbPrev
| Just (_, t') <- mbPrev
, t' < t
= mbPrev
| otherwise
= Just (jobCtl, t)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) now cron
waitUntil :: (Eq a, MonadIO m) => TVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = liftIO $ do
diffT <- diffUTCTime nextTime <$> getCurrentTime
if
| diffT < acc -> return True
| otherwise -> do
retVar <- newEmptyTMVarIO
delayThread <- forkFinally (threadDelay . floor $ toRational acc * 1e6) (atomically . putTMVar retVar)
let
awaitDelayThread = False <$ takeTMVar retVar
awaitCrontabChange = do
crontab' <- readTVar crontabTV
True <$ guard (crontab /= crontab')
crontabChanged <- atomically $ awaitCrontabChange <|> awaitDelayThread
bool (waitUntil crontabTV crontab nextTime) (False <$ killThread delayThread) crontabChanged
determineCrontab :: Handler (Crontab JobCtl)
determineCrontab = execWriterT $ do
AppSettings{..} <- getsYesod appSettings
case appJobFlushInterval of
Just interval -> tell $ HashMap.singleton
JobCtlFlush
Cron
{ cronInitial = CronAsap
, cronRepeat = Just CronPeriod
{ cronMinInterval = interval
, cronNext = CronAsap
}
, cronOffset = CronScheduleBefore
}
Nothing -> return ()
now <- liftIO getCurrentTime
tell $ HashMap.singleton
JobCtlDetermineCrontab
Cron
{ cronInitial = CronTimestamp . utcToLocalTimeTZ appTZ $ addUTCTime appJobCronInterval now
, cronRepeat = Nothing
, cronOffset = CronScheduleBefore
}
handleJobs' :: Int -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
resVars <- mapReaderT (liftIO . atomically) $
HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err
| not sentRes -> $logErrorS logIdent $ tshow err
_other -> return ()
where
logIdent = "Jobs #" <> tshow wNum
@ -76,9 +213,10 @@ handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . ha
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleCmd JobCtlFlush = void . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
content <- case fromJSON queuedJobContent of
Aeson.Success c -> return c
Aeson.Error t -> do
@ -91,6 +229,11 @@ handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . ha
-- `performJob` is expected to throw a notification if it detects that the job was not done
runDB $ delete jId
handleCmd JobCtlDetermineCrontab = do
newCTab <- lift determineCrontab
$logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . flip writeTVar newCTab =<< asks jobCrontab
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
@ -122,6 +265,19 @@ writeJobCtl cmd = do
chan <- flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) . uniform <$> getsYesod appJobCtl
liftIO . atomically $ writeTMChan chan cmd
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
writeJobCtlBlock cmd = do
getResVar <- asks jobConfirm
resVar <- liftIO . atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
lift $ writeJobCtl cmd
let
removeResVar = HashMap.update (nonEmpty . NonEmpty.filter (/= resVar)) cmd
mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
now <- liftIO getCurrentTime

View File

@ -7,6 +7,7 @@
module Jobs.Types
( Job(..), Notification(..)
, JobCtl(..)
, JobContext(..)
) where
import Import.NoFoundation
@ -14,6 +15,8 @@ import Import.NoFoundation
import Data.Aeson (defaultOptions, Options(..), SumEncoding(..))
import Data.Aeson.TH (deriveJSON)
import Data.List.NonEmpty (NonEmpty)
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Text, jMailContext :: MailContext }
@ -42,6 +45,14 @@ deriveJSON defaultOptions
data JobCtl = JobCtlFlush
| JobCtlPerform QueuedJobId
| JobCtlDetermineCrontab
| JobCtlQueue Job
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl
data JobContext = JobContext
{ jobCrontab :: TVar (Crontab JobCtl)
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
}

View File

@ -83,6 +83,8 @@ data AppSettings = AppSettings
, appMailObjectDomain :: Text
, appMailVerp :: VerpMode
, appJobWorkers :: Int
, appJobFlushInterval :: Maybe NominalDiffTime
, appJobCronInterval :: NominalDiffTime
, appDetailedRequestLogging :: Bool
-- ^ Use detailed request logging system
@ -271,7 +273,9 @@ instance FromJSON AppSettings where
appMailObjectDomain <- o .: "mail-object-domain"
appMailVerp <- o .: "mail-verp"
appJobWorkers <- o .: "job-workers"
appJobWorkers <- o .: "job-workers"
appJobFlushInterval <- o .:? "job-flush-interval"
appJobCronInterval <- o .: "job-cron-interval"
appDetailedRequestLogging <- o .:? "detailed-logging" .!= defaultDev
appShouldLogAll <- o .:? "should-log-all" .!= defaultDev