78 lines
2.8 KiB
Haskell
78 lines
2.8 KiB
Haskell
module Jobs.Queue
|
|
( writeJobCtl, writeJobCtlBlock
|
|
, queueJob, queueJob'
|
|
, YesodJobDB
|
|
, runDBJobs, queueDBJob
|
|
) where
|
|
|
|
import Import
|
|
|
|
import Utils.Sql
|
|
import Jobs.Types
|
|
|
|
import Control.Monad.Trans.Writer (WriterT, runWriterT)
|
|
import Control.Monad.Writer.Class (MonadWriter(..))
|
|
import Control.Monad.Trans.Reader (ReaderT, mapReaderT)
|
|
|
|
import qualified Data.Set as Set
|
|
import qualified Data.List.NonEmpty as NonEmpty
|
|
import qualified Data.HashMap.Strict as HashMap
|
|
|
|
import Control.Monad.Random (MonadRandom(..), evalRand, mkStdGen, uniform)
|
|
|
|
|
|
-- | Send a 'JobCtl' command to one of the channels stored in @appJobCtl@.
--
-- The target channel is drawn with 'uniform' from a generator seeded with
-- the hash of the calling thread's id salted with the command, so the pick
-- is a pure function of (thread, command, available channels).
--
-- NOTE(review): presumably each channel feeds one job worker, and
-- @appJobCtl@ is never empty ('uniform' expects a non-empty collection) —
-- confirm against the definition of @appJobCtl@.
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = do
  threadId <- liftIO myThreadId
  channels <- getsYesod appJobCtl
  let seed   = hash threadId `hashWithSalt` cmd
      target = evalRand (uniform channels) (mkStdGen seed)
  liftIO . atomically $ writeTMChan target cmd
|
|
|
|
-- | Send a 'JobCtl' command like 'writeJobCtl', then block until a result
-- for it has been delivered.
--
-- A fresh empty 'TMVar' is registered under the command in the
-- @jobConfirm@ map /before/ the command is sent. After sending, the caller
-- waits for the variable to be filled and, in the same STM transaction,
-- removes it from the map again (dropping the map entry altogether when no
-- other variables remain for that command).
--
-- If the delivered value is @Just exc@, @exc@ is rethrown via 'throwM';
-- @Nothing@ means the function returns normally.
--
-- NOTE(review): presumably the job worker fills the variable once it has
-- handled the command — the writer side is not visible in this module.
writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m ()
writeJobCtlBlock cmd = do
  confirmations <- asks jobConfirm

  -- Register first, so the result cannot be delivered before anyone listens.
  waiter <- liftIO . atomically $ do
    slot <- newEmptyTMVar
    modifyTVar' confirmations $ HashMap.insertWith (<>) cmd (pure slot)
    return slot

  lift $ writeJobCtl cmd

  -- Wait for the result and deregister atomically.
  outcome <- liftIO . atomically $ do
    result <- takeTMVar waiter
    modifyTVar' confirmations $
      HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= waiter)) cmd
    return result

  case outcome of
    Nothing  -> return ()
    Just exc -> throwM exc
|
|
|
|
-- | Insert a 'QueuedJob' row for the given job and return its id.
--
-- The row stores the JSON encoding of the job, the current time and the
-- id of this instance (@appInstanceID@) as creation metadata, and starts
-- out without a lock holder or lock time.
--
-- No worker is notified here; see the note at the end of the body.
queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId
queueJobUnsafe job = do
  createdAt <- liftIO getCurrentTime
  createdBy <- getsYesod appInstanceID
  let queued = QueuedJob
        { queuedJobContent          = toJSON job
        , queuedJobCreationInstance = createdBy
        , queuedJobCreationTime     = createdAt
        , queuedJobLockInstance     = Nothing
        , queuedJobLockTime         = Nothing
        }
  insert queued
  -- We should not immediately notify a worker here; instead the caller has
  -- to wait for the transaction to finish first and then send
  -- @JobCtlPerform@ for the returned id.
  -- FIXME: Should do fancy load balancing across instances (or something)
|
|
|
|
-- | Store a job in the queue using its own 'runDB' call and return the id
-- of the new 'QueuedJob' row.
--
-- The insert ('queueJobUnsafe') is wrapped in 'setSerializable'.
-- NOTE(review): presumably that runs the transaction at serializable
-- isolation — confirm against "Utils.Sql".
--
-- This does not send any 'JobCtl' command.
queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId
queueJob job = liftHandlerT (runDB (setSerializable (queueJobUnsafe job)))
|
|
|
|
-- | 'queueJob' followed by sending @JobCtlPerform@ for the newly queued
-- job via 'writeJobCtl'.
queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m ()
queueJob' job = do
  jId <- queueJob job
  writeJobCtl $ JobCtlPerform jId
|
|
|
|
-- | Database action for @site@ that, on top of the persistent backend
-- reader, carries a writer accumulating a 'Set' of 'QueuedJobId's.
type YesodJobDB site =
  ReaderT (YesodPersistBackend site)
    (WriterT (Set QueuedJobId)
      (HandlerT site IO))
|
|
|
|
-- | Queue a job from within a 'YesodJobDB' action.
--
-- The job is inserted with 'queueJobUnsafe' (lifted into the writer layer)
-- and the id of the new row is recorded in the writer's set via 'tell'.
-- No 'JobCtl' command is sent here.
--
-- The signature uses the 'YesodJobDB' alias; it expands to exactly the
-- transformer stack that was previously written out by hand.
queueDBJob :: Job -> YesodJobDB UniWorX ()
queueDBJob job = do
  jId <- mapReaderT lift $ queueJobUnsafe job
  tell $ Set.singleton jId
|
|
|
|
-- | Run a 'YesodJobDB' action with 'runDB' and afterwards notify workers
-- about every job id the action recorded.
--
-- The writer layer is unwrapped inside the 'runDB' call, yielding the
-- action's result together with the set of recorded 'QueuedJobId's. Only
-- after 'runDB' has returned is a @JobCtlPerform@ command sent for each id
-- via 'writeJobCtl'; if 'runDB' throws, no command is sent.
--
-- The signature uses the 'YesodJobDB' alias; it expands to exactly the
-- transformer stack that was previously written out by hand.
runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -> m a
runDBJobs act = do
  (ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act
  forM_ jIds $ writeJobCtl . JobCtlPerform
  return ret
|
|
|
|
|
|
|