diff --git a/src/Foundation/Yesod/Middleware.hs b/src/Foundation/Yesod/Middleware.hs index 8bb5bb35d..8e79af686 100644 --- a/src/Foundation/Yesod/Middleware.hs +++ b/src/Foundation/Yesod/Middleware.hs @@ -144,14 +144,14 @@ yesodMiddleware = cacheControlMiddleware . storeBearerMiddleware . csrfMiddlewar approotHost <- hoistMaybe $ approotScopeHost rApproot app let doRedirect = do url <- approotRender rApproot route - $logErrorS "normalizeApprootMiddleware" url + $logDebugS "normalizeApprootMiddleware" url redirect url if | approotHost /= reqHost , rApproot /= ApprootUserGenerated -> doRedirect | approotHost /= reqHost -> do resp <- try $ lift handler - $logErrorS "normalizeApprootMiddleware" $ tshow (is _Right resp, preview _Left resp) + $logDebugS "normalizeApprootMiddleware" $ tshow (is _Right resp, preview _Left resp) case resp of Right _ -> doRedirect Left sc | is _HCRedirect sc -> throwM sc diff --git a/src/Handler/Admin/Crontab.hs b/src/Handler/Admin/Crontab.hs index bc8a3097f..e1e5313f8 100644 --- a/src/Handler/Admin/Crontab.hs +++ b/src/Handler/Admin/Crontab.hs @@ -6,7 +6,10 @@ import Import import Jobs import Handler.Utils.DateTime -import Data.Aeson.Encode.Pretty (encodePrettyToTextBuilder) +import qualified Data.Aeson.Encode.Pretty as Pretty +import Data.Aeson.Encode.Pretty (encodePrettyToTextBuilder') + +import qualified Data.Text as Text getAdminCrontabR :: Handler Html @@ -16,7 +19,7 @@ getAdminCrontabR = do JobState{jobCurrentCrontab} <- MaybeT $ tryReadTMVar jState MaybeT $ readTVar jobCurrentCrontab - let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _1 . _MatchNone) + let mCrontab = mCrontab' <&> _2 %~ filter (hasn't $ _3 . _MatchNone) siteLayoutMsg MsgMenuAdminCrontab $ do setTitleI MsgMenuAdminCrontab @@ -26,7 +29,7 @@ getAdminCrontabR = do
^{formatTimeW SelFormatDateTime genTime}
| $case match @@ -36,9 +39,18 @@ getAdminCrontabR = do _{MsgCronMatchNone} $of MatchAt t ^{formatTimeW SelFormatDateTime t} + | + $maybe lT <- lExec + ^{formatTimeW SelFormatDateTime lT} |
- #{encodePrettyToTextBuilder job}
+ #{doEnc job}
$nothing
_{MsgAdminCrontabNotGenerated}
|]
+ where doEnc = encodePrettyToTextBuilder' Pretty.defConfig
+ { Pretty.confIndent = Pretty.Spaces 2
+ , Pretty.confCompare = comparing $ \t -> ( t `elem` ["instruction", "job", "notification"]
+ , Text.splitOn "-" t
+ )
+ }
diff --git a/src/Handler/Utils/Download.hs b/src/Handler/Utils/Download.hs
index 40e2c78d2..27c6c35ad 100644
--- a/src/Handler/Utils/Download.hs
+++ b/src/Handler/Utils/Download.hs
@@ -123,7 +123,7 @@ ensureApprootUserGeneratedMaybe'
-> m ()
ensureApprootUserGeneratedMaybe' source = maybeT (return ()) $ do
route <- (,) <$> MaybeT getCurrentRoute <*> fmap reqGetParams getRequest
- $logErrorS "ensureApproot" $ tshow route
+ $logDebugS "ensureApproot" $ tshow route
rApproot <- hoistMaybe <=< lift . runMaybeT $ do
reqHost <- MaybeT $ W.requestHeaderHost <$> waiRequest
let rApproot = authoritiveApproot $ urlRoute route
@@ -131,10 +131,10 @@ ensureApprootUserGeneratedMaybe' source = maybeT (return ()) $ do
approotHost <- MaybeT . getsYesod $ approotScopeHost rApproot
guard $ approotHost /= reqHost
return rApproot
- $logErrorS "ensureApproot" $ tshow rApproot
+ $logDebugS "ensureApproot" $ tshow rApproot
route' <- lift $ withFileDownloadTokenMaybe' source route
url <- approotRender rApproot route'
- $logErrorS "ensureApprootUserGenerated" url
+ $logDebugS "ensureApprootUserGenerated" url
redirect url
diff --git a/src/Jobs.hs b/src/Jobs.hs
index 7b779d0a2..f667d0edb 100644
--- a/src/Jobs.hs
+++ b/src/Jobs.hs
@@ -333,7 +333,8 @@ execCrontab = do
do
lastTimes <- State.get
now <- liftIO getCurrentTime
- let currentCrontab' = sortOn fst . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (,job) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
+ let currentCrontab' = sortOn cmpProj . flip map (HashMap.toList currentCrontab) $ \(job, cron) -> (job, getMax <$> HashMap.lookup job lastTimes, ) $ nextCronMatch appTZ (getMax <$> HashMap.lookup job lastTimes) (debouncingAcc settings job) now cron
+ where cmpProj (j, lT, qT) = (qT, lT, j)
crontabTVar <- asks jobCurrentCrontab
atomically . writeTVar crontabTVar $ Just (now, currentCrontab')
$logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab'
@@ -342,7 +343,7 @@ execCrontab = do
newCrontab <- lift $ hoist lift determineCrontab'
when (newCrontab /= currentCrontab) $
mapRWST (liftIO . atomically) $
- liftBase . void . flip swapTVar newCrontab =<< asks (jobCrontab . jobContext)
+ liftBase . flip writeTVar newCrontab =<< asks (jobCrontab . jobContext)
mergeState
newState <- State.get
@@ -390,14 +391,13 @@ execCrontab = do
earliestJob :: AppSettings -> HashMap JobCtl (Max UTCTime) -> Crontab JobCtl -> UTCTime -> Maybe (JobCtl, CronNextMatch UTCTime)
earliestJob settings lastTimes crontab now = foldr go' Nothing $ HashMap.toList crontab
where
- go' (jobCtl, cron) mbPrev
- | Just (_, t') <- mbPrev
- , t' < t
- = mbPrev
- | otherwise
- = Just (jobCtl, t)
+ go' (jobCtl, cron) = Just . ($ (jobCtl, t)) . maybe id (minOn cmpProj)
where
t = nextCronMatch appTZ (getMax <$> HashMap.lookup jobCtl lastTimes) (debouncingAcc settings jobCtl) now cron
+ cmpProj (j, qT) = ( qT
+ , getMax <$> HashMap.lookup j lastTimes
+ , j
+ )
waitUntil :: (Eq a, MonadUnliftIO m, MonadLogger m) => TVar a -> a -> UTCTime -> m Bool
waitUntil crontabTV crontab nextTime = do
@@ -513,7 +513,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
$logInfoS logIdent "DetermineCrontab"
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
- lift . void . flip swapTVar newCTab =<< asks jobCrontab
+ lift . flip writeTVar newCTab =<< asks jobCrontab
handleCmd (JobCtlGenerateHealthReport kind) = do
hrStorage <- getsYesod appHealthReport
$logDebugS logIdent [st|#{tshow kind}...|]
diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs
index 43a5122d8..e100c571d 100644
--- a/src/Jobs/Handler/Files.hs
+++ b/src/Jobs/Handler/Files.hs
@@ -315,17 +315,13 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
sz = fromIntegral $ Minio.oiSize objInfo
fRef' <- runDB $ do
+ logger <- askLoggerIO
+
chunkVar <- newEmptyTMVarIO
dbAsync <- allocateLinkedAsync $ do
- atomically $ isEmptyTMVar chunkVar >>= guard . not
- sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
-
- logger <- askLoggerIO
- didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
- objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
- let sendChunks = go 0 0 Nothing =<< liftIO (getTime Monotonic)
+ let report = go 0 0 Nothing =<< liftIO (getTime Monotonic)
where
- go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString Void m ()
+ go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString ByteString m ()
go c accsz lastReport startT = do
currT <- liftIO $ getTime Monotonic
chunk' <- await
@@ -345,13 +341,18 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
return . ceiling $ (toRational currT - toRational startT) / fromIntegral accsz' * (fromIntegral sz - fromIntegral accsz)
when (lastReport' /= lastReport || sz' >= fromIntegral sz) $
flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes
- [ pure [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
+ [ pure [st|Sinking chunk ##{tshow c}: #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
, eta <&> \eta' -> [st| ETA #{textDuration eta'}|]
, pure "..."
]
- atomically . putTMVar chunkVar $ Just chunk
+ yield chunk
go c' sz' lastReport' startT
- lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks
+ atomically $ isEmptyTMVar chunkVar >>= guard . not
+ sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () .| persistentTokenBucketRateLimit' TokenBucketInjectFiles olength .| report
+
+ didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
+ objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
+ lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
return True
if
| not didSend -> Nothing <$ cancel dbAsync
diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs
index a4fce51d6..f08801213 100644
--- a/src/Jobs/Types.hs
+++ b/src/Jobs/Types.hs
@@ -313,7 +313,7 @@ data JobState = JobState
, jobPoolManager :: Async ()
, jobCron :: Async ()
, jobShutdown :: TMVar ()
- , jobCurrentCrontab :: TVar (Maybe (UTCTime, [(CronNextMatch UTCTime, JobCtl)]))
+ , jobCurrentCrontab :: TVar (Maybe (UTCTime, [(JobCtl, Maybe UTCTime, CronNextMatch UTCTime)]))
}
jobWorkerNames :: JobState -> Set JobWorkerId
diff --git a/src/Utils.hs b/src/Utils.hs
index 004d9cfd1..619eefe85 100644
--- a/src/Utils.hs
+++ b/src/Utils.hs
@@ -1318,6 +1318,18 @@ clampMin, clampMax :: Ord a
clampMin = max
clampMax = min
+minBy,maxBy :: (a -> a -> Ordering) -> a -> a -> a
+minBy cmp a b = case a `cmp` b of
+ GT -> b
+ _ -> a
+maxBy cmp a b = case a `cmp` b of
+ LT -> b
+ _ -> a
+
+minOn,maxOn :: Ord b => (a -> b) -> a -> a -> a
+minOn = minBy . comparing
+maxOn = maxBy . comparing
+
------------
-- Random --
------------
diff --git a/src/Utils/PersistentTokenBucket.hs b/src/Utils/PersistentTokenBucket.hs
index ca9f05f22..798323a29 100644
--- a/src/Utils/PersistentTokenBucket.hs
+++ b/src/Utils/PersistentTokenBucket.hs
@@ -2,12 +2,15 @@ module Utils.PersistentTokenBucket
( TokenBucketSettings(..)
, persistentTokenBucketTryAlloc', persistentTokenBucketTakeC'
, persistentTokenBucketTryAlloc, persistentTokenBucketTakeC
+ , persistentTokenBucketRateLimit', persistentTokenBucketRateLimit
) where
import Import.NoFoundation
import qualified Data.Conduit.Combinators as C
+import Control.Concurrent.STM.Delay
+
data TokenBucketSettings = TokenBucketSettings
{ tbsIdent :: TokenBucketIdent
@@ -83,3 +86,29 @@ persistentTokenBucketTakeC tbs cTokens = C.mapAccumWhileM tbAccum ()
-> SqlPersistT m (Either () ((), i))
tbAccum x ()
= bool (Left ()) (Right ((), x)) <$> persistentTokenBucketTryAlloc tbs (cTokens x)
+
+
+persistentTokenBucketRateLimit :: forall i m a.
+ ( MonadIO m, Integral a )
+ => TokenBucketSettings
+ -> (i -> a)
+ -> ConduitT i i m ()
+persistentTokenBucketRateLimit TokenBucketSettings{tbsInvRate} cTokens = awaitForever $ \x@(cTokens -> s) -> do
+ yield x
+ let
+ MkFixed (fromIntegral -> dTime) = (realToFrac $ fromIntegral s * tbsInvRate :: Micro)
+ liftIO $ atomically . waitDelay =<< newDelay dTime
+
+persistentTokenBucketRateLimit' :: forall i m a.
+ (MonadHandler m, HasAppSettings (HandlerSite m), Integral a)
+ => TokenBucketIdent
+ -> (i -> a)
+ -> ConduitT i i m ()
+persistentTokenBucketRateLimit' tbsIdent cTokens = do
+ TokenBucketConf{..} <- getsYesod $ views _appPersistentTokenBuckets ($ tbsIdent)
+ persistentTokenBucketRateLimit TokenBucketSettings
+ { tbsIdent
+ , tbsDepth = tokenBucketDepth
+ , tbsInvRate = tokenBucketInvRate
+ , tbsInitialValue = tokenBucketInitialValue
+ } cTokens
|