module Jobs.Offload ( mkJobOffloadHandler ) where import Import hiding (bracket, js) import Jobs.Types import Jobs.Queue import qualified Database.PostgreSQL.Simple as PG import qualified Database.PostgreSQL.Simple.Types as PG import qualified Database.PostgreSQL.Simple.Notification as PG import Database.Persist.Postgresql (PostgresConf, pgConnStr) import Data.Text.Encoding (decodeUtf8') import UnliftIO.Exception (bracket) jobOffloadChannel :: Text jobOffloadChannel = "job-offload" mkJobOffloadHandler :: forall m. ( MonadResource m , MonadUnliftIO m , MonadThrow m, MonadReader UniWorX m , MonadLogger m ) => PostgresConf -> JobMode -> Maybe (m JobOffloadHandler) mkJobOffloadHandler dbConf jMode | not shouldListen = Nothing | otherwise = Just $ do jobOffloadOutgoing <- newTVarIO mempty jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do myPid <- liftIO $ PG.getBackendPID pgConn when shouldListen $ void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel) foreverBreak $ \(($ ()) -> terminate) -> do UniWorX{appJobState} <- ask shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown when shouldTerminate terminate let getInput = do n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn if | notificationPid == myPid || notificationChannel /= encodeUtf8 jobOffloadChannel -> getInput | otherwise -> return n getOutput = atomically $ do jQueue <- readTVar jobOffloadOutgoing case jQueue of j :< js -> j <$ writeTVar jobOffloadOutgoing js _other -> mzero io <- lift $ if | shouldListen -> getInput `race` getOutput | otherwise -> Right <$> getOutput case io of Left PG.Notification{..} | Just jId <- fromPathPiece =<< either (const Nothing) Just (decodeUtf8' notificationData) -> writeJobCtl $ JobCtlPerform jId | otherwise -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow notificationData Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId) return JobOffloadHandler{..} where shouldListen = has (_jobsAcceptOffload . only True) jMode