feat(jobs): move held-up jobs to different workers
This commit is contained in:
parent
967ec26fff
commit
284aae1213
@ -26,7 +26,8 @@ mail-support:
|
||||
job-workers: "_env:JOB_WORKERS:10"
|
||||
job-flush-interval: "_env:JOB_FLUSH:30"
|
||||
job-cron-interval: "_env:CRON_INTERVAL:60"
|
||||
job-stale-threshold: 300
|
||||
job-stale-threshold: 1800
|
||||
job-move-threshold: 30
|
||||
notification-rate-limit: 3600
|
||||
notification-collate-delay: 7200
|
||||
notification-expiration: 259200
|
||||
|
||||
10
src/Control/Monad/Catch/Instances.hs
Normal file
10
src/Control/Monad/Catch/Instances.hs
Normal file
@ -0,0 +1,10 @@
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Control.Monad.Catch.Instances
|
||||
() where
|
||||
|
||||
import ClassyPrelude
|
||||
import Control.Monad.Catch
|
||||
|
||||
|
||||
deriving instance Functor ExitCase
|
||||
@ -174,6 +174,7 @@ import System.Clock.Instances as Import ()
|
||||
import Data.Word.Word24.Instances as Import ()
|
||||
import Control.Monad.Trans.Memo.StateCache.Instances as Import (hoistStateCache)
|
||||
import Database.Persist.Sql.Types.Instances as Import ()
|
||||
import Control.Monad.Catch.Instances as Import ()
|
||||
|
||||
import Crypto.Hash as Import (Digest, SHA3_256, SHA3_512)
|
||||
import Crypto.Random as Import (ChaChaDRG, Seed)
|
||||
|
||||
101
src/Jobs.hs
101
src/Jobs.hs
@ -40,10 +40,10 @@ import qualified Control.Monad.Catch as Exc
|
||||
|
||||
import Data.Time.Zones
|
||||
|
||||
import Control.Concurrent.STM (retry)
|
||||
import Control.Concurrent.STM (stateTVar, retry)
|
||||
import Control.Concurrent.STM.Delay
|
||||
|
||||
import UnliftIO.Concurrent (forkIO, myThreadId)
|
||||
import UnliftIO.Concurrent (forkIO, myThreadId, threadDelay)
|
||||
|
||||
|
||||
import Jobs.Handler.SendNotification
|
||||
@ -68,6 +68,8 @@ import Control.Exception.Base (AsyncException)
|
||||
|
||||
import Type.Reflection (typeOf)
|
||||
|
||||
import System.Clock
|
||||
|
||||
|
||||
data JobQueueException = JInvalid QueuedJobId QueuedJob
|
||||
| JLocked QueuedJobId InstanceId UTCTime
|
||||
@ -143,11 +145,17 @@ manageJobPool :: forall m.
|
||||
=> UniWorX -> (forall a. m a -> m a) -> m ()
|
||||
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
|
||||
flip runContT return . callCC $ \terminate' ->
|
||||
forever . join . lift . routeExc . atomically $ asum
|
||||
[ spawnMissingWorkers
|
||||
, reapDeadWorkers
|
||||
, terminateGracefully terminate'
|
||||
]
|
||||
forever . join . lift . routeExc $ do
|
||||
transferInfo <- runMaybeT $ do
|
||||
moveThreshold <- hoistMaybe $ appJobMoveThreshold appSettings'
|
||||
let MkFixed (fromInteger -> delayTime) = realToFrac moveThreshold / 2 :: Micro
|
||||
liftIO $ (,) <$> getTime Monotonic <*> newDelay delayTime
|
||||
atomically . asum $
|
||||
[ spawnMissingWorkers
|
||||
, reapDeadWorkers
|
||||
] ++ maybe [] (\(cTime, delay) -> [return () <$ waitDelay delay, transferJobs cTime]) transferInfo ++
|
||||
[ terminateGracefully terminate'
|
||||
]
|
||||
where
|
||||
shutdownOnException :: ((forall m'. Monad m' => m (m' ()) -> m (m' ())) -> m a) -> m a
|
||||
shutdownOnException act = do
|
||||
@ -193,10 +201,8 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
|
||||
| shouldTerminate ->
|
||||
return $ return ()
|
||||
| otherwise -> do
|
||||
queue <- readTVar chan
|
||||
nextVal <- case jqDequeue queue of
|
||||
Nothing -> retry
|
||||
Just (j, q) -> j <$ writeTVar chan q
|
||||
mNext <- stateTVar chan $ \q -> maybe (Nothing, q) (over _1 Just) $ jqDequeue q
|
||||
nextVal <- hoistMaybe mNext
|
||||
return $ yield nextVal >> streamChan
|
||||
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
|
||||
$logInfoS logIdent "Started"
|
||||
@ -231,10 +237,11 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
|
||||
receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
|
||||
return (nextVal, receiver)
|
||||
whenIsJust next $ \(nextVal, receiver) -> do
|
||||
atomically . modifyTVar' receiver $ jqInsert nextVal
|
||||
atomically $ readTVar receiver >>= jqInsert nextVal >>= (writeTVar receiver $!)
|
||||
go
|
||||
in go
|
||||
|
||||
terminateGracefully :: (() -> ContT () m ()) -> STM (ContT () m ())
|
||||
terminateGracefully terminate = do
|
||||
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
guard shouldTerminate
|
||||
@ -246,6 +253,37 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ \routeExc ->
|
||||
$logInfoS "JobPoolManager" "Shutting down"
|
||||
terminate ()
|
||||
|
||||
transferJobs :: TimeSpec -> STM (ContT () m ())
|
||||
transferJobs oldTime = do
|
||||
moveThreshold <- hoistMaybe $ appJobMoveThreshold appSettings'
|
||||
let isOld ts = oldTime - ts >= realToFrac moveThreshold
|
||||
|
||||
oldState <- readTMVar appJobState
|
||||
wState <- mapM readTVar $ jobWorkers oldState
|
||||
|
||||
let receivers = Map.keysSet $ Map.filter ((== 0) . jqDepth) wState
|
||||
senders' = Map.keysSet $ Map.filter (ianyOf jqContents $ \(_, Down qTime) _ -> isOld qTime) wState
|
||||
senders = senders' `Set.difference` receivers
|
||||
sendJobs = Map.restrictKeys wState senders ^.. folded . backwards jqContents . filtered jobMovable
|
||||
|
||||
guard $ not (null receivers)
|
||||
&& not (null senders)
|
||||
&& not (null sendJobs)
|
||||
|
||||
let movePairs = flip zip sendJobs . evalRand (uniforms receivers) . mkStdGen $ hash oldTime
|
||||
|
||||
iforMOf_ (_jobWorkers .> itraversed) oldState $ \w tv -> if
|
||||
| w `elem` senders
|
||||
-> writeTVar tv mempty
|
||||
| w `elem` receivers
|
||||
-> forM_ movePairs $ \(recv, j) -> if
|
||||
| recv == w -> readTVar tv >>= jqInsert j >>= (writeTVar tv $!)
|
||||
| otherwise -> return ()
|
||||
| otherwise
|
||||
-> return ()
|
||||
|
||||
return $ $logWarnS "JobPoolManager" [st|Moved #{tshow (olength movePairs)} long-unadressed jobs from #{tshow (olength senders)} senders to #{tshow (olength receivers)} receivers|]
|
||||
|
||||
stopJobCtl :: MonadUnliftIO m => UniWorX -> m ()
|
||||
-- ^ Stop all worker threads currently running
|
||||
stopJobCtl UniWorX{appJobState} = do
|
||||
@ -278,7 +316,7 @@ execCrontab = do
|
||||
| otherwise = return ()
|
||||
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
|
||||
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
|
||||
mapRWST (liftHandler . runDB . setSerializable) mergeState
|
||||
mapRWST (liftHandler . runDB . setSerializableBatch) mergeState
|
||||
|
||||
refT <- liftIO getCurrentTime
|
||||
settings <- getsYesod appSettings'
|
||||
@ -300,7 +338,7 @@ execCrontab = do
|
||||
atomically . writeTVar crontabTVar $ Just (now, currentCrontab')
|
||||
$logDebugS "Crontab" . intercalate "\n" $ "Current crontab:" : map tshow currentCrontab'
|
||||
|
||||
let doJob = mapRWST (liftHandler . runDBJobs . setSerializable) $ do
|
||||
let doJob = mapRWST (liftHandler . runDBJobs . setSerializableBatch) $ do
|
||||
newCrontab <- lift $ hoist lift determineCrontab'
|
||||
when (newCrontab /= currentCrontab) $
|
||||
mapRWST (liftIO . atomically) $
|
||||
@ -416,9 +454,15 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
|
||||
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
|
||||
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
|
||||
|
||||
handleCmd JobCtlTest = return ()
|
||||
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||
handleCmd (JobCtlQueue job) = lift $ queueJob' job
|
||||
handleCmd JobCtlTest = $logDebugS logIdent "JobCtlTest"
|
||||
handleCmd JobCtlFlush = do
|
||||
$logDebugS logIdent "JobCtlFlush..."
|
||||
void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||
$logInfoS logIdent "JobCtlFlush"
|
||||
handleCmd (JobCtlQueue job) = do
|
||||
$logDebugS logIdent "JobCtlQueue..."
|
||||
lift $ queueJob' job
|
||||
$logInfoS logIdent "JobCtlQueue"
|
||||
handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \(Entity _ j@QueuedJob{..}) -> lift $ do
|
||||
content <- case fromJSON queuedJobContent of
|
||||
Aeson.Success c -> return c
|
||||
@ -447,42 +491,49 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
|
||||
delete jId
|
||||
|
||||
case performJob content of
|
||||
JobHandlerAtomic act -> runDBJobs . setSerializable $ do
|
||||
JobHandlerAtomic act -> runDBJobs . setSerializableBatch $ do
|
||||
act & withJobWorkerState wNum (JobWorkerExecJob content)
|
||||
hoist lift cleanup
|
||||
JobHandlerException act -> do
|
||||
act & withJobWorkerState wNum (JobWorkerExecJob content)
|
||||
runDB $ setSerializable cleanup
|
||||
runDB $ setSerializableBatch cleanup
|
||||
JobHandlerAtomicWithFinalizer act fin -> do
|
||||
res <- runDBJobs . setSerializable $ do
|
||||
res <- runDBJobs . setSerializableBatch $ do
|
||||
res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
|
||||
hoist lift cleanup
|
||||
return res
|
||||
fin res
|
||||
handleCmd JobCtlDetermineCrontab = do
|
||||
newCTab <- liftHandler . runDB $ setSerializable determineCrontab'
|
||||
$logDebugS logIdent "DetermineCrontab..."
|
||||
newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab'
|
||||
$logInfoS logIdent "DetermineCrontab"
|
||||
-- logDebugS logIdent $ tshow newCTab
|
||||
mapReaderT (liftIO . atomically) $
|
||||
lift . void . flip swapTVar newCTab =<< asks jobCrontab
|
||||
handleCmd (JobCtlGenerateHealthReport kind) = do
|
||||
hrStorage <- getsYesod appHealthReport
|
||||
$logDebugS logIdent [st|#{tshow kind}...|]
|
||||
newReport@(healthReportStatus -> newStatus) <- lift $ generateHealthReport kind
|
||||
|
||||
$logInfoS (tshow kind) $ toPathPiece newStatus
|
||||
$logInfoS logIdent [st|#{tshow kind}: #{toPathPiece newStatus}|]
|
||||
unless (newStatus == HealthSuccess) $ do
|
||||
$logErrorS (tshow kind) $ tshow newReport
|
||||
$logErrorS logIdent [st|#{tshow kind}: #{tshow newReport}|]
|
||||
|
||||
liftIO $ do
|
||||
now <- getCurrentTime
|
||||
let updateReports = Set.insert (now, newReport)
|
||||
. Set.filter (((/=) `on` classifyHealthReport) newReport . snd)
|
||||
atomically . modifyTVar' hrStorage $ force . updateReports
|
||||
handleCmd (JobCtlSleep secs@(MkFixed (fromIntegral -> msecs))) = do
|
||||
$logInfoS logIdent [st|Sleeping #{tshow secs}s...|]
|
||||
threadDelay msecs
|
||||
$logInfoS logIdent [st|Slept #{tshow secs}s.|]
|
||||
|
||||
jLocked :: QueuedJobId -> (Entity QueuedJob -> ReaderT JobContext Handler a) -> ReaderT JobContext Handler a
|
||||
jLocked jId act = flip evalStateT False $ do
|
||||
let
|
||||
lock :: StateT Bool (ReaderT JobContext Handler) (Entity QueuedJob)
|
||||
lock = hoist (hoist $ runDB . setSerializable) $ do
|
||||
lock = hoist (hoist $ runDB . setSerializableBatch) $ do
|
||||
qj@QueuedJob{..} <- lift . lift $ maybe (throwM $ JNonexistant jId) return =<< get jId
|
||||
instanceID' <- getsYesod $ view instanceID
|
||||
threshold <- getsYesod $ view _appJobStaleThreshold
|
||||
@ -511,7 +562,7 @@ jLocked jId act = flip evalStateT False $ do
|
||||
unlock :: Entity QueuedJob -> StateT Bool (ReaderT JobContext Handler) ()
|
||||
unlock (Entity jId' _) = whenM State.get $ do
|
||||
atomically . flip modifyTVar' (Set.delete jId') =<< asks jobHeldLocks
|
||||
lift . lift . runDB . setSerializable $
|
||||
lift . lift . runDB . setSerializableBatch $
|
||||
update jId' [ QueuedJobLockInstance =. Nothing
|
||||
, QueuedJobLockTime =. Nothing
|
||||
]
|
||||
|
||||
@ -36,6 +36,8 @@ import Handler.Utils.Files (sourceFileDB)
|
||||
|
||||
import Control.Monad.Logger (askLoggerIO, runLoggingT)
|
||||
|
||||
import System.Clock
|
||||
|
||||
|
||||
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
||||
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
|
||||
@ -236,10 +238,11 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
logger <- askLoggerIO
|
||||
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
|
||||
let sendChunks = go 0 0
|
||||
let sendChunks = go 0 0 Nothing . toNanoSecs =<< liftIO (getTime Monotonic)
|
||||
where
|
||||
go :: forall m. MonadIO m => Natural -> Int64 -> ConduitT ByteString Void m ()
|
||||
go c accsz = do
|
||||
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe Integer -> Integer -> ConduitT ByteString Void m ()
|
||||
go c accsz lastReport startT = do
|
||||
currT <- liftIO $ toNanoSecs <$> getTime Monotonic
|
||||
chunk' <- await
|
||||
whenIsJust chunk' $ \chunk -> do
|
||||
let csz = fromIntegral $ olength chunk
|
||||
@ -247,9 +250,15 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
p :: Centi
|
||||
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
|
||||
!c' = succ c
|
||||
runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{tshow sz'}/#{tshow sz} (#{tshow p}%)...|]
|
||||
eta :: Integer
|
||||
eta = ceiling $ ((currT - startT) % fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz)
|
||||
!lastReport'
|
||||
| currT - fromMaybe startT lastReport > 5e9 = Just currT
|
||||
| otherwise = lastReport
|
||||
when (lastReport' /= lastReport) $
|
||||
runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{tshow sz'}/#{tshow sz} (#{tshow p}%) ETA #{tshow eta}s...|]
|
||||
atomically . putTMVar chunkVar $ Just chunk
|
||||
go c' sz'
|
||||
go c' sz' lastReport' startT
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks
|
||||
return True
|
||||
if
|
||||
|
||||
@ -30,6 +30,8 @@ import UnliftIO.Concurrent (myThreadId)
|
||||
|
||||
import Control.Monad.Trans.Resource (register)
|
||||
|
||||
import System.Clock (getTime, Clock(Monotonic))
|
||||
|
||||
|
||||
data JobQueueException = JobQueuePoolEmpty
|
||||
| JobQueueWorkerNotFound
|
||||
@ -46,7 +48,7 @@ writeJobCtl' target cmd = do
|
||||
| null jobWorkers
|
||||
-> throwM JobQueuePoolEmpty
|
||||
| [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers
|
||||
-> atomically . modifyTVar' chan $ jqInsert cmd
|
||||
-> atomically $ readTVar chan >>= jqInsert cmd >>= (writeTVar chan $!)
|
||||
| otherwise
|
||||
-> throwM JobQueueWorkerNotFound
|
||||
|
||||
@ -56,9 +58,15 @@ writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m (
|
||||
-- Instructions are assigned deterministically and pseudo-randomly to one specific worker.
|
||||
-- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others
|
||||
writeJobCtl cmd = do
|
||||
names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar
|
||||
jSt <- asks appJobState
|
||||
names <- atomically $ jobWorkerNames <$> readTMVar jSt
|
||||
when (null names) $ throwM JobQueuePoolEmpty
|
||||
tid <- myThreadId
|
||||
let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names
|
||||
cTime <- liftIO $ getTime Monotonic
|
||||
let
|
||||
epoch :: Int64
|
||||
epoch = round cTime `div` 3600
|
||||
target = evalRand ?? mkStdGen (hash epoch `hashWithSalt` tid `hashWithSalt` cmd) $ uniform names
|
||||
writeJobCtl' target cmd
|
||||
|
||||
|
||||
|
||||
@ -9,14 +9,14 @@ module Jobs.Types
|
||||
, YesodJobDB
|
||||
, JobHandler(..), _JobHandlerAtomic, _JobHandlerException
|
||||
, JobContext(..)
|
||||
, JobState(..)
|
||||
, JobState(..), _jobWorkers, _jobWorkerName, _jobContext, _jobPoolManager, _jobCron, _jobShutdown, _jobCurrentCrontab
|
||||
, jobWorkerNames
|
||||
, JobWorkerState(..), _jobWorkerJobCtl, _jobWorkerJob
|
||||
, JobWorkerId
|
||||
, showWorkerId, newWorkerId
|
||||
, JobQueue, jqInsert, jqDequeue, jqDepth
|
||||
, JobQueue, jqInsert, jqDequeue', jqDequeue, jqDepth, jqContents
|
||||
, JobPriority(..), prioritiseJob
|
||||
, jobNoQueueSame
|
||||
, jobNoQueueSame, jobMovable
|
||||
, module Cron
|
||||
) where
|
||||
|
||||
@ -39,6 +39,9 @@ import qualified Data.PQueue.Prio.Max as PQ
|
||||
|
||||
import Cron (CronNextMatch(..), _MatchAsap, _MatchAt, _MatchNone)
|
||||
|
||||
import System.Clock (getTime, Clock(Monotonic), TimeSpec)
|
||||
import GHC.Conc (unsafeIOToSTM)
|
||||
|
||||
|
||||
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
|
||||
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
|
||||
@ -150,8 +153,11 @@ data JobCtl = JobCtlFlush
|
||||
| JobCtlQueue Job
|
||||
| JobCtlGenerateHealthReport HealthCheck
|
||||
| JobCtlTest
|
||||
| JobCtlSleep Micro -- | For debugging
|
||||
deriving (Eq, Ord, Read, Show, Generic, Typeable)
|
||||
|
||||
makePrisms ''JobCtl
|
||||
|
||||
instance Hashable JobCtl
|
||||
instance NFData JobCtl
|
||||
|
||||
@ -242,22 +248,36 @@ jobNoQueueSame = \case
|
||||
JobRechunkFiles{} -> True
|
||||
_ -> False
|
||||
|
||||
jobMovable :: JobCtl -> Bool
|
||||
jobMovable = isn't _JobCtlTest
|
||||
|
||||
newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue JobPriority JobCtl }
|
||||
|
||||
newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue (JobPriority, Down TimeSpec) JobCtl }
|
||||
deriving (Eq, Ord, Read, Show)
|
||||
deriving newtype (Semigroup, Monoid, NFData)
|
||||
|
||||
makePrisms ''JobQueue
|
||||
|
||||
jqInsert :: JobCtl -> JobQueue -> JobQueue
|
||||
jqInsert job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job) job
|
||||
jqInsert' :: TimeSpec -> JobCtl -> JobQueue -> JobQueue
|
||||
jqInsert' cTime job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job, Down cTime) job
|
||||
|
||||
jqInsert :: JobCtl -> JobQueue -> STM JobQueue
|
||||
jqInsert job queue = do
|
||||
cTime <- unsafeIOToSTM $ getTime Monotonic
|
||||
return $ jqInsert' cTime job queue
|
||||
|
||||
jqDequeue' :: JobQueue -> Maybe (((JobPriority, Down TimeSpec), JobCtl), JobQueue)
|
||||
jqDequeue' = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxViewWithKey . getJobQueue
|
||||
|
||||
jqDequeue :: JobQueue -> Maybe (JobCtl, JobQueue)
|
||||
jqDequeue = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxView . getJobQueue
|
||||
jqDequeue = fmap (over _1 $ view _2) . jqDequeue'
|
||||
|
||||
jqDepth :: Integral n => JobQueue -> n
|
||||
jqDepth = fromIntegral . PQ.size . getJobQueue
|
||||
|
||||
jqContents :: IndexedTraversal' (JobPriority, Down TimeSpec) JobQueue JobCtl
|
||||
jqContents = _JobQueue . PQ.traverseWithKey . indexed
|
||||
|
||||
|
||||
data JobState = JobState
|
||||
{ jobWorkers :: Map (Async ()) (TVar JobQueue)
|
||||
@ -271,3 +291,5 @@ data JobState = JobState
|
||||
|
||||
jobWorkerNames :: JobState -> Set JobWorkerId
|
||||
jobWorkerNames JobState{..} = Set.map jobWorkerName $ Map.keysSet jobWorkers
|
||||
|
||||
makeLenses_ ''JobState
|
||||
|
||||
@ -117,6 +117,7 @@ data AppSettings = AppSettings
|
||||
, appJobFlushInterval :: Maybe NominalDiffTime
|
||||
, appJobCronInterval :: Maybe NominalDiffTime
|
||||
, appJobStaleThreshold :: NominalDiffTime
|
||||
, appJobMoveThreshold :: Maybe DiffTime
|
||||
, appNotificationRateLimit :: NominalDiffTime
|
||||
, appNotificationCollateDelay :: NominalDiffTime
|
||||
, appNotificationExpiration :: NominalDiffTime
|
||||
@ -454,6 +455,7 @@ instance FromJSON AppSettings where
|
||||
appJobFlushInterval <- o .:? "job-flush-interval"
|
||||
appJobCronInterval <- o .:? "job-cron-interval"
|
||||
appJobStaleThreshold <- o .: "job-stale-threshold"
|
||||
appJobMoveThreshold <- o .:? "job-move-threshold"
|
||||
appNotificationRateLimit <- o .: "notification-rate-limit"
|
||||
appNotificationCollateDelay <- o .: "notification-collate-delay"
|
||||
appNotificationExpiration <- o .: "notification-expiration"
|
||||
|
||||
@ -8,6 +8,35 @@ import ClassyPrelude
|
||||
import System.Clock
|
||||
import Data.Ratio ((%))
|
||||
|
||||
import Data.Fixed
|
||||
|
||||
import Control.Lens
|
||||
|
||||
|
||||
instance Real TimeSpec where
|
||||
toRational TimeSpec{..} = fromIntegral sec + fromIntegral nsec % 1e9
|
||||
|
||||
instance Fractional TimeSpec where
|
||||
a / b = fromRational $ toRational a / toRational b
|
||||
fromRational n = fromNanoSecs n'
|
||||
where MkFixed n' = fromRational n :: Nano
|
||||
|
||||
instance RealFrac TimeSpec where
|
||||
properFraction = over _2 fromRational . properFraction . toRational
|
||||
|
||||
round x = let (n,r) = properFraction x
|
||||
m = bool (n + 1) (n -1) $ r < fromRational 0
|
||||
s = signum (abs r - fromRational 0.5)
|
||||
in if | s == fromRational (-1) -> n
|
||||
| s == fromRational 0 -> bool m n $ even n
|
||||
| s == fromRational 1 -> m
|
||||
| otherwise -> error "round @TimeSpec: Bad value"
|
||||
|
||||
ceiling x = bool n (n + 1) $ r > 0
|
||||
where (n,r) = properFraction x
|
||||
|
||||
floor x = bool n (n - 1) $ r > 0
|
||||
where (n,r) = properFraction x
|
||||
|
||||
instance NFData TimeSpec
|
||||
instance Hashable TimeSpec
|
||||
|
||||
@ -4,35 +4,66 @@ module UnliftIO.Async.Utils
|
||||
, allocateAsyncMasked, allocateLinkedAsyncMasked
|
||||
) where
|
||||
|
||||
import ClassyPrelude hiding (cancel, async, link)
|
||||
import ClassyPrelude hiding (cancel, async, link, finally, mask)
|
||||
import Control.Lens
|
||||
import Control.Lens.Extras (is)
|
||||
|
||||
import qualified UnliftIO.Async as UnliftIO
|
||||
import qualified Control.Concurrent.Async as A
|
||||
|
||||
import Control.Monad.Trans.Resource
|
||||
import qualified Control.Monad.Trans.Resource.Internal as ResourceT.Internal
|
||||
import Data.Acquire
|
||||
|
||||
import Control.Monad.Catch
|
||||
|
||||
|
||||
withReference :: forall m a. (MonadUnliftIO m, MonadResource m) => ((IO (), IO ()) -> m a) -> m a
|
||||
withReference act = do
|
||||
releaseAct <- newEmptyTMVarIO
|
||||
|
||||
let doAlloc = do
|
||||
iSt <- liftResourceT getInternalState
|
||||
liftIO $ mask $ \_ -> do
|
||||
ResourceT.Internal.stateAlloc iSt
|
||||
atomically $ putTMVar releaseAct ()
|
||||
return iSt
|
||||
doRelease iSt eCase = liftIO . whenM (atomically $ is _Just <$> tryTakeTMVar releaseAct) $ do
|
||||
flip ResourceT.Internal.stateCleanup iSt $ case eCase of
|
||||
ExitCaseSuccess _ -> ReleaseNormal
|
||||
ExitCaseException _ -> ReleaseException
|
||||
ExitCaseAbort -> ReleaseEarly
|
||||
|
||||
withRunInIO $ \run ->
|
||||
fmap fst . generalBracket (run doAlloc) doRelease $ \iSt -> do
|
||||
res <- run $ act
|
||||
( atomically $ takeTMVar releaseAct
|
||||
, ResourceT.Internal.stateCleanup ReleaseNormal iSt
|
||||
)
|
||||
atomically $ guard =<< isEmptyTMVar releaseAct
|
||||
return res
|
||||
|
||||
|
||||
allocateAsync :: forall m a.
|
||||
( MonadUnliftIO m, MonadResource m )
|
||||
=> m a -> m (Async a)
|
||||
allocateAsync act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel . A.async $ run act
|
||||
allocateAsync act = withReference $ \(signalReady, releaseRef) -> withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel . A.async . flip finally releaseRef $ signalReady >> run act
|
||||
|
||||
allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => m a -> m (Async a)
|
||||
allocateLinkedAsync = uncurry (<$) . (id &&& UnliftIO.link) <=< allocateAsync
|
||||
|
||||
|
||||
allocateAsyncWithUnmask :: forall m a.
|
||||
( MonadUnliftIO m, MonadResource m )
|
||||
( MonadUnliftIO m, MonadResource m)
|
||||
=> ((forall b. m b -> m b) -> m a) -> m (Async a)
|
||||
allocateAsyncWithUnmask act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel $ A.asyncWithUnmask $ \unmask -> run $ act (liftIO . unmask . run)
|
||||
allocateAsyncWithUnmask act = withReference $ \(signalReady, releaseRef) -> withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel $ A.asyncWithUnmask $ \unmask -> flip finally releaseRef $ signalReady >> run (act $ liftIO . unmask . run)
|
||||
|
||||
allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. m b -> m b) -> m a) -> m (Async a)
|
||||
allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& UnliftIO.link) =<< allocateAsyncWithUnmask act
|
||||
|
||||
|
||||
allocateAsyncMasked :: forall m a.
|
||||
( MonadUnliftIO m, MonadResource m )
|
||||
( MonadUnliftIO m, MonadResource m)
|
||||
=> m a -> m (Async a)
|
||||
allocateAsyncMasked act = allocateAsyncWithUnmask (const act)
|
||||
|
||||
|
||||
18
src/Utils.hs
18
src/Utils.hs
@ -106,8 +106,10 @@ import Algebra.Lattice (top, bottom, (/\), (\/), BoundedJoinSemiLattice, Bounded
|
||||
|
||||
import Data.Constraint (Dict(..))
|
||||
|
||||
import Control.Monad.Random.Class (MonadRandom)
|
||||
import Control.Monad.Random.Class (MonadSplit(getSplit), MonadRandom, MonadInterleave(interleave), uniform)
|
||||
import Control.Monad.Random (RandomGen)
|
||||
import qualified System.Random.Shuffle as Rand (shuffleM)
|
||||
import qualified Control.Monad.Random.Lazy as LazyRand
|
||||
|
||||
import Data.Data (Data)
|
||||
import qualified Data.Text.Lazy.Builder as Builder
|
||||
@ -739,6 +741,9 @@ throwExceptT :: ( Exception e, MonadThrow m )
|
||||
=> ExceptT e m a -> m a
|
||||
throwExceptT = exceptT throwM return
|
||||
|
||||
generalFinally :: MonadMask m => m a -> (ExitCase a -> m b) -> m a
|
||||
generalFinally action finalizer = view _1 <$> generalBracket (return ()) (const finalizer) (const action)
|
||||
|
||||
------------
|
||||
-- Monads --
|
||||
------------
|
||||
@ -1192,6 +1197,10 @@ unstableSortOn = unstableSortBy . comparing
|
||||
unstableSort :: (MonadRandom m, Ord a) => [a] -> m [a]
|
||||
unstableSort = unstableSortBy compare
|
||||
|
||||
uniforms :: (RandomGen g, MonadSplit g m, Foldable t) => t a -> m [a]
|
||||
uniforms xs = LazyRand.evalRand go <$> getSplit
|
||||
where go = (:) <$> interleave (uniform xs) <*> go
|
||||
|
||||
----------
|
||||
-- Lens --
|
||||
----------
|
||||
@ -1273,3 +1282,10 @@ infixr 4 <//>
|
||||
|
||||
(<//>) :: FilePath -> FilePath -> FilePath
|
||||
dir <//> file = dir </> dropDrive file
|
||||
|
||||
|
||||
----------------
|
||||
-- TH Dungeon --
|
||||
----------------
|
||||
|
||||
makePrisms ''ExitCase
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
{-# OPTIONS_GHC -fno-warn-deprecations #-}
|
||||
{-# OPTIONS_GHC -fno-warn-deprecations #-} -- `WidgetT`, `HandlerT`
|
||||
|
||||
module Utils.Form where
|
||||
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
module Utils.Sql
|
||||
( setSerializable, setSerializable'
|
||||
( setSerializable, setSerializableBatch, setSerializable'
|
||||
, catchSql, handleSql
|
||||
, isUniqueConstraintViolation
|
||||
, catchIfSql, handleIfSql
|
||||
@ -30,6 +30,9 @@ import Text.Shakespeare.Text (st)
|
||||
|
||||
setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a
|
||||
setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
|
||||
|
||||
setSerializableBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a
|
||||
setSerializableBatch = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6
|
||||
|
||||
setSerializable' :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => RetryPolicyM (SqlPersistT m) -> SqlPersistT m a -> ReaderT SqlBackend m a
|
||||
setSerializable' policy act = do
|
||||
|
||||
Loading…
Reference in New Issue
Block a user