diff --git a/config/settings.yml b/config/settings.yml index d8b8b0330..252507577 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -26,7 +26,8 @@ mail-support: job-workers: "_env:JOB_WORKERS:10" job-flush-interval: "_env:JOB_FLUSH:30" job-cron-interval: "_env:CRON_INTERVAL:60" -job-stale-threshold: 300 +job-stale-threshold: 1800 +job-move-threshold: 30 notification-rate-limit: 3600 notification-collate-delay: 7200 notification-expiration: 259200 diff --git a/src/Control/Monad/Catch/Instances.hs b/src/Control/Monad/Catch/Instances.hs new file mode 100644 index 000000000..c0d1c3345 --- /dev/null +++ b/src/Control/Monad/Catch/Instances.hs @@ -0,0 +1,10 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} + +module Control.Monad.Catch.Instances + () where + +import ClassyPrelude +import Control.Monad.Catch + + +deriving instance Functor ExitCase diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index 7c03533a4..0f29237c5 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -174,6 +174,7 @@ import System.Clock.Instances as Import () import Data.Word.Word24.Instances as Import () import Control.Monad.Trans.Memo.StateCache.Instances as Import (hoistStateCache) import Database.Persist.Sql.Types.Instances as Import () +import Control.Monad.Catch.Instances as Import () import Crypto.Hash as Import (Digest, SHA3_256, SHA3_512) import Crypto.Random as Import (ChaChaDRG, Seed) diff --git a/src/Jobs.hs b/src/Jobs.hs index 17ec46921..d461b539b 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -40,10 +40,10 @@ import qualified Control.Monad.Catch as Exc import Data.Time.Zones -import Control.Concurrent.STM (retry) +import Control.Concurrent.STM (stateTVar, retry) import Control.Concurrent.STM.Delay -import UnliftIO.Concurrent (forkIO, myThreadId) +import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay) import Jobs.Handler.SendNotification @@ -68,6 +68,8 @@ import Control.Exception.Base (AsyncException) import Type.Reflection (typeOf) +import System.Clock + data JobQueueException = JInvalid QueuedJobId QueuedJob | JLocked QueuedJobId InstanceId UTCTime @@ -143,11 +145,17 @@ manageJobPool :: forall m. => UniWorX -> (forall a. m a -> m a) -> m () manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> flip runContT return . callCC $ \terminate' -> - forever . join . lift . routeExc . atomically $ asum - [ spawnMissingWorkers - , reapDeadWorkers - , terminateGracefully terminate' - ] + forever . join . lift . routeExc $ do + transferInfo <- runMaybeT $ do + moveThreshold <- hoistMaybe $ appJobMoveThreshold appSettings' + let MkFixed (fromInteger -> delayTime) = realToFrac moveThreshold / 2 :: Micro + liftIO $ (,) <$> getTime Monotonic <*> newDelay delayTime + atomically . asum $ + [ spawnMissingWorkers + , reapDeadWorkers + ] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo ++ + [ terminateGracefully terminate' + ] where shutdownOnException :: ((forall m'. Monad m' => m (m' ()) -> m (m' ())) -> m a) -> m a shutdownOnException act = do @@ -193,10 +201,8 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> | shouldTerminate -> return $ return () | otherwise -> do - queue <- readTVar chan - nextVal <- case jqDequeue queue of - Nothing -> retry - Just (j, q) -> j <$ writeTVar chan q + mNext <- stateTVar chan $ \q -> maybe (Nothing, q) (over _1 Just) $ jqDequeue q + nextVal <- hoistMaybe mNext return $ yield nextVal >> streamChan runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do $logInfoS logIdent "Started" @@ -231,10 +237,11 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers' return (nextVal, receiver) whenIsJust next $ \(nextVal, receiver) -> do - atomically . modifyTVar' receiver $ jqInsert nextVal + atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!) go in go + terminateGracefully :: (() -> ContT () m ()) -> STM (ContT () m ()) terminateGracefully terminate = do shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown guard shouldTerminate @@ -246,6 +253,37 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc -> $logInfoS "JobPoolManager" "Shutting down" terminate () + transferJobs :: TimeSpec -> STM (ContT () m ()) + transferJobs oldTime = do + moveThreshold <- hoistMaybe $ appJobMoveThreshold appSettings' + let isOld ts = oldTime - ts >= realToFrac moveThreshold + + oldState <- readTMVar appJobState + wState <- mapM readTVar $ jobWorkers oldState + + let receivers = Map.keysSet $ Map.filter ((== 0) . jqDepth) wState + senders' = Map.keysSet $ Map.filter (ianyOf jqContents $ \(_, Down qTime) _ -> isOld qTime) wState + senders = senders' `Set.difference` receivers + sendJobs = Map.restrictKeys wState senders ^.. folded . backwards jqContents . filtered jobMovable + + guard $ not (null receivers) + && not (null senders) + && not (null sendJobs) + + let movePairs = flip zip sendJobs . evalRand (uniforms receivers) . mkStdGen $ hash oldTime + + iforMOf_ (_jobWorkers .> itraversed) oldState $ \w tv -> if + | w `elem` senders + -> writeTVar tv mempty + | w `elem` receivers + -> forM_ movePairs $ \(recv, j) -> if + | recv == w -> readTVar tv >>= jqInsert j >>= (writeTVar tv $!) + | otherwise -> return () + | otherwise + -> return () + + return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|] + stopJobCtl :: MonadUnliftIO m => UniWorX -> m () -- ^ Stop all worker threads currently running stopJobCtl UniWorX{appJobState} = do @@ -278,7 +316,7 @@ execCrontab = do | otherwise = return () runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued - mapRWST (liftHandler . runDB . setSerializable) mergeState + mapRWST (liftHandler . runDB . setSerializableBatch) mergeState refT <- liftIO getCurrentTime settings <- getsYesod appSettings' @@ -300,7 +338,7 @@ execCrontab = do atomically . writeTVar crontabTVar $ Just (now, currentCrontab') $logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab' - let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do + let doJob = mapRWST (liftHandler . runDBJobs . setSerializableBatch) $ do newCrontab <- lift $ hoist lift determineCrontab' when (newCrontab /= currentCrontab) $ mapRWST (liftIO . atomically) $ @@ -416,9 +454,15 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) - handleCmd JobCtlTest = return () - handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) - handleCmd (JobCtlQueue job) = lift $ queueJob' job + handleCmd JobCtlTest = $logDebugS logIdent "JobCtlTest" + handleCmd JobCtlFlush = do + $logDebugS logIdent "JobCtlFlush..." + void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) + $logInfoS logIdent "JobCtlFlush" + handleCmd (JobCtlQueue job) = do + $logDebugS logIdent "JobCtlQueue..." + lift $ queueJob' job + $logInfoS logIdent "JobCtlQueue" handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do content <- case fromJSON queuedJobContent of Aeson.Success c -> return c @@ -447,42 +491,49 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker delete jId case performJob content of - JobHandlerAtomic act -> runDBJobs . setSerializable $ do + JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do act & withJobWorkerState wNum (JobWorkerExecJob content) hoist lift cleanup JobHandlerException act -> do act & withJobWorkerState wNum (JobWorkerExecJob content) - runDB $ setSerializable cleanup + runDB $ setSerializableBatch cleanup JobHandlerAtomicWithFinalizer act fin -> do - res <- runDBJobs . setSerializable $ do + res <- runDBJobs . setSerializableBatch $ do res <- act & withJobWorkerState wNum (JobWorkerExecJob content) hoist lift cleanup return res fin res handleCmd JobCtlDetermineCrontab = do - newCTab <- liftHandler . runDB $ setSerializable determineCrontab' + $logDebugS logIdent "DetermineCrontab..." + newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab' + $logInfoS logIdent "DetermineCrontab" -- logDebugS logIdent $ tshow newCTab mapReaderT (liftIO . atomically) $ lift . void . flip swapTVar newCTab =<< asks jobCrontab handleCmd (JobCtlGenerateHealthReport kind) = do hrStorage <- getsYesod appHealthReport + $logDebugS logIdent [st|#{tshow kind}...|] newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind - $logInfoS (tshow kind) $ toPathPiece newStatus + $logInfoS logIdent [st|#{tshow kind}: #{toPathPiece newStatus}|] unless (newStatus == HealthSuccess) $ do - $logErrorS (tshow kind) $ tshow newReport + $logErrorS logIdent [st|#{tshow kind}: #{tshow newReport}|] liftIO $ do now <- getCurrentTime let updateReports = Set.insert (now, newReport) . Set.filter (((/=) `on` classifyHealthReport) newReport . snd) atomically . modifyTVar' hrStorage $ force . updateReports + handleCmd (JobCtlSleep secs@(MkFixed (fromIntegral -> msecs))) = do + $logInfoS logIdent [st|Sleeping #{tshow secs}s...|] + threadDelay msecs + $logInfoS logIdent [st|Slept #{tshow secs}s.|] jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a jLocked jId act = flip evalStateT False $ do let lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob) - lock = hoist (hoist $ runDB . setSerializable) $ do + lock = hoist (hoist $ runDB . setSerializableBatch) $ do qj@QueuedJob{..} <- lift . lift $ maybe (throwM $ JNonexistant jId) return =<< get jId instanceID' <- getsYesod $ view instanceID threshold <- getsYesod $ view _appJobStaleThreshold @@ -511,7 +562,7 @@ jLocked jId act = flip evalStateT False $ do unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) () unlock (Entity jId' _) = whenM State.get $ do atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks - lift . lift . runDB . setSerializable $ + lift . lift . runDB . setSerializableBatch $ update jId' [ QueuedJobLockInstance =. Nothing , QueuedJobLockTime =. Nothing ] diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index c3559482e..17862fec9 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -36,6 +36,8 @@ import Handler.Utils.Files (sourceFileDB) import Control.Monad.Logger (askLoggerIO, runLoggingT) +import System.Clock + dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin @@ -236,10 +238,11 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do logger <- askLoggerIO didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions - let sendChunks = go 0 0 + let sendChunks = go 0 0 Nothing . toNanoSecs =<< liftIO (getTime Monotonic) where - go :: forall m. MonadIO m => Natural -> Int64 -> ConduitT ByteString Void m () - go c accsz = do + go :: forall m. MonadIO m => Natural -> Int64 -> Maybe Integer -> Integer -> ConduitT ByteString Void m () + go c accsz lastReport startT = do + currT <- liftIO $ toNanoSecs <$> getTime Monotonic chunk' <- await whenIsJust chunk' $ \chunk -> do let csz = fromIntegral $ olength chunk @@ -247,9 +250,15 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do p :: Centi p = realToFrac $ (toInteger sz' % toInteger sz) * 100 !c' = succ c - runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{tshow sz'}/#{tshow sz} (#{tshow p}%)...|] + eta :: Integer + eta = ceiling $ ((currT - startT) % fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz) + !lastReport' + | currT - fromMaybe startT lastReport > 5e9 = Just currT + | otherwise = lastReport + when (lastReport' /= lastReport) $ + runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{tshow sz'}/#{tshow sz} (#{tshow p}%) ETA #{tshow eta}s...|] atomically . putTMVar chunkVar $ Just chunk - go c' sz' + go c' sz' lastReport' startT lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks return True if diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index bb2faf762..491ce98b7 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -30,6 +30,8 @@ import UnliftIO.Concurrent (myThreadId) import Control.Monad.Trans.Resource (register) +import System.Clock (getTime, Clock(Monotonic)) + data JobQueueException = JobQueuePoolEmpty | JobQueueWorkerNotFound @@ -46,7 +48,7 @@ writeJobCtl' target cmd = do | null jobWorkers -> throwM JobQueuePoolEmpty | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers - -> atomically . modifyTVar' chan $ jqInsert cmd + -> atomically $ readTVar chan >>= jqInsert cmd >>= (writeTVar chan $!) | otherwise -> throwM JobQueueWorkerNotFound @@ -56,9 +58,15 @@ writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ( -- Instructions are assigned deterministically and pseudo-randomly to one specific worker. -- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others writeJobCtl cmd = do - names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar + jSt <- asks appJobState + names <- atomically $ jobWorkerNames <$> readTMVar jSt + when (null names) $ throwM JobQueuePoolEmpty tid <- myThreadId - let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names + cTime <- liftIO $ getTime Monotonic + let + epoch :: Int64 + epoch = round cTime `div` 3600 + target = evalRand ?? mkStdGen (hash epoch `hashWithSalt` tid `hashWithSalt` cmd) $ uniform names writeJobCtl' target cmd diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index ee1c9aa3b..851ad9ac7 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -9,14 +9,14 @@ module Jobs.Types , YesodJobDB , JobHandler(..), _JobHandlerAtomic, _JobHandlerException , JobContext(..) - , JobState(..) + , JobState(..), _jobWorkers, _jobWorkerName, _jobContext, _jobPoolManager, _jobCron, _jobShutdown, _jobCurrentCrontab , jobWorkerNames , JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob , JobWorkerId , showWorkerId, newWorkerId - , JobQueue, jqInsert, jqDequeue, jqDepth + , JobQueue, jqInsert, jqDequeue', jqDequeue, jqDepth, jqContents , JobPriority(..), prioritiseJob - , jobNoQueueSame + , jobNoQueueSame, jobMovable , module Cron ) where @@ -39,6 +39,9 @@ import qualified Data.PQueue.Prio.Max as PQ import Cron (CronNextMatch(..), _MatchAsap, _MatchAt, _MatchNone) +import System.Clock (getTime, Clock(Monotonic), TimeSpec) +import GHC.Conc (unsafeIOToSTM) + data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } | JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext } @@ -150,8 +153,11 @@ data JobCtl = JobCtlFlush | JobCtlQueue Job | JobCtlGenerateHealthReport HealthCheck | JobCtlTest + | JobCtlSleep Micro -- | For debugging deriving (Eq, Ord, Read, Show, Generic, Typeable) +makePrisms ''JobCtl + instance Hashable JobCtl instance NFData JobCtl @@ -242,22 +248,36 @@ jobNoQueueSame = \case JobRechunkFiles{} -> True _ -> False +jobMovable :: JobCtl -> Bool +jobMovable = isn't _JobCtlTest -newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue JobPriority JobCtl } + +newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue (JobPriority, Down TimeSpec) JobCtl } deriving (Eq, Ord, Read, Show) deriving newtype (Semigroup, Monoid, NFData) makePrisms ''JobQueue -jqInsert :: JobCtl -> JobQueue -> JobQueue -jqInsert job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job) job +jqInsert' :: TimeSpec -> JobCtl -> JobQueue -> JobQueue +jqInsert' cTime job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job, Down cTime) job + +jqInsert :: JobCtl -> JobQueue -> STM JobQueue +jqInsert job queue = do + cTime <- unsafeIOToSTM $ getTime Monotonic + return $ jqInsert' cTime job queue + +jqDequeue' :: JobQueue -> Maybe (((JobPriority, Down TimeSpec), JobCtl), JobQueue) +jqDequeue' = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxViewWithKey . getJobQueue jqDequeue :: JobQueue -> Maybe (JobCtl, JobQueue) -jqDequeue = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxView . getJobQueue +jqDequeue = fmap (over _1 $ view _2) . jqDequeue' jqDepth :: Integral n => JobQueue -> n jqDepth = fromIntegral . PQ.size . getJobQueue +jqContents :: IndexedTraversal' (JobPriority, Down TimeSpec) JobQueue JobCtl +jqContents = _JobQueue . PQ.traverseWithKey . indexed + data JobState = JobState { jobWorkers :: Map (Async ()) (TVar JobQueue) @@ -271,3 +291,5 @@ data JobState = JobState jobWorkerNames :: JobState -> Set JobWorkerId jobWorkerNames JobState{..} = Set.map jobWorkerName $ Map.keysSet jobWorkers + +makeLenses_ ''JobState diff --git a/src/Settings.hs b/src/Settings.hs index acedff5c4..b1b37557b 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -117,6 +117,7 @@ data AppSettings = AppSettings , appJobFlushInterval :: Maybe NominalDiffTime , appJobCronInterval :: Maybe NominalDiffTime , appJobStaleThreshold :: NominalDiffTime + , appJobMoveThreshold :: Maybe DiffTime , appNotificationRateLimit :: NominalDiffTime , appNotificationCollateDelay :: NominalDiffTime , appNotificationExpiration :: NominalDiffTime @@ -454,6 +455,7 @@ instance FromJSON AppSettings where appJobFlushInterval <- o .:? "job-flush-interval" appJobCronInterval <- o .:? "job-cron-interval" appJobStaleThreshold <- o .: "job-stale-threshold" + appJobMoveThreshold <- o .:? "job-move-threshold" appNotificationRateLimit <- o .: "notification-rate-limit" appNotificationCollateDelay <- o .: "notification-collate-delay" appNotificationExpiration <- o .: "notification-expiration" diff --git a/src/System/Clock/Instances.hs b/src/System/Clock/Instances.hs index 16082a700..936d6ea38 100644 --- a/src/System/Clock/Instances.hs +++ b/src/System/Clock/Instances.hs @@ -8,6 +8,35 @@ import ClassyPrelude import System.Clock import Data.Ratio ((%)) +import Data.Fixed + +import Control.Lens + instance Real TimeSpec where toRational TimeSpec{..} = fromIntegral sec + fromIntegral nsec % 1e9 + +instance Fractional TimeSpec where + a / b = fromRational $ toRational a / toRational b + fromRational n = fromNanoSecs n' + where MkFixed n' = fromRational n :: Nano + +instance RealFrac TimeSpec where + properFraction = over _2 fromRational . properFraction . toRational + + round x = let (n,r) = properFraction x + m = bool (n + 1) (n -1) $ r < fromRational 0 + s = signum (abs r - fromRational 0.5) + in if | s == fromRational (-1) -> n + | s == fromRational 0 -> bool m n $ even n + | s == fromRational 1 -> m + | otherwise -> error "round @TimeSpec: Bad value" + + ceiling x = bool n (n + 1) $ r > 0 + where (n,r) = properFraction x + + floor x = bool n (n - 1) $ r > 0 + where (n,r) = properFraction x + +instance NFData TimeSpec +instance Hashable TimeSpec diff --git a/src/UnliftIO/Async/Utils.hs b/src/UnliftIO/Async/Utils.hs index 3e0184997..851a72367 100644 --- a/src/UnliftIO/Async/Utils.hs +++ b/src/UnliftIO/Async/Utils.hs @@ -4,35 +4,66 @@ module UnliftIO.Async.Utils , allocateAsyncMasked, allocateLinkedAsyncMasked ) where -import ClassyPrelude hiding (cancel, async, link) +import ClassyPrelude hiding (cancel, async, link, finally, mask) import Control.Lens +import Control.Lens.Extras (is) import qualified UnliftIO.Async as UnliftIO import qualified Control.Concurrent.Async as A import Control.Monad.Trans.Resource +import qualified Control.Monad.Trans.Resource.Internal as ResourceT.Internal +import Data.Acquire + +import Control.Monad.Catch + + +withReference :: forall m a. (MonadUnliftIO m, MonadResource m) => ((IO (), IO ()) -> m a) -> m a +withReference act = do + releaseAct <- newEmptyTMVarIO + + let doAlloc = do + iSt <- liftResourceT getInternalState + liftIO $ mask $ \_ -> do + ResourceT.Internal.stateAlloc iSt + atomically $ putTMVar releaseAct () + return iSt + doRelease iSt eCase = liftIO . whenM (atomically $ is _Just <$> tryTakeTMVar releaseAct) $ do + flip ResourceT.Internal.stateCleanup iSt $ case eCase of + ExitCaseSuccess _ -> ReleaseNormal + ExitCaseException _ -> ReleaseException + ExitCaseAbort -> ReleaseEarly + + withRunInIO $ \run -> + fmap fst . generalBracket (run doAlloc) doRelease $ \iSt -> do + res <- run $ act + ( atomically $ takeTMVar releaseAct + , ResourceT.Internal.stateCleanup ReleaseNormal iSt + ) + atomically $ guard =<< isEmptyTMVar releaseAct + return res allocateAsync :: forall m a. ( MonadUnliftIO m, MonadResource m ) => m a -> m (Async a) -allocateAsync act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel . A.async $ run act +allocateAsync act = withReference $ \(signalReady, releaseRef) -> withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel . A.async . flip finally releaseRef $ signalReady >> run act allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => m a -> m (Async a) allocateLinkedAsync = uncurry (<$) . (id &&& UnliftIO.link) <=< allocateAsync allocateAsyncWithUnmask :: forall m a. - ( MonadUnliftIO m, MonadResource m ) + ( MonadUnliftIO m, MonadResource m) => ((forall b. m b -> m b) -> m a) -> m (Async a) -allocateAsyncWithUnmask act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel $ A.asyncWithUnmask $ \unmask -> run $ act (liftIO . unmask . run) +allocateAsyncWithUnmask act = withReference $ \(signalReady, releaseRef) -> withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel $ A.asyncWithUnmask $ \unmask -> flip finally releaseRef $ signalReady >> run (act $ liftIO . unmask . run) allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. m b -> m b) -> m a) -> m (Async a) allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& UnliftIO.link) =<< allocateAsyncWithUnmask act allocateAsyncMasked :: forall m a. - ( MonadUnliftIO m, MonadResource m ) + ( MonadUnliftIO m, MonadResource m) => m a -> m (Async a) allocateAsyncMasked act = allocateAsyncWithUnmask (const act) diff --git a/src/Utils.hs b/src/Utils.hs index 446f66d30..654bd22c9 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -106,8 +106,10 @@ import Algebra.Lattice (top, bottom, (/\), (\/), BoundedJoinSemiLattice, Bounded import Data.Constraint (Dict(..)) -import Control.Monad.Random.Class (MonadRandom) +import Control.Monad.Random.Class (MonadSplit(getSplit), MonadRandom, MonadInterleave(interleave), uniform) +import Control.Monad.Random (RandomGen) import qualified System.Random.Shuffle as Rand (shuffleM) +import qualified Control.Monad.Random.Lazy as LazyRand import Data.Data (Data) import qualified Data.Text.Lazy.Builder as Builder @@ -739,6 +741,9 @@ throwExceptT :: ( Exception e, MonadThrow m ) => ExceptT e m a -> m a throwExceptT = exceptT throwM return +generalFinally :: MonadMask m => m a -> (ExitCase a -> m b) -> m a +generalFinally action finalizer = view _1 <$> generalBracket (return ()) (const finalizer) (const action) + ------------ -- Monads -- ------------ @@ -1192,6 +1197,10 @@ unstableSortOn = unstableSortBy . comparing unstableSort :: (MonadRandom m, Ord a) => [a] -> m [a] unstableSort = unstableSortBy compare +uniforms :: (RandomGen g, MonadSplit g m, Foldable t) => t a -> m [a] +uniforms xs = LazyRand.evalRand go <$> getSplit + where go = (:) <$> interleave (uniform xs) <*> go + ---------- -- Lens -- ---------- @@ -1273,3 +1282,10 @@ infixr 4 () :: FilePath -> FilePath -> FilePath dir file = dir dropDrive file + + +---------------- +-- TH Dungeon -- +---------------- + +makePrisms ''ExitCase diff --git a/src/Utils/Form.hs b/src/Utils/Form.hs index 2fa06586a..307bd6fad 100644 --- a/src/Utils/Form.hs +++ b/src/Utils/Form.hs @@ -1,6 +1,6 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE UndecidableInstances #-} -{-# OPTIONS_GHC -fno-warn-deprecations #-} +{-# OPTIONS_GHC -fno-warn-deprecations #-} -- `WidgetT`, `HandlerT` module Utils.Form where diff --git a/src/Utils/Sql.hs b/src/Utils/Sql.hs index ad51820d0..c0470ba30 100644 --- a/src/Utils/Sql.hs +++ b/src/Utils/Sql.hs @@ -1,5 +1,5 @@ module Utils.Sql - ( setSerializable, setSerializable' + ( setSerializable, setSerializableBatch, setSerializable' , catchSql, handleSql , isUniqueConstraintViolation , catchIfSql, handleIfSql @@ -30,6 +30,9 @@ import Text.Shakespeare.Text (st) setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 + +setSerializableBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a +setSerializableBatch = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6 setSerializable' :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => RetryPolicyM (SqlPersistT m) -> SqlPersistT m a -> ReaderT SqlBackend m a setSerializable' policy act = do