module Jobs.Queue ( writeJobCtl, writeJobCtlBlock , queueJob, queueJob' , YesodJobDB , runDBJobs, queueDBJob , module Jobs.Types ) where import Import import Utils.Sql import Jobs.Types import Control.Monad.Trans.Writer (WriterT, runWriterT) import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Trans.Reader (ReaderT, mapReaderT) import qualified Data.Set as Set import qualified Data.List.NonEmpty as NonEmpty import qualified Data.HashMap.Strict as HashMap import Control.Monad.Random (evalRand, mkStdGen, uniform) data JobQueueException = JobQueuePoolEmpty deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic) instance Exception JobQueueException writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m () writeJobCtl cmd = do tid <- liftIO myThreadId wMap <- getsYesod appJobCtl >>= liftIO . readTVarIO if | null wMap -> throwM JobQueuePoolEmpty | otherwise -> do let chan = flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) $ uniform wMap liftIO . atomically $ writeTMChan chan cmd writeJobCtlBlock :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> ReaderT JobContext m () writeJobCtlBlock cmd = do getResVar <- asks jobConfirm resVar <- liftIO . atomically $ do var <- newEmptyTMVar modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) return var lift $ writeJobCtl cmd let removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd mExc <- liftIO . atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar maybe (return ()) throwM mExc queueJobUnsafe :: Job -> YesodDB UniWorX QueuedJobId queueJobUnsafe job = do now <- liftIO getCurrentTime self <- getsYesod appInstanceID insert QueuedJob { queuedJobContent = toJSON job , queuedJobCreationInstance = self , queuedJobCreationTime = now , queuedJobLockInstance = Nothing , queuedJobLockTime = Nothing } -- We should not immediately notify a worker; instead wait for the transaction to finish first -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something) -- return jId queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId queueJob = liftHandlerT . runDB . setSerializable . queueJobUnsafe queueJob' :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m () -- ^ `queueJob` followed by `JobCtlPerform` queueJob' job = queueJob job >>= writeJobCtl . JobCtlPerform type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerT site IO)) queueDBJob :: Job -> ReaderT (YesodPersistBackend UniWorX) (WriterT (Set QueuedJobId) (HandlerT UniWorX IO)) () queueDBJob job = mapReaderT lift (queueJobUnsafe job) >>= tell . Set.singleton runDBJobs :: (MonadHandler m, HandlerSite m ~ UniWorX) => ReaderT (YesodPersistBackend UniWorX) (WriterT (Set QueuedJobId) (HandlerT UniWorX IO)) a -> m a runDBJobs act = do (ret, jIds) <- liftHandlerT . runDB $ mapReaderT runWriterT act forM_ jIds $ writeJobCtl . JobCtlPerform return ret