Make calls to unsafeHandler shorter lived

This commit is contained in:
Gregor Kleen 2019-05-30 23:37:48 +02:00
parent 5cfe4e049f
commit 98d76e30ea
3 changed files with 80 additions and 71 deletions

View File

@ -36,6 +36,8 @@ import System.Log.FastLogger ( defaultBufSize, newStderrLoggerSet
, toLogStr, rmLoggerSet
)
import Handler.Utils (runAppLoggingT)
import qualified Data.Map.Strict as Map
import Foreign.Store
@ -222,13 +224,6 @@ makeFoundation appSettings'@AppSettings{..} = do
$logDebugS "setup" "Done"
return foundation
runAppLoggingT :: UniWorX -> LoggingT m a -> m a
runAppLoggingT app@(appLogger -> (_, loggerTVar)) = flip runLoggingT logFunc
where
logFunc loc src lvl str = do
f <- messageLoggerSource app <$> readTVarIO loggerTVar
f loc src lvl str
clusterSetting :: forall key m p.
( MonadIO m
, ClusterSetting key

View File

@ -37,6 +37,8 @@ import System.FilePath.Posix (takeBaseName, takeFileName)
import qualified Data.List as List
import qualified Data.List.NonEmpty as NonEmpty
import Control.Monad.Logger
-- | Check whether the user's preference for files is inline-viewing or downloading
downloadFiles :: (MonadHandler m, HandlerSite m ~ UniWorX) => m Bool
@ -247,3 +249,12 @@ guardAuthorizedFor :: ( HandlerSite h ~ UniWorX, MonadHandler h, MonadLogger h
=> Route UniWorX -> a -> m (ReaderT SqlBackend h) a
guardAuthorizedFor link val =
val <$ guardM (lift $ (== Authorized) <$> evalAccessDB link False)
runAppLoggingT :: UniWorX -> LoggingT m a -> m a
runAppLoggingT app@(appLogger -> (_, loggerTVar)) = flip runLoggingT logFunc
where
logFunc loc src lvl str = do
f <- messageLoggerSource app <$> readTVarIO loggerTVar
f loc src lvl str

View File

@ -7,6 +7,7 @@ module Jobs
import Import
import Utils.Lens
import Handler.Utils
import Jobs.Types as Types hiding (JobCtl(JobCtlQueue))
import Jobs.Types (JobCtl(JobCtlQueue))
@ -93,7 +94,7 @@ handleJobs foundation@UniWorX{..} = do
logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId
doFork = flip forkFinally (\_ -> removeChan) . unsafeHandler foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' n
doFork = flip forkFinally (\_ -> removeChan) . runAppLoggingT foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' foundation n
(_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan)
atomically . modifyTVar' appJobCtl $ Map.insert tId bChan
@ -101,7 +102,7 @@ handleJobs foundation@UniWorX{..} = do
when (num > 0) $ do
registeredCron <- liftIO newEmptyTMVarIO
let execCrontab' = whenM (atomically $ readTMVar registeredCron) $
unsafeHandler foundation $ runReaderT execCrontab JobContext{..}
runReaderT (execCrontab foundation) JobContext{..}
unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread
cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab)
registeredCron' <- atomically $ do
@ -126,73 +127,75 @@ stopJobCtl UniWorX{appJobCtl, appCronThread} = do
guard . none (`Map.member` wMap') $ Map.keysSet wMap
execCrontab :: ReaderT JobContext (HandlerT UniWorX IO) ()
execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m ()
-- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have
-- seen, wait for the time of the next job and fire it
execCrontab = evalStateT go HashMap.empty
execCrontab foundation = evalStateT go HashMap.empty
where
go = do
mapStateT (liftHandlerT . runDB . setSerializable) $ do
let
merge (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do
mapStateT (liftHandlerT . runDB . setSerializable) $ do
let
merge (Entity leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do
crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab
case crontab' of
Nothing -> return Nothing
Just crontab -> Just <$> do
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do
crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab
case crontab' of
Nothing -> return Nothing
Just crontab -> Just <$> do
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
prevExec <- State.get
case earliestJob settings prevExec crontab refT of
Nothing -> liftBase retry
Just (_, MatchNone) -> liftBase retry
Just x -> return (crontab, x)
case currentState of
Nothing -> return ()
Just (currentCrontab, (jobCtl, nextMatch)) -> do
let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
newCrontab <- lift . lift . hoist lift $ determineCrontab'
if
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
instanceID' <- getsYesod appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
void . lift . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift . lift $ queueDBJob job
other -> writeJobCtl other
| otherwise
-> lift . mapReaderT (liftIO . atomically) $
lift . void . flip swapTMVar newCrontab =<< asks jobCrontab
case currentState of
Nothing -> return False
Just (currentCrontab, (jobCtl, nextMatch)) -> do
let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do
newCrontab <- lift . lift . hoist lift $ determineCrontab'
if
| ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab
-> do
now <- liftIO $ getCurrentTime
instanceID' <- getsYesod appInstanceID
State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl
case jobCtl of
JobCtlQueue job -> do
void . lift . lift $ upsertBy
(UniqueCronLastExec $ toJSON job)
CronLastExec
{ cronLastExecJob = toJSON job
, cronLastExecTime = now
, cronLastExecInstance = instanceID'
}
[ CronLastExecTime =. now ]
lift . lift $ queueDBJob job
other -> writeJobCtl other
| otherwise
-> lift . mapReaderT (liftIO . atomically) $
lift . void . flip swapTMVar newCrontab =<< asks jobCrontab
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
doJob
case nextMatch of
MatchAsap -> doJob
MatchNone -> return ()
MatchAt nextTime -> do
JobContext{jobCrontab} <- ask
nextTime' <- applyJitter jobCtl nextTime
$logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|]
logFunc <- askLoggerIO
whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime')
doJob
go
return True
when cont go
where
acc :: NominalDiffTime
acc = 1e-3
@ -244,12 +247,12 @@ execCrontab = evalStateT go HashMap.empty
bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged
handleJobs' :: Natural -> Sink JobCtl (ReaderT JobContext Handler) ()
handleJobs' wNum = C.mapM_ $ \jctl -> do
handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) ()
handleJobs' foundation wNum = C.mapM_ $ \jctl -> do
$logDebugS logIdent $ tshow jctl
resVars <- mapReaderT (liftIO . atomically) $
HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl
res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl
sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
case res of
Just err