chore(utils): add timeoutHandler to run a sub-handler to be killed by timeout
This commit is contained in:
parent
a06f345391
commit
4154b1f26b
@ -279,8 +279,8 @@ getCourseNewR = do
|
|||||||
, E.desc $ courseCreated course] -- most recent created course
|
, E.desc $ courseCreated course] -- most recent created course
|
||||||
E.limit 1
|
E.limit 1
|
||||||
return course
|
return course
|
||||||
template <- case listToMaybe oldCourses of
|
template <- case oldCourses of
|
||||||
(Just oldTemplate) ->
|
(oldTemplate:_) ->
|
||||||
let newTemplate = courseToForm oldTemplate mempty mempty in
|
let newTemplate = courseToForm oldTemplate mempty mempty in
|
||||||
return $ Just $ newTemplate
|
return $ Just $ newTemplate
|
||||||
{ cfCourseId = Nothing
|
{ cfCourseId = Nothing
|
||||||
@ -289,7 +289,7 @@ getCourseNewR = do
|
|||||||
, cfRegTo = Nothing
|
, cfRegTo = Nothing
|
||||||
, cfDeRegUntil = Nothing
|
, cfDeRegUntil = Nothing
|
||||||
}
|
}
|
||||||
Nothing -> do
|
[] -> do
|
||||||
(tidOk,sshOk,cshOk) <- runDB $ (,,)
|
(tidOk,sshOk,cshOk) <- runDB $ (,,)
|
||||||
<$> ifMaybeM mbTid True existsKey
|
<$> ifMaybeM mbTid True existsKey
|
||||||
<*> ifMaybeM mbSsh True existsKey
|
<*> ifMaybeM mbSsh True existsKey
|
||||||
|
|||||||
@ -35,6 +35,8 @@ import Handler.Utils.Qualification as Handler.Utils
|
|||||||
|
|
||||||
import Handler.Utils.Term as Handler.Utils
|
import Handler.Utils.Term as Handler.Utils
|
||||||
|
|
||||||
|
import Handler.Utils.Concurrent as Handler.Utils
|
||||||
|
|
||||||
import Control.Monad.Logger
|
import Control.Monad.Logger
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
32
src/Handler/Utils/Concurrent.hs
Normal file
32
src/Handler/Utils/Concurrent.hs
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
-- SPDX-FileCopyrightText: 2024 Steffen Jost <s.jost@fraport.de>
|
||||||
|
--
|
||||||
|
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
|
||||||
|
module Handler.Utils.Concurrent
|
||||||
|
( module Handler.Utils.Concurrent
|
||||||
|
) where
|
||||||
|
|
||||||
|
|
||||||
|
import Import
|
||||||
|
import UnliftIO.Concurrent
|
||||||
|
|
||||||
|
|
||||||
|
-- | Run a handler action until it finishes or if it exceeds a given number of microseconds via `registerDelay`
|
||||||
|
timeoutHandler :: Int -> HandlerFor site a -> HandlerFor site (Maybe a)
|
||||||
|
timeoutHandler maxWait act = do
|
||||||
|
innerAct <- handlerToIO
|
||||||
|
(hresult, tid) <- liftIO $ do
|
||||||
|
hresult <- newTVarIO Nothing
|
||||||
|
tid <- forkIO $ innerAct $ do
|
||||||
|
res <- act
|
||||||
|
liftIO $ atomically $ writeTVar hresult $ Just res
|
||||||
|
return (hresult, tid)
|
||||||
|
res <- liftIO $ do
|
||||||
|
flag <- registerDelay maxWait
|
||||||
|
atomically $ do
|
||||||
|
res <- readTVar hresult
|
||||||
|
out <- readTVar flag
|
||||||
|
checkSTM $ out || isJust res
|
||||||
|
return res
|
||||||
|
when (isNothing res) $ killThread tid
|
||||||
|
return res
|
||||||
@ -47,7 +47,7 @@ import qualified Control.Monad.Catch as Exc
|
|||||||
|
|
||||||
import Data.Time.Zones
|
import Data.Time.Zones
|
||||||
|
|
||||||
import Control.Concurrent.STM (stateTVar, retry)
|
import Control.Concurrent.STM (stateTVar)
|
||||||
import Control.Concurrent.STM.Delay
|
import Control.Concurrent.STM.Delay
|
||||||
|
|
||||||
import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay)
|
import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay)
|
||||||
@ -260,7 +260,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
|
|||||||
(nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan
|
(nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan
|
||||||
lift . lift $ writeTVar chan newQueue
|
lift . lift $ writeTVar chan newQueue
|
||||||
jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
|
jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
|
||||||
receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
|
receiver <- maybe (lift $ lift retrySTM) return =<< uniformMay jobWorkers'
|
||||||
return (nextVal, receiver)
|
return (nextVal, receiver)
|
||||||
whenIsJust next $ \(nextVal, receiver) -> do
|
whenIsJust next $ \(nextVal, receiver) -> do
|
||||||
atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!)
|
atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!)
|
||||||
@ -373,8 +373,8 @@ execCrontab = do
|
|||||||
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
|
State.modify . HashMap.filterWithKey $ \k _ -> HashMap.member k crontab
|
||||||
prevExec <- State.get
|
prevExec <- State.get
|
||||||
case earliestJob settings prevExec crontab refT of
|
case earliestJob settings prevExec crontab refT of
|
||||||
Nothing -> liftBase retry
|
Nothing -> liftBase retrySTM
|
||||||
Just (_, MatchNone) -> liftBase retry
|
Just (_, MatchNone) -> liftBase retrySTM
|
||||||
Just x -> return (crontab, x, prevExec)
|
Just x -> return (crontab, x, prevExec)
|
||||||
|
|
||||||
do
|
do
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user