diff --git a/src/Application.hs b/src/Application.hs index bf7927e51..ab612883c 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -36,6 +36,8 @@ import System.Log.FastLogger ( defaultBufSize, newStderrLoggerSet , toLogStr, rmLoggerSet ) +import Handler.Utils (runAppLoggingT) + import qualified Data.Map.Strict as Map import Foreign.Store @@ -222,13 +224,6 @@ makeFoundation appSettings'@AppSettings{..} = do $logDebugS "setup" "Done" return foundation -runAppLoggingT :: UniWorX -> LoggingT m a -> m a -runAppLoggingT app@(appLogger -> (_, loggerTVar)) = flip runLoggingT logFunc - where - logFunc loc src lvl str = do - f <- messageLoggerSource app <$> readTVarIO loggerTVar - f loc src lvl str - clusterSetting :: forall key m p. ( MonadIO m , ClusterSetting key diff --git a/src/Handler/Utils.hs b/src/Handler/Utils.hs index 0384c83e5..8877dc8de 100644 --- a/src/Handler/Utils.hs +++ b/src/Handler/Utils.hs @@ -37,6 +37,8 @@ import System.FilePath.Posix (takeBaseName, takeFileName) import qualified Data.List as List import qualified Data.List.NonEmpty as NonEmpty +import Control.Monad.Logger + -- | Check whether the user's preference for files is inline-viewing or downloading downloadFiles :: (MonadHandler m, HandlerSite m ~ UniWorX) => m Bool @@ -247,3 +249,12 @@ guardAuthorizedFor :: ( HandlerSite h ~ UniWorX, MonadHandler h, MonadLogger h => Route UniWorX -> a -> m (ReaderT SqlBackend h) a guardAuthorizedFor link val = val <$ guardM (lift $ (== Authorized) <$> evalAccessDB link False) + + +runAppLoggingT :: UniWorX -> LoggingT m a -> m a +runAppLoggingT app@(appLogger -> (_, loggerTVar)) = flip runLoggingT logFunc + where + logFunc loc src lvl str = do + f <- messageLoggerSource app <$> readTVarIO loggerTVar + f loc src lvl str + diff --git a/src/Jobs.hs b/src/Jobs.hs index 5ba9f1fa4..867718bab 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -7,6 +7,7 @@ module Jobs import Import import Utils.Lens +import Handler.Utils import Jobs.Types as Types hiding (JobCtl(JobCtlQueue)) import Jobs.Types (JobCtl(JobCtlQueue)) @@ -93,7 +94,7 @@ handleJobs foundation@UniWorX{..} = do logStart = $logDebugS ("Jobs #" <> tshow n) "Starting" logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping" removeChan = atomically . modifyTVar' appJobCtl . Map.delete =<< myThreadId - doFork = flip forkFinally (\_ -> removeChan) . unsafeHandler foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' n + doFork = flip forkFinally (\_ -> removeChan) . runAppLoggingT foundation . bracket_ logStart logStop . flip runReaderT JobContext{..} . runConduit $ sourceTMChan chan .| handleJobs' foundation n (_, tId) <- allocate (liftIO doFork) (\_ -> liftIO . atomically $ closeTMChan chan) atomically . modifyTVar' appJobCtl $ Map.insert tId bChan @@ -101,7 +102,7 @@ handleJobs foundation@UniWorX{..} = do when (num > 0) $ do registeredCron <- liftIO newEmptyTMVarIO let execCrontab' = whenM (atomically $ readTMVar registeredCron) $ - unsafeHandler foundation $ runReaderT execCrontab JobContext{..} + runReaderT (execCrontab foundation) JobContext{..} unregister = atomically . whenM (fromMaybe False <$> tryReadTMVar registeredCron) . void $ tryTakeTMVar appCronThread cData <- allocate (liftIO . forkFinally execCrontab' $ \_ -> unregister) (\_ -> liftIO . atomically . void $ tryTakeTMVar jobCrontab) registeredCron' <- atomically $ do @@ -126,73 +127,75 @@ stopJobCtl UniWorX{appJobCtl, appCronThread} = do guard . none (`Map.member` wMap') $ Map.keysSet wMap -execCrontab :: ReaderT JobContext (HandlerT UniWorX IO) () +execCrontab :: MonadIO m => UniWorX -> ReaderT JobContext m () -- ^ Keeping a `HashMap` of the latest execution times of `JobCtl`s we have -- seen, wait for the time of the next job and fire it -execCrontab = evalStateT go HashMap.empty +execCrontab foundation = evalStateT go HashMap.empty where go = do - mapStateT (liftHandlerT . runDB . setSerializable) $ do - let - merge (Entity leId CronLastExec{..}) - | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob - = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) - | otherwise = lift $ delete leId - runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge + cont <- mapStateT (mapReaderT $ liftIO . unsafeHandler foundation) $ do + mapStateT (liftHandlerT . runDB . setSerializable) $ do + let + merge (Entity leId CronLastExec{..}) + | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob + = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) + | otherwise = lift $ delete leId + runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ merge - refT <- liftIO getCurrentTime - settings <- getsYesod appSettings' - currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do - crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab - case crontab' of - Nothing -> return Nothing - Just crontab -> Just <$> do - State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab - prevExec <- State.get - case earliestJob settings prevExec crontab refT of - Nothing -> liftBase retry - Just (_, MatchNone) -> liftBase retry - Just x -> return (crontab, x) + refT <- liftIO getCurrentTime + settings <- getsYesod appSettings' + currentState <- mapStateT (mapReaderT $ liftIO . atomically) $ do + crontab' <- liftBase . tryReadTMVar =<< asks jobCrontab + case crontab' of + Nothing -> return Nothing + Just crontab -> Just <$> do + State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab + prevExec <- State.get + case earliestJob settings prevExec crontab refT of + Nothing -> liftBase retry + Just (_, MatchNone) -> liftBase retry + Just x -> return (crontab, x) - case currentState of - Nothing -> return () - Just (currentCrontab, (jobCtl, nextMatch)) -> do - let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do - newCrontab <- lift . lift . hoist lift $ determineCrontab' - if - | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab - -> do - now <- liftIO $ getCurrentTime - instanceID' <- getsYesod appInstanceID - State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl - case jobCtl of - JobCtlQueue job -> do - void . lift . lift $ upsertBy - (UniqueCronLastExec $ toJSON job) - CronLastExec - { cronLastExecJob = toJSON job - , cronLastExecTime = now - , cronLastExecInstance = instanceID' - } - [ CronLastExecTime =. now ] - lift . lift $ queueDBJob job - other -> writeJobCtl other - | otherwise - -> lift . mapReaderT (liftIO . atomically) $ - lift . void . flip swapTMVar newCrontab =<< asks jobCrontab + case currentState of + Nothing -> return False + Just (currentCrontab, (jobCtl, nextMatch)) -> do + let doJob = mapStateT (mapReaderT $ liftHandlerT . runDBJobs . setSerializable) $ do + newCrontab <- lift . lift . hoist lift $ determineCrontab' + if + | ((==) `on` HashMap.lookup jobCtl) newCrontab currentCrontab + -> do + now <- liftIO $ getCurrentTime + instanceID' <- getsYesod appInstanceID + State.modify $ HashMap.alter (Just . ($ Max now) . maybe id (<>)) jobCtl + case jobCtl of + JobCtlQueue job -> do + void . lift . lift $ upsertBy + (UniqueCronLastExec $ toJSON job) + CronLastExec + { cronLastExecJob = toJSON job + , cronLastExecTime = now + , cronLastExecInstance = instanceID' + } + [ CronLastExecTime =. now ] + lift . lift $ queueDBJob job + other -> writeJobCtl other + | otherwise + -> lift . mapReaderT (liftIO . atomically) $ + lift . void . flip swapTMVar newCrontab =<< asks jobCrontab - case nextMatch of - MatchAsap -> doJob - MatchNone -> return () - MatchAt nextTime -> do - JobContext{jobCrontab} <- ask - nextTime' <- applyJitter jobCtl nextTime - $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|] - logFunc <- askLoggerIO - whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime') - doJob + case nextMatch of + MatchAsap -> doJob + MatchNone -> return () + MatchAt nextTime -> do + JobContext{jobCrontab} <- ask + nextTime' <- applyJitter jobCtl nextTime + $logDebugS "Cron" [st|Waiting until #{tshow (utcToLocalTimeTZ appTZ nextTime')} to execute #{tshow jobCtl}|] + logFunc <- askLoggerIO + whenM (liftIO . flip runLoggingT logFunc $ waitUntil jobCrontab currentCrontab nextTime') + doJob - go + return True + when cont go where acc :: NominalDiffTime acc = 1e-3 @@ -244,12 +247,12 @@ execCrontab = evalStateT go HashMap.empty bool (waitUntil crontabTV crontab nextTime) (return False) crontabChanged -handleJobs' :: Natural -> Sink JobCtl (ReaderT JobContext Handler) () -handleJobs' wNum = C.mapM_ $ \jctl -> do +handleJobs' :: (MonadIO m, MonadLogger m, MonadCatch m) => UniWorX -> Natural -> Sink JobCtl (ReaderT JobContext m) () +handleJobs' foundation wNum = C.mapM_ $ \jctl -> do $logDebugS logIdent $ tshow jctl resVars <- mapReaderT (liftIO . atomically) $ HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) - res <- fmap (either Just $ const Nothing) . try $ handleCmd jctl + res <- fmap (either Just $ const Nothing) . try . (mapReaderT $ liftIO . unsafeHandler foundation) $ handleCmd jctl sentRes <- liftIO . atomically $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) case res of Just err