module Jobs.Queue ( writeJobCtl, writeJobCtlBlock , writeJobCtl', writeJobCtlBlock' , queueJob, queueJob' , JobDB , runDBJobs, queueDBJob, sinkDBJobs , runDBJobs' , queueDBJobCron , module Jobs.Types ) where import Import hiding ((<>)) import Jobs.Types import qualified Data.Map.Strict as Map import qualified Data.Set as Set import qualified Data.List.NonEmpty as NonEmpty import qualified Data.HashMap.Strict as HashMap import Control.Monad.Random (evalRand, mkStdGen, uniform) import qualified Data.Conduit.List as C import Data.Semigroup ((<>)) import UnliftIO.Concurrent (myThreadId) import Control.Monad.Trans.Resource (register) import System.Clock (getTime, Clock(Monotonic)) data JobQueueException = JobQueuePoolEmpty | JobQueueWorkerNotFound deriving (Eq, Ord, Enum, Bounded, Show, Read, Generic) instance Exception JobQueueException writeJobCtl' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobWorkerId -> JobCtl -> m () -- | Pass an instruction to the given `Job`-Worker writeJobCtl' target cmd = do JobState{jobWorkers, jobWorkerName} <- asks appJobState >>= atomically . readTMVar if | null jobWorkers -> throwM JobQueuePoolEmpty | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers -> atomically $ readTVar chan >>= jqInsert cmd >>= (writeTVar chan $!) | otherwise -> throwM JobQueueWorkerNotFound writeJobCtl :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () -- | Pass an instruction to the `Job`-Workers -- -- Instructions are assigned deterministically and pseudo-randomly to one specific worker. -- While this means that they might be executed later than desireable, rouge threads that queue the same instruction many times do not deny service to others writeJobCtl cmd = do jSt <- asks appJobState names <- atomically $ jobWorkerNames <$> readTMVar jSt when (null names) $ throwM JobQueuePoolEmpty tid <- myThreadId cTime <- liftIO $ getTime Monotonic let epoch :: Int64 epoch = round cTime `div` 3600 target = evalRand ?? mkStdGen (hash epoch `hashWithSalt` tid `hashWithSalt` cmd) $ uniform names writeJobCtl' target cmd writeJobCtlBlock' :: (MonadMask m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m () -- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon writeJobCtlBlock' writeCtl cmd = do getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar let getResVar' = atomically $ do var <- newEmptyTMVar modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) return var removeResVar resVar = modifyTVar' getResVar $ HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd bracket getResVar' (atomically . removeResVar) $ \resVar -> do writeCtl cmd mExc <- atomically $ takeTMVar resVar <* removeResVar resVar maybe (return ()) throwM mExc writeJobCtlBlock :: (MonadMask m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () -- | Pass an instruction to the `Job`-Workers and block until it was acted upon writeJobCtlBlock = writeJobCtlBlock' writeJobCtl queueJobUnsafe :: Bool -> Job -> YesodDB UniWorX (Maybe QueuedJobId) queueJobUnsafe queuedJobWriteLastExec job = do $logDebugS "queueJob" $ tshow job doQueue <- fmap not . and2M (return $ jobNoQueueSame job) $ exists [ QueuedJobContent ==. toJSON job ] if | doQueue -> Just <$> do queuedJobCreationTime <- liftIO getCurrentTime queuedJobCreationInstance <- getsYesod appInstanceID insert QueuedJob { queuedJobContent = toJSON job , queuedJobLockInstance = Nothing , queuedJobLockTime = Nothing , .. } -- We should not immediately notify a worker; instead wait for the transaction to finish first -- writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something) -- return jId | otherwise -> return Nothing queueJob :: (MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m (Maybe QueuedJobId) -- ^ Queue a job for later execution -- -- Makes no guarantees as to when it will be executed (`queueJob'`) and does not interact with any running database transactions (`runDBJobs`) queueJob = liftHandler . runDB . setSerializable . queueJobUnsafe False queueJob' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => Job -> m () -- ^ `queueJob` followed by `writeJobCtl` `JobCtlPerform` to ensure, that it is executed asap queueJob' job = do app <- getYesod queueJob job >>= maybe (return ()) (flip runReaderT app . writeJobCtl . JobCtlPerform) -- | Slightly modified Version of `DB` for `runDBJobs` type JobDB = YesodJobDB UniWorX queueDBJob, queueDBJobCron :: Job -> YesodJobDB UniWorX () -- | Queue a job as part of a database transaction and execute it after the transaction succeeds queueDBJob job = mapReaderT lift (queueJobUnsafe False job) >>= tell . maybe Set.empty Set.singleton queueDBJobCron job = mapReaderT lift (queueJobUnsafe True job) >>= tell . maybe Set.empty Set.singleton sinkDBJobs :: ConduitT Job Void (YesodJobDB UniWorX) () -- | Queue many jobs as part of a database transaction and execute them after the transaction passes sinkDBJobs = C.mapM_ queueDBJob runDBJobs :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => YesodJobDB UniWorX a -> m a -- | Replacement for/Wrapper around `runDB` when jobs need to be queued as part of a database transaction -- -- Jobs get immediately executed if the transaction succeeds runDBJobs act = do (ret, jIds) <- liftHandler . runDB $ mapReaderT runWriterT act app <- getYesod forM_ jIds $ flip runReaderT app . writeJobCtl . JobCtlPerform return ret runDBJobs' :: YesodJobDB UniWorX a -> DB a runDBJobs' act = do (ret, jIds) <- mapReaderT runWriterT act void . liftHandler $ do UnliftIO{..} <- askUnliftIO register . unliftIO . runDB $ forM_ jIds $ \jId -> whenM (existsKey jId) $ runReaderT (writeJobCtl $ JobCtlPerform jId) =<< getYesod return ret