fix: improve async behaviour
This commit is contained in:
parent
39f12957f5
commit
cc7a5289a4
@ -77,10 +77,7 @@ update = do
|
|||||||
(port, site, app) <- getApplicationRepl
|
(port, site, app) <- getApplicationRepl
|
||||||
resourceForkIO $ do
|
resourceForkIO $ do
|
||||||
finally (liftIO $ runSettings (setPort port defaultSettings) app)
|
finally (liftIO $ runSettings (setPort port defaultSettings) app)
|
||||||
-- Note that this implies concurrency
|
(liftIO $ shutdownApp site >> putMVar done ())
|
||||||
-- between shutdownApp and the next app that is starting.
|
|
||||||
-- Normally this should be fine
|
|
||||||
(liftIO $ putMVar done () >> shutdownApp site)
|
|
||||||
|
|
||||||
-- | kill the server
|
-- | kill the server
|
||||||
shutdown :: IO ()
|
shutdown :: IO ()
|
||||||
|
|||||||
11
src/Jobs.hs
11
src/Jobs.hs
@ -55,8 +55,6 @@ import Data.Time.Zones
|
|||||||
import Control.Concurrent.STM (retry)
|
import Control.Concurrent.STM (retry)
|
||||||
import Control.Concurrent.STM.Delay
|
import Control.Concurrent.STM.Delay
|
||||||
|
|
||||||
import UnliftIO.Concurrent (forkIO)
|
|
||||||
|
|
||||||
|
|
||||||
import Jobs.Handler.SendNotification
|
import Jobs.Handler.SendNotification
|
||||||
import Jobs.Handler.SendTestEmail
|
import Jobs.Handler.SendTestEmail
|
||||||
@ -143,6 +141,9 @@ manageJobPool foundation@UniWorX{..}
|
|||||||
|
|
||||||
spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ())
|
spawnMissingWorkers, reapDeadWorkers, terminateGracefully :: STM (ContT () m ())
|
||||||
spawnMissingWorkers = do
|
spawnMissingWorkers = do
|
||||||
|
shouldTerminate' <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||||
|
guard $ not shouldTerminate'
|
||||||
|
|
||||||
oldState <- takeTMVar appJobState
|
oldState <- takeTMVar appJobState
|
||||||
let missing = num - Map.size (jobWorkers oldState)
|
let missing = num - Map.size (jobWorkers oldState)
|
||||||
guard $ missing > 0
|
guard $ missing > 0
|
||||||
@ -204,6 +205,10 @@ manageJobPool foundation@UniWorX{..}
|
|||||||
terminateGracefully = do
|
terminateGracefully = do
|
||||||
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||||
guard shouldTerminate
|
guard shouldTerminate
|
||||||
|
|
||||||
|
oldState <- takeTMVar appJobState
|
||||||
|
guard $ 0 == Map.size (jobWorkers oldState)
|
||||||
|
|
||||||
return . callCC $ \terminate -> do
|
return . callCC $ \terminate -> do
|
||||||
$logInfoS "JobPoolManager" "Shutting down"
|
$logInfoS "JobPoolManager" "Shutting down"
|
||||||
terminate ()
|
terminate ()
|
||||||
@ -214,7 +219,7 @@ stopJobCtl UniWorX{appJobState} = do
|
|||||||
didStop <- atomically $ do
|
didStop <- atomically $ do
|
||||||
jState <- tryReadTMVar appJobState
|
jState <- tryReadTMVar appJobState
|
||||||
for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
|
for jState $ \jSt'@JobState{jobShutdown} -> jSt' <$ tryPutTMVar jobShutdown ()
|
||||||
whenIsJust didStop $ \jSt' -> void . forkIO . atomically $ do
|
whenIsJust didStop $ \jSt' -> void . atomically $ do
|
||||||
workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
|
workers <- maybe [] (Map.keys . jobWorkers) <$> tryTakeTMVar appJobState
|
||||||
mapM_ (void . waitCatchSTM) $
|
mapM_ (void . waitCatchSTM) $
|
||||||
[ jobPoolManager jSt'
|
[ jobPoolManager jSt'
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import ClassyPrelude.Yesod
|
|||||||
import Database.PostgreSQL.Simple (SqlError(SqlError), sqlErrorHint)
|
import Database.PostgreSQL.Simple (SqlError(SqlError), sqlErrorHint)
|
||||||
import Control.Monad.Catch (MonadMask)
|
import Control.Monad.Catch (MonadMask)
|
||||||
|
|
||||||
|
import Database.Persist.Sql
|
||||||
import Database.Persist.Sql.Raw.QQ
|
import Database.Persist.Sql.Raw.QQ
|
||||||
|
|
||||||
import Control.Retry
|
import Control.Retry
|
||||||
@ -14,20 +15,22 @@ import Control.Retry
|
|||||||
import Control.Lens ((&))
|
import Control.Lens ((&))
|
||||||
|
|
||||||
|
|
||||||
retryTransaction :: forall m a. (MonadLogger m, MonadMask m, MonadIO m) => m a -> m a
|
setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m) => ReaderT SqlBackend m a -> ReaderT SqlBackend m a
|
||||||
retryTransaction = recovering policy [logRetries suggestRetry logRetry] . const
|
setSerializable act = recovering policy [logRetries suggestRetry logRetry] act'
|
||||||
where
|
where
|
||||||
policy :: RetryPolicyM m
|
policy :: RetryPolicyM (ReaderT SqlBackend m)
|
||||||
policy = fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
|
policy = fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
|
||||||
|
|
||||||
suggestRetry :: SqlError -> m Bool
|
suggestRetry :: SqlError -> ReaderT SqlBackend m Bool
|
||||||
suggestRetry SqlError{sqlErrorHint} = return $ "The transaction might succeed if retried." `isInfixOf` sqlErrorHint
|
suggestRetry SqlError{sqlErrorHint} = return $ "The transaction might succeed if retried." `isInfixOf` sqlErrorHint
|
||||||
|
|
||||||
logRetry :: Bool -- ^ Will retry
|
logRetry :: Bool -- ^ Will retry
|
||||||
-> SqlError
|
-> SqlError
|
||||||
-> RetryStatus
|
-> RetryStatus
|
||||||
-> m ()
|
-> ReaderT SqlBackend m ()
|
||||||
logRetry shouldRetry err status = $logDebugS "Sql" . pack $ defaultLogMsg shouldRetry err status
|
logRetry shouldRetry err status = $logDebugS "Sql" . pack $ defaultLogMsg shouldRetry err status
|
||||||
|
|
||||||
setSerializable :: (MonadLogger m, MonadMask m, MonadIO m) => ReaderT SqlBackend m a -> ReaderT SqlBackend m a
|
act' :: RetryStatus -> ReaderT SqlBackend m a
|
||||||
setSerializable act = retryTransaction $ [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act
|
act' RetryStatus{..}
|
||||||
|
| rsIterNumber == 0 = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act
|
||||||
|
| otherwise = transactionUndoWithIsolation Serializable *> act
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user