107 lines
4.1 KiB
Haskell
107 lines
4.1 KiB
Haskell
module Jobs.Queue
|
|
( writeJobCtl, writeJobCtlBlock
|
|
, queueJob, queueJob'
|
|
, YesodJobDB
|
|
, runDBJobs, queueDBJob, sinkDBJobs
|
|
, module Jobs.Types
|
|
) where
|
|
|
|
import Import hiding ((<>))
|
|
|
|
import Utils.Sql
|
|
import Jobs.Types
|
|
|
|
import Control.Monad.Trans.Writer (WriterT, runWriterT)
|
|
import Control.Monad.Writer.Class (MonadWriter(..))
|
|
import Control.Monad.Trans.Reader (ReaderT, mapReaderT)
|
|
|
|
import qualified Data.Set as Set
|
|
import qualified Data.List.NonEmpty as NonEmpty
|
|
import qualified Data.HashMap.Strict as HashMap
|
|
|
|
import Control.Monad.Random (evalRand, mkStdGen, uniform)
|
|
|
|
import qualified Data.Conduit.List as C
|
|
|
|
import Data.Semigroup ((<>))
|
|
|
|
|
|
-- | Failures that can occur while handing instructions to the job workers.
data JobQueueException
  = JobQueuePoolEmpty -- ^ Thrown by `writeJobCtl` when the worker map is empty
  deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic)

-- Uses the default 'Exception' methods (no custom rendering).
instance Exception JobQueueException
|
|
|
|
|
|
-- | Pass an instruction to the `Job`-workers.
--
-- The receiving worker is picked by a pseudo-random generator that is seeded
-- from the calling thread's id and the instruction itself, so for a given set
-- of workers the choice is deterministic.
-- While this means that an instruction might be executed later than
-- desirable, rogue threads that queue the same instruction many times do not
-- deny service to others.
--
-- Throws `JobQueuePoolEmpty` if the worker map read from `appJobCtl` is empty.
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = do
  callerId <- liftIO myThreadId
  workers <- liftIO . readTVarIO =<< getsYesod appJobCtl
  if null workers
    then throwM JobQueuePoolEmpty
    else do
      let -- same thread + same instruction => same seed => same worker
          seed = hash callerId `hashWithSalt` cmd
          target = evalRand (uniform workers) (mkStdGen seed)
      liftIO . atomically $ writeTMChan target cmd
|
|
|
|
-- | Pass an instruction to the `Job`-workers and block until it was acted upon.
--
-- A fresh, empty `TMVar` is registered under the instruction in the
-- `jobConfirm` map before the instruction is sent via `writeJobCtl`.  The call
-- then waits for that variable to be filled, unregisters it again and, if it
-- was filled with @Just@ an exception, rethrows that exception.
--
-- NOTE(review): if `writeJobCtl` throws (e.g. `JobQueuePoolEmpty`) the
-- variable registered beforehand is never removed from `jobConfirm`; cleaning
-- up would need an exception-safe bracket, which the current constraints do
-- not offer -- confirm whether this matters in practice.
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
writeJobCtlBlock cmd = do
  confirmations <- asks jobConfirm
  -- Register before sending, so a confirmation cannot arrive unobserved.
  waiter <- liftIO . atomically $ do
    fresh <- newEmptyTMVar
    modifyTVar' confirmations $ HashMap.insertWith (<>) cmd (pure fresh)
    return fresh
  lift $ writeJobCtl cmd
  outcome <- liftIO . atomically $ do
    result <- takeTMVar waiter
    -- Drop only our own variable; the key disappears once no waiter is left.
    let withoutWaiter = NonEmpty.nonEmpty . NonEmpty.filter (/= waiter)
    modifyTVar' confirmations $ HashMap.update withoutWaiter cmd
    return result
  case outcome of
    Nothing  -> return ()
    Just exc -> throwM exc
|
|
|
|
-- | Insert a `QueuedJob` row for the given job and return its id.
--
-- The row stores the JSON encoding of the job, the id of the creating
-- instance (`appInstanceID`), the current time as creation time, and carries
-- no lock.
--
-- No worker is notified here: we should not immediately notify a worker but
-- instead wait for the surrounding transaction to finish first.  Callers are
-- responsible for issuing `JobCtlPerform` afterwards.
-- FIXME: Should do fancy load balancing across instances (or something)
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
  createdAt <- liftIO getCurrentTime
  instanceId <- getsYesod appInstanceID
  let record = QueuedJob
        { queuedJobContent          = toJSON job
        , queuedJobCreationInstance = instanceId
        , queuedJobCreationTime     = createdAt
        , queuedJobLockInstance     = Nothing
        , queuedJobLockTime         = Nothing
        }
  insert record
|
|
|
|
-- | Queue a job for later execution.
--
-- Makes no guarantees as to when it will be executed (see `queueJob'`) and
-- does not interact with any running database transaction (see `runDBJobs`):
-- the insert is run through its own `runDB` call, wrapped in
-- `setSerializable`.
queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
queueJob job = liftHandlerT . runDB . setSerializable $ queueJobUnsafe job
|
|
|
|
-- | `queueJob` followed by `writeJobCtl` with `JobCtlPerform`, to ensure that
-- the job is executed as soon as possible.
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
queueJob' job = do
  jId <- queueJob job
  writeJobCtl (JobCtlPerform jId)
|
|
|
|
-- | Slightly modified version of `YesodDB` for use with `runDBJobs`.
--
-- Compared to a plain database action it additionally carries a `WriterT`
-- layer that accumulates the ids of all jobs queued during the action.
type YesodJobDB site =
  ReaderT (YesodPersistBackend site)
    (WriterT (Set QueuedJobId)
      (HandlerT site IO))
|
|
|
|
-- | Queue a job as part of a database transaction and execute it after the
-- transaction succeeds.
--
-- The job is inserted via `queueJobUnsafe`; its id is recorded in the writer
-- layer of `YesodJobDB` so that `runDBJobs` can notify a worker afterwards.
queueDBJob :: Job -> YesodJobDB UniWorX ()
queueDBJob job = do
  jId <- mapReaderT lift (queueJobUnsafe job)
  tell (Set.singleton jId)
|
|
|
|
-- | Queue many jobs as part of a database transaction and execute them after
-- the transaction passes.
--
-- Every job received from upstream is handed to `queueDBJob`.
sinkDBJobs :: Sink Job (YesodJobDB UniWorX) ()
sinkDBJobs = C.mapM_ $ \job -> queueDBJob job
|
|
|
|
-- | Replacement for/wrapper around `runDB` when jobs need to be queued as part
-- of a database transaction.
--
-- The action is run through `runDB`; once that has returned, a
-- `JobCtlPerform` instruction is sent via `writeJobCtl` for every job id the
-- action collected, and the action's own result is returned.
runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -> m a
runDBJobs act = do
  -- Peel off the writer layer inside the transaction to obtain the queued ids.
  (result, queuedIds) <- liftHandlerT (runDB (mapReaderT runWriterT act))
  forM_ queuedIds $ \jId -> writeJobCtl (JobCtlPerform jId)
  return result
|