From adc8d466ac0948dcddf601fac439bb4e8d3bf619 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Thu, 26 Sep 2019 11:56:33 +0200 Subject: [PATCH] fix(jobs): cleaner shutdown of job-pool-manager --- src/Application.hs | 4 ++-- src/Jobs.hs | 34 ++++++++++++++++++++++++++-------- src/UnliftIO/Async/Utils.hs | 20 ++++++++++++++++++++ 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/Application.hs b/src/Application.hs index 294e1a206..41ca6fed4 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -380,7 +380,7 @@ develMain = runResourceT $ do void . liftIO $ awaitTermination `race` runSettings wsettings app -- | The @main@ function for an executable running this site. -appMain :: forall m. MonadUnliftIO m => m () +appMain :: forall m. (MonadUnliftIO m, MonadMask m) => m () appMain = runResourceT $ do settings <- getAppSettings @@ -472,7 +472,7 @@ appMain = runResourceT $ do foundationStoreNum :: Word32 foundationStoreNum = 2 -getApplicationRepl :: (MonadResource m, MonadUnliftIO m) => m (Int, UniWorX, Application) +getApplicationRepl :: (MonadResource m, MonadUnliftIO m, MonadMask m) => m (Int, UniWorX, Application) getApplicationRepl = do settings <- getAppDevSettings foundation <- makeFoundation settings diff --git a/src/Jobs.hs b/src/Jobs.hs index d2de34d8d..587aa9ac3 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -86,6 +86,7 @@ instance Exception JobQueueException handleJobs :: ( MonadResource m , MonadLogger m , MonadUnliftIO m + , MonadMask m ) => UniWorX -> m () -- | Spawn a set of workers that read control commands from `appJobCtl` and address them as they come in @@ -97,7 +98,7 @@ handleJobs foundation@UniWorX{..} | otherwise = do UnliftIO{..} <- askUnliftIO - jobPoolManager <- allocateLinkedAsync . unliftIO $ manageJobPool foundation + jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> unliftIO $ manageJobPool foundation unmask jobCron <- allocateLinkedAsync . unliftIO $ manageCrontab foundation @@ -129,15 +130,32 @@ manageJobPool :: forall m. ( MonadResource m , MonadLogger m , MonadUnliftIO m + , MonadMask m ) - => UniWorX -> m () -manageJobPool foundation@UniWorX{..} - = flip runContT return . forever . join . atomically $ asum - [ spawnMissingWorkers - , reapDeadWorkers - , terminateGracefully - ] + => UniWorX -> (forall a. IO a -> IO a) -> m () +manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ + flip runContT return . forever . join . atomically $ asum + [ spawnMissingWorkers + , reapDeadWorkers + , terminateGracefully + ] where + shutdownOnException :: m a -> m a + shutdownOnException act = do + UnliftIO{..} <- askUnliftIO + + actAsync <- allocateLinkedAsyncMasked $ unliftIO act + + let handleExc e = do + atomically $ do + jState <- tryReadTMVar appJobState + for_ jState $ \JobState{jobShutdown} -> tryPutTMVar jobShutdown () + + void $ wait actAsync + throwM e + + unmask (wait actAsync) `catchAll` handleExc + num :: Int num = fromIntegral $ foundation ^. _appJobWorkers diff --git a/src/UnliftIO/Async/Utils.hs b/src/UnliftIO/Async/Utils.hs index 862d057e4..fb1dbc978 100644 --- a/src/UnliftIO/Async/Utils.hs +++ b/src/UnliftIO/Async/Utils.hs @@ -1,5 +1,7 @@ module UnliftIO.Async.Utils ( allocateAsync, allocateLinkedAsync + , allocateAsyncWithUnmask, allocateLinkedAsyncWithUnmask + , allocateAsyncMasked, allocateLinkedAsyncMasked ) where import ClassyPrelude hiding (cancel, async, link) @@ -17,3 +19,21 @@ allocateAsync = fmap (view _2) . flip allocate cancel . liftIO . async allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => IO a -> m (Async a) allocateLinkedAsync = uncurry (<$) . (id &&& link) <=< allocateAsync + + +allocateAsyncWithUnmask :: forall m a. + MonadResource m + => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +allocateAsyncWithUnmask act = fmap (view _2) . flip allocate cancel . liftIO $ asyncWithUnmask act + +allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& link) =<< allocateAsyncWithUnmask act + + +allocateAsyncMasked :: forall m a. + MonadResource m + => IO a -> m (Async a) +allocateAsyncMasked act = fmap (view _2) . flip allocate cancel . liftIO $ asyncWithUnmask (const act) + +allocateLinkedAsyncMasked :: forall m a. (MonadUnliftIO m, MonadResource m) => IO a -> m (Async a) +allocateLinkedAsyncMasked = uncurry (<$) . (id &&& link) <=< allocateAsyncMasked