-- SPDX-FileCopyrightText: 2022 Gregor Kleen -- -- SPDX-License-Identifier: AGPL-3.0-or-later module Jobs.Offload ( mkJobOffloadHandler ) where import Import hiding (js) import Jobs.Types import Jobs.Queue import Utils.Postgresql import Data.Text.Encoding (decodeUtf8') mkJobOffloadHandler :: forall m. ( MonadResource m , MonadUnliftIO m , MonadThrow m, MonadReader UniWorX m , MonadLogger m ) => PostgresConf -> JobMode -> Maybe (m JobOffloadHandler) mkJobOffloadHandler dbConf jMode | not shouldListen = Nothing | otherwise = Just $ do jobOffloadOutgoing <- newTVarIO mempty jobOffloadHandler <- allocateAsync $ managePostgresqlChannel dbConf ChannelJobOffload PostgresqlChannelManager { pgcTerminate = do UniWorX{appJobState} <- ask atomically $ do shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown guardOn shouldTerminate () , pgcOnInput = Just $ \inpBS -> case fromPathPiece =<< either (const Nothing) Just (decodeUtf8' inpBS) of Nothing -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow inpBS Just jId -> writeJobCtl $ JobCtlPerform jId , pgcGenOutput = atomically $ do jQueue <- readTVar jobOffloadOutgoing j <- case jQueue of j :< js -> j <$ writeTVar jobOffloadOutgoing js _other -> mzero return . encodeUtf8 $ toPathPiece j } return JobOffloadHandler{..} where shouldListen = has (_jobsAcceptOffload . only True) jMode