fix(jobs): adjust job handling to hopefully reduce load

This commit is contained in:
Gregor Kleen 2020-12-09 13:25:26 +01:00
parent 271696c617
commit ed38f93537
8 changed files with 84 additions and 30 deletions

View File

@ -144,14 +144,14 @@ yesodMiddleware = cacheControlMiddleware . storeBearerMiddleware . csrfMiddlewar
approotHost <- hoistMaybe $ approotScopeHost rApproot app
let doRedirect = do
url <- approotRender rApproot route
$logErrorS "normalizeApprootMiddleware" url
$logDebugS "normalizeApprootMiddleware" url
redirect url
if | approotHost /= reqHost
, rApproot /= ApprootUserGenerated
-> doRedirect
| approotHost /= reqHost -> do
resp <- try $ lift handler
$logErrorS "normalizeApprootMiddleware" $ tshow (is _Right resp, preview _Left resp)
$logDebugS "normalizeApprootMiddleware" $ tshow (is _Right resp, preview _Left resp)
case resp of
Right _ -> doRedirect
Left sc | is _HCRedirect sc -> throwM sc

View File

@ -6,7 +6,10 @@ import Import
import Jobs
import Handler.Utils.DateTime
import Data.Aeson.Encode.Pretty (encodePrettyToTextBuilder)
import qualified Data.Aeson.Encode.Pretty as Pretty
import Data.Aeson.Encode.Pretty (encodePrettyToTextBuilder')
import qualified Data.Text as Text
getAdminCrontabR :: Handler Html
@ -16,7 +19,7 @@ getAdminCrontabR = do
JobState{jobCurrentCrontab} <- MaybeT $ tryReadTMVar jState
MaybeT $ readTVar jobCurrentCrontab
let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _1 . _MatchNone)
let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _3 . _MatchNone)
siteLayoutMsg MsgMenuAdminCrontab $ do
setTitleI MsgMenuAdminCrontab
@ -26,7 +29,7 @@ getAdminCrontabR = do
<p>
^{formatTimeW SelFormatDateTime genTime}
<table .table .table--striped .table--hover>
$forall (match, job) <- crontab
$forall (job, lExec, match) <- crontab
<tr .table__row>
<td .table__td>
$case match
@ -36,9 +39,18 @@ getAdminCrontabR = do
_{MsgCronMatchNone}
$of MatchAt t
^{formatTimeW SelFormatDateTime t}
<td .table__td>
$maybe lT <- lExec
^{formatTimeW SelFormatDateTime lT}
<td .table__td>
<pre>
#{encodePrettyToTextBuilder job}
#{doEnc job}
$nothing
_{MsgAdminCrontabNotGenerated}
|]
where doEnc = encodePrettyToTextBuilder' Pretty.defConfig
{ Pretty.confIndent = Pretty.Spaces 2
, Pretty.confCompare = comparing $ \t -> ( t `elem` ["instruction", "job", "notification"]
, Text.splitOn "-" t
)
}

View File

@ -123,7 +123,7 @@ ensureApprootUserGeneratedMaybe'
-> m ()
ensureApprootUserGeneratedMaybe' source = maybeT (return ()) $ do
route <- (,) <$> MaybeT getCurrentRoute <*> fmap reqGetParams getRequest
$logErrorS "ensureApproot" $ tshow route
$logDebugS "ensureApproot" $ tshow route
rApproot <- hoistMaybe <=< lift . runMaybeT $ do
reqHost <- MaybeT $ W.requestHeaderHost <$> waiRequest
let rApproot = authoritiveApproot $ urlRoute route
@ -131,10 +131,10 @@ ensureApprootUserGeneratedMaybe' source = maybeT (return ()) $ do
approotHost <- MaybeT . getsYesod $ approotScopeHost rApproot
guard $ approotHost /= reqHost
return rApproot
$logErrorS "ensureApproot" $ tshow rApproot
$logDebugS "ensureApproot" $ tshow rApproot
route' <- lift $ withFileDownloadTokenMaybe' source route
url <- approotRender rApproot route'
$logErrorS "ensureApprootUserGenerated" url
$logDebugS "ensureApprootUserGenerated" url
redirect url

View File

@ -333,7 +333,8 @@ execCrontab = do
do
lastTimes <- State.get
now <- liftIO getCurrentTime
let currentCrontab' = sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
let currentCrontab' = sortOn cmpProj . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (job, getMax <$> HashMap.lookup job lastTimes, ) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
where cmpProj (j, lT, qT) = (qT, lT, j)
crontabTVar <- asks jobCurrentCrontab
atomically . writeTVar crontabTVar $ Just (now, currentCrontab')
$logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab'
@ -342,7 +343,7 @@ execCrontab = do
newCrontab <- lift $ hoist lift determineCrontab'
when (newCrontab /= currentCrontab) $
mapRWST (liftIO . atomically) $
liftBase . void . flip swapTVar newCrontab =<< asks (jobCrontab . jobContext)
liftBase . flip writeTVar newCrontab =<< asks (jobCrontab . jobContext)
mergeState
newState <- State.get
@ -390,14 +391,13 @@ execCrontab = do
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
go' (jobCtl, cron) mbPrev
| Just (_, t') <- mbPrev
, t' < t
= mbPrev
| otherwise
= Just (jobCtl, t)
go' (jobCtl, cron) = Just . ($ (jobCtl, t)) . maybe id (minOn cmpProj)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
cmpProj (j, qT) = ( qT
, getMax <$> HashMap.lookup j lastTimes
, j
)
waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = do
@ -513,7 +513,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
$logInfoS logIdent "DetermineCrontab"
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . void . flip swapTVar newCTab =<< asks jobCrontab
lift . flip writeTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
$logDebugS logIdent [st|#{tshow kind}...|]

View File

@ -315,17 +315,13 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
sz = fromIntegral $ Minio.oiSize objInfo
fRef' <- runDB $ do
logger <- askLoggerIO
chunkVar <- newEmptyTMVarIO
dbAsync <- allocateLinkedAsync $ do
atomically $ isEmptyTMVar chunkVar >>= guard . not
sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
logger <- askLoggerIO
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
let sendChunks = go 0 0 Nothing =<< liftIO (getTime Monotonic)
let report = go 0 0 Nothing =<< liftIO (getTime Monotonic)
where
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString Void m ()
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString ByteString m ()
go c accsz lastReport startT = do
currT <- liftIO $ getTime Monotonic
chunk' <- await
@ -345,13 +341,18 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
return . ceiling $ (toRational currT - toRational startT) / fromIntegral accsz' * (fromIntegral sz - fromIntegral accsz)
when (lastReport' /= lastReport || sz' >= fromIntegral sz) $
flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes
[ pure [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
[ pure [st|Sinking chunk ##{tshow c}: #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
, eta <&> \eta' -> [st| ETA #{textDuration eta'}|]
, pure "..."
]
atomically . putTMVar chunkVar $ Just chunk
yield chunk
go c' sz' lastReport' startT
lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks
atomically $ isEmptyTMVar chunkVar >>= guard . not
sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () .| persistentTokenBucketRateLimit' TokenBucketInjectFiles olength .| report
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
return True
if
| not didSend -> Nothing <$ cancel dbAsync

View File

@ -313,7 +313,7 @@ data JobState = JobState
, jobPoolManager :: Async ()
, jobCron :: Async ()
, jobShutdown :: TMVar ()
, jobCurrentCrontab :: TVar (Maybe (UTCTime, [(CronNextMatch UTCTime, JobCtl)]))
, jobCurrentCrontab :: TVar (Maybe (UTCTime, [(JobCtl, Maybe UTCTime, CronNextMatch UTCTime)]))
}
jobWorkerNames :: JobState -> Set JobWorkerId

View File

@ -1318,6 +1318,18 @@ clampMin, clampMax :: Ord a
clampMin = max
clampMax = min
minBy,maxBy :: (a -> a -> Ordering) -> a -> a -> a
minBy cmp a b = case a `cmp` b of
GT -> b
_ -> a
maxBy cmp a b = case a `cmp` b of
LT -> b
_ -> a
minOn,maxOn :: Ord b => (a -> b) -> a -> a -> a
minOn = minBy . comparing
maxOn = maxBy . comparing
------------
-- Random --
------------

View File

@ -2,12 +2,15 @@ module Utils.PersistentTokenBucket
( TokenBucketSettings(..)
, persistentTokenBucketTryAlloc', persistentTokenBucketTakeC'
, persistentTokenBucketTryAlloc, persistentTokenBucketTakeC
, persistentTokenBucketRateLimit', persistentTokenBucketRateLimit
) where
import Import.NoFoundation
import qualified Data.Conduit.Combinators as C
import Control.Concurrent.STM.Delay
data TokenBucketSettings = TokenBucketSettings
{ tbsIdent :: TokenBucketIdent
@ -83,3 +86,29 @@ persistentTokenBucketTakeC tbs cTokens = C.mapAccumWhileM tbAccum ()
-> SqlPersistT m (Either () ((), i))
tbAccum x ()
= bool (Left ()) (Right ((), x)) <$> persistentTokenBucketTryAlloc tbs (cTokens x)
persistentTokenBucketRateLimit :: forall i m a.
( MonadIO m, Integral a )
=> TokenBucketSettings
-> (i -> a)
-> ConduitT i i m ()
persistentTokenBucketRateLimit TokenBucketSettings{tbsInvRate} cTokens = awaitForever $ \x@(cTokens -> s) -> do
yield x
let
MkFixed (fromIntegral -> dTime) = (realToFrac $ fromIntegral s * tbsInvRate :: Micro)
liftIO $ atomically . waitDelay =<< newDelay dTime
persistentTokenBucketRateLimit' :: forall i m a.
(MonadHandler m, HasAppSettings (HandlerSite m), Integral a)
=> TokenBucketIdent
-> (i -> a)
-> ConduitT i i m ()
persistentTokenBucketRateLimit' tbsIdent cTokens = do
TokenBucketConf{..} <- getsYesod $ views _appPersistentTokenBuckets ($ tbsIdent)
persistentTokenBucketRateLimit TokenBucketSettings
{ tbsIdent
, tbsDepth = tokenBucketDepth
, tbsInvRate = tokenBucketInvRate
, tbsInitialValue = tokenBucketInitialValue
} cTokens