This repository has been archived on 2024-10-24. You can view files and clone it, but cannot push or open issues or pull requests.
fradrive-old/src/Jobs/Queue.hs
2020-07-31 18:00:30 +02:00

145 lines
5.5 KiB
Haskell

module Jobs.Queue
( writeJobCtl, writeJobCtlBlock
, writeJobCtl', writeJobCtlBlock'
, queueJob, queueJob'
, JobDB
, runDBJobs, queueDBJob, sinkDBJobs
, runDBJobs'
, queueDBJobCron
, module Jobs.Types
) where
import Import hiding ((<>))
import Jobs.Types
import Control.Monad.Writer.Class (MonadWriter(..))
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.HashMap.Strict as HashMap
import Control.Monad.Random (evalRand, mkStdGen, uniform)
import qualified Data.Conduit.List as C
import Data.Semigroup ((<>))
import UnliftIO.Concurrent (myThreadId)
import Control.Monad.Trans.Resource (register)
data JobQueueException = JobQueuePoolEmpty
| JobQueueWorkerNotFound
deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic)
instance Exception JobQueueException
writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m ()
-- | Pass an instruction to the given `Job`-Worker
writeJobCtl' target cmd = do
JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar
if
| null jobWorkers
-> throwM JobQueuePoolEmpty
| [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers
-> atomically . modifyTVar' chan $ jqInsert cmd
| otherwise
-> throwM JobQueueWorkerNotFound
writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers
--
-- Instructions are assigned deterministically and pseudo-randomly to one specific worker.
-- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others
writeJobCtl cmd = do
names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar
tid <- myThreadId
let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names
writeJobCtl' target cmd
writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon
writeJobCtlBlock' writeCtl cmd = do
getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar
resVar <- atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
writeCtl cmd
let
removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock = writeJobCtlBlock' writeJobCtl
queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe queuedJobWriteLastExec job = do
$logInfoS "queueJob" $ tshow job
queuedJobCreationTime <- liftIO getCurrentTime
queuedJobCreationInstance <- getsYesod appInstanceID
insert QueuedJob
{ queuedJobContent = toJSON job
, queuedJobLockInstance = Nothing
, queuedJobLockTime = Nothing
, ..
}
-- We should not immediately notify a worker; instead wait for the transaction to finish first
-- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
-- return jId
queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
-- ^ Queue a job for later execution
--
-- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`)
queueJob = liftHandler . runDB . setSerializable . queueJobUnsafe False
queueJob' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
-- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap
queueJob' job = do
app <- getYesod
queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform
-- | Slightly modified Version of `DB` for `runDBJobs`
type JobDB = YesodJobDB UniWorX
queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX ()
-- | Queue a job as part of a database transaction and execute it after the transaction succeeds
queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . Set.singleton
queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . Set.singleton
sinkDBJobs :: ConduitT Job Void (YesodJobDB UniWorX) ()
-- | Queue many jobs as part of a database transaction and execute them after the transaction passes
sinkDBJobs = C.mapM_ queueDBJob
runDBJobs :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -> m a
-- | Replacement for/Wrapper around `runDB` when jobs need to be queued as part of a database transaction
--
-- Jobs get immediately executed if the transaction succeeds
runDBJobs act = do
(ret, jIds) <- liftHandler . runDB $ mapReaderT runWriterT act
app <- getYesod
forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform
return ret
runDBJobs' :: YesodJobDB UniWorX a -> DB a
runDBJobs' act = do
(ret, jIds) <- mapReaderT runWriterT act
void . liftHandler $ do
UnliftIO{..} <- askUnliftIO
register . unliftIO . runDB $
forM_ jIds $ \jId ->
whenM (existsKey jId) $
runReaderT (writeJobCtl $ JobCtlPerform jId) =<< getYesod
return ret