fix(jobs): implement job priorities

This commit is contained in:
Gregor Kleen 2019-09-05 16:14:11 +02:00
parent 7f2dd7808e
commit e29f042229
14 changed files with 81 additions and 21 deletions

View File

@ -135,6 +135,7 @@ dependencies:
- cassava-conduit
- constraints
- memory
- pqueue
other-extensions:
- GeneralizedNewtypeDeriving

View File

@ -21,3 +21,6 @@ instance PersistEntity record => Binary (Key record) where
put = Binary.put . toPersistValue
putList = Binary.putList . map toPersistValue
get = either (fail . unpack) return . fromPersistValue =<< Binary.get
instance PersistEntity record => NFData (Key record) where
rnf = rnf . keyToValues

View File

@ -20,3 +20,4 @@ deriving instance Typeable PersistValue
instance Hashable PersistValue
instance Binary PersistValue
instance NFData PersistValue

View File

@ -145,7 +145,7 @@ manageJobPool foundation@UniWorX{..}
endo <- execWriterT . replicateM_ missing $ do
workerId <- newWorkerId
let logIdent = mkLogIdent workerId
(bChan, chan) <- atomically $ newBroadcastTChan >>= (\c -> (c, ) <$> dupTChan c)
chan <- liftIO $ newTVarIO mempty
let
streamChan = join . atomically $ do
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
@ -153,7 +153,10 @@ manageJobPool foundation@UniWorX{..}
| shouldTerminate ->
return $ return ()
| otherwise -> do
nextVal <- readTChan chan
queue <- readTVar chan
nextVal <- case jqDequeue queue of
Nothing -> retry
Just (j, q) -> j <$ writeTVar chan q
return $ yield nextVal >> streamChan
runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do
$logInfoS logIdent "Started"
@ -162,7 +165,7 @@ manageJobPool foundation@UniWorX{..}
worker <- allocateLinkedAsync runWorker
tell . Endo $ \cSt -> cSt
{ jobWorkers = Map.insert worker bChan $ jobWorkers cSt
{ jobWorkers = Map.insert worker chan $ jobWorkers cSt
, jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker
}
atomically . putTMVar appJobState $ endo `appEndo` oldState
@ -181,12 +184,14 @@ manageJobPool foundation@UniWorX{..}
void . lift . allocateLinkedAsync $
let go = do
next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do
nextVal <- MaybeT . lift . tryReadTChan $ jobWorkers oldState ! jobAsync
let chan = jobWorkers oldState ! jobAsync
(nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan
lift . lift $ writeTVar chan newQueue
jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState
receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers'
return (nextVal, receiver)
whenIsJust next $ \(nextVal, receiver) -> do
atomically $ writeTChan receiver nextVal
atomically . modifyTVar' receiver $ jqInsert nextVal
go
in go
@ -216,15 +221,15 @@ execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWo
execCrontab = do
mapRWST (liftHandlerT . runDB . setSerializable) $ do
let
mergeLastExec (Entity leId CronLastExec{..})
mergeLastExec (Entity _leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime)
| otherwise = lift $ delete leId
| otherwise = return ()
mergeQueued (Entity qjId QueuedJob{..})
mergeQueued (Entity _qjId QueuedJob{..})
| Just job <- Aeson.parseMaybe parseJSON queuedJobContent
= State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime)
| otherwise = lift $ delete qjId
| otherwise = return ()
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
@ -341,7 +346,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleCmd JobCtlNoOp = return ()
handleCmd JobCtlTest = return ()
handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform)
handleCmd (JobCtlQueue job) = lift $ queueJob' job
handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
@ -439,10 +444,10 @@ pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCronta
| abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2
-> return ()
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
, HashMap.member (JobCtlQueue job) crontab
-> return ()
| otherwise
, not $ HashMap.member (JobCtlQueue job) crontab
-> lift $ delete leId
| otherwise
-> return ()
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab

View File

@ -159,7 +159,7 @@ dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do
in fromInteger micro
$logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems workers'
responders <- fmap (getSum . fold) . liftIO . forConcurrently (Map.toList workers) $ \(_, wName)
-> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp)
-> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlTest)
if
| Map.null workers -> return Nothing
| otherwise -> return . Just $ responders % fromIntegral (Map.size workers)

View File

@ -44,7 +44,7 @@ writeJobCtl' target cmd = do
| null jobWorkers
-> throwM JobQueuePoolEmpty
| [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers
-> atomically $ writeTChan chan cmd
-> atomically . modifyTVar' chan $ jqInsert cmd
| otherwise
-> throwM JobQueueWorkerNotFound

View File

@ -1,3 +1,5 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Jobs.Types
( Job(..), Notification(..)
, JobCtl(..)
@ -6,6 +8,8 @@ module Jobs.Types
, jobWorkerNames
, JobWorkerId
, showWorkerId, newWorkerId
, JobQueue, jqInsert, jqDequeue
, JobPriority(..), prioritiseJob
) where
import Import.NoFoundation hiding (Unique)
@ -20,6 +24,9 @@ import Data.Unique
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.PQueue.Prio.Max (MaxPQueue)
import qualified Data.PQueue.Prio.Max as PQ
data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification }
| JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext }
@ -75,7 +82,9 @@ data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
deriving (Eq, Ord, Show, Read, Generic, Typeable)
instance Hashable Job
instance NFData Job
instance Hashable Notification
instance NFData Notification
deriveJSON defaultOptions
{ constructorTagModifier = camelToPathPiece' 1
@ -97,10 +106,11 @@ data JobCtl = JobCtlFlush
| JobCtlDetermineCrontab
| JobCtlQueue Job
| JobCtlGenerateHealthReport HealthCheck
| JobCtlNoOp
| JobCtlTest
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl
instance NFData JobCtl
newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique }
@ -118,8 +128,35 @@ data JobContext = JobContext
, jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException))))
}
data JobPriority = JobPrioBatch | JobPrioRealtime
deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable)
instance Universe JobPriority
instance Finite JobPriority
instance NFData JobPriority
prioritiseJob :: JobCtl -> JobPriority
prioritiseJob JobCtlTest = JobPrioRealtime
prioritiseJob (JobCtlGenerateHealthReport _) = JobPrioRealtime
prioritiseJob JobCtlDetermineCrontab = JobPrioRealtime
prioritiseJob _ = JobPrioBatch
newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue JobPriority JobCtl }
deriving (Eq, Ord, Read, Show)
deriving newtype (Monoid, NFData)
makePrisms ''JobQueue
jqInsert :: JobCtl -> JobQueue -> JobQueue
jqInsert job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job) job
jqDequeue :: JobQueue -> Maybe (JobCtl, JobQueue)
jqDequeue = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxView . getJobQueue
data JobState = JobState
{ jobWorkers :: Map (Async ()) (TChan JobCtl)
{ jobWorkers :: Map (Async ()) (TVar JobQueue)
, jobWorkerName :: Async () -> JobWorkerId
, jobContext :: JobContext
, jobPoolManager :: Async ()

View File

@ -160,6 +160,7 @@ instance Default MailLanguages where
def = MailLanguages []
instance Hashable MailLanguages
instance NFData MailLanguages
data MailContext = MailContext
@ -172,10 +173,12 @@ deriveJSON defaultOptions
} ''MailContext
instance Hashable MailContext
instance NFData MailContext
instance Default MailContext where
def = MailContext { mcLanguages = def
, mcDateTimeFormat = def
}
def = MailContext
{ mcLanguages = def
, mcDateTimeFormat = def
}
makeLenses_ ''MailContext

View File

@ -20,6 +20,7 @@ data HealthCheck
instance Universe HealthCheck
instance Finite HealthCheck
instance Hashable HealthCheck
instance NFData HealthCheck
deriveJSON defaultOptions
{ constructorTagModifier = camelToPathPiece' 2

View File

@ -12,6 +12,7 @@ data SchoolFunction
instance Universe SchoolFunction
instance Finite SchoolFunction
instance Hashable SchoolFunction
instance NFData SchoolFunction
nullaryPathPiece ''SchoolFunction $ camelToPathPiece' 1
pathPieceJSON ''SchoolFunction

View File

@ -27,6 +27,7 @@ data AuthenticationMode = AuthLDAP
deriving (Eq, Ord, Read, Show, Generic)
instance Hashable AuthenticationMode
instance NFData AuthenticationMode
deriveJSON defaultOptions
{ constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel

View File

@ -21,9 +21,10 @@ deriving instance Ord Address
deriving instance Generic Address
instance Hashable Address
instance NFData Address
deriveToJSON defaultOptions
{ fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel
{ fieldLabelModifier = camelToPathPiece' 1
} ''Address
instance FromJSON Address where

View File

@ -155,8 +155,11 @@ data LogDestination = LogDestStderr | LogDestStdout | LogDestFile { logDestFile
deriving instance Generic LogLevel
instance Hashable LogLevel
instance NFData LogLevel
instance Hashable LogSettings
instance NFData LogSettings
instance Hashable LogDestination
instance NFData LogDestination
data UserDefaultConf = UserDefaultConf
{ userDefaultTheme :: Theme

View File

@ -43,3 +43,6 @@ instance Csv.ToField Markup where
instance Csv.FromField Markup where
parseField = fmap preEscapedText . Csv.parseField
instance NFData Markup where
rnf = rnf . Text.renderMarkup