module Jobs.Queue ( writeJobCtl, writeJobCtlBlock , writeJobCtl', writeJobCtlBlock' , queueJob, queueJob' , YesodJobDB, JobDB , runDBJobs, queueDBJob, sinkDBJobs , runDBJobs' , queueDBJobCron , module Jobs.Types ) where import Import hiding ((<>)) import Utils.Sql import Jobs.Types import Control.Monad.Trans.Writer (WriterT, runWriterT) import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Trans.Reader (ReaderT, mapReaderT) import qualified Data.Map.Strict as Map import qualified Data.Set as Set import qualified Data.List.NonEmpty as NonEmpty import qualified Data.HashMap.Strict as HashMap import Control.Monad.Random (evalRand, mkStdGen, uniform) import qualified Data.Conduit.List as C import Data.Semigroup ((<>)) import UnliftIO.Concurrent (myThreadId) import Control.Monad.Trans.Resource (register) data JobQueueException = JobQueuePoolEmpty | JobQueueWorkerNotFound deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic) instance Exception JobQueueException writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m () -- | Pass an instruction to the given `Job`-Worker writeJobCtl' target cmd = do JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar if | null jobWorkers -> throwM JobQueuePoolEmpty | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers -> atomically . modifyTVar' chan $ jqInsert cmd | otherwise -> throwM JobQueueWorkerNotFound writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () -- | Pass an instruction to the `Job`-Workers -- -- Instructions are assigned deterministically and pseudo-randomly to one specific worker. -- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others writeJobCtl cmd = do names <- fmap jobWorkerNames $ asks appJobState >>= atomically . readTMVar tid <- myThreadId let target = evalRand ?? mkStdGen (hash tid `hashWithSalt` cmd) $ uniform names writeJobCtl' target cmd writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m () -- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon writeJobCtlBlock' writeCtl cmd = do getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar resVar <- atomically $ do var <- newEmptyTMVar modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) return var writeCtl cmd let removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar maybe (return ()) throwM mExc writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () -- | Pass an instruction to the `Job`-Workers and block until it was acted upon writeJobCtlBlock = writeJobCtlBlock' writeJobCtl queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX QueuedJobId queueJobUnsafe queuedJobWriteLastExec job = do $logInfoS "queueJob" $ tshow job queuedJobCreationTime <- liftIO getCurrentTime queuedJobCreationInstance <- getsYesod appInstanceID insert QueuedJob { queuedJobContent = toJSON job , queuedJobLockInstance = Nothing , queuedJobLockTime = Nothing , .. } -- We should not immediately notify a worker; instead wait for the transaction to finish first -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something) -- return jId queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m QueuedJobId -- ^ Queue a job for later execution -- -- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`) queueJob = liftHandler . runDB . setSerializable . queueJobUnsafe False queueJob' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m () -- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap queueJob' job = do app <- getYesod queueJob job >>= flip runReaderT app . writeJobCtl . JobCtlPerform -- | Slightly modified Version of `YesodDB` for `runDBJobs` type YesodJobDB site = ReaderT (YesodPersistBackend site) (WriterT (Set QueuedJobId) (HandlerFor site)) -- | Slightly modified Version of `DB` for `runDBJobs` type JobDB = YesodJobDB UniWorX queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX () -- | Queue a job as part of a database transaction and execute it after the transaction succeeds queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . Set.singleton queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . Set.singleton sinkDBJobs :: ConduitT Job Void (YesodJobDB UniWorX) () -- | Queue many jobs as part of a database transaction and execute them after the transaction passes sinkDBJobs = C.mapM_ queueDBJob runDBJobs :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -> m a -- | Replacement for/Wrapper around `runDB` when jobs need to be queued as part of a database transaction -- -- Jobs get immediately executed if the transaction succeeds runDBJobs act = do (ret, jIds) <- liftHandler . runDB $ mapReaderT runWriterT act app <- getYesod forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform return ret runDBJobs' :: YesodJobDB UniWorX a -> DB a runDBJobs' act = do (ret, jIds) <- mapReaderT runWriterT act void . liftHandler $ do UnliftIO{..} <- askUnliftIO register . unliftIO . runDB $ forM_ jIds $ \jId -> whenM (existsKey jId) $ runReaderT (writeJobCtl $ JobCtlPerform jId) =<< getYesod return ret