72 lines
2.5 KiB
Haskell
72 lines
2.5 KiB
Haskell
-- SPDX-FileCopyrightText: 2022 Gregor Kleen <gregor.kleen@ifi.lmu.de>
|
|
--
|
|
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
module Utils.Postgresql
|
|
( PostgresqlChannel(..)
|
|
, PostgresqlChannelManager(..)
|
|
, managePostgresqlChannel
|
|
, PostgresConf
|
|
) where
|
|
|
|
import Import.NoFoundation hiding (bracket)
|
|
|
|
import qualified Database.PostgreSQL.Simple as PG
|
|
import qualified Database.PostgreSQL.Simple.Types as PG
|
|
import qualified Database.PostgreSQL.Simple.Notification as PG
|
|
|
|
import Database.Persist.Postgresql (PostgresConf, pgConnStr)
|
|
|
|
import UnliftIO.Exception (bracket)
|
|
|
|
|
|
-- | The PostgreSQL notification channels that 'managePostgresqlChannel' can
-- be attached to.
--
-- The constructor order matters: the derived 'Ord', 'Enum' and 'Bounded'
-- instances follow it.
--
-- NOTE(review): the purpose of each channel is only suggested by its
-- constructor name here; confirm against the call sites.
data PostgresqlChannel
  = ChannelJobOffload
  | ChannelMemcachedLocalInvalidation
  deriving stock (Eq, Ord, Read, Show, Enum, Bounded, Generic)
  deriving anyclass (Universe, Finite)
|
|
|
|
-- Template Haskell splice for 'PostgresqlChannel'.  Presumably this generates
-- the @PathPiece@ instance that 'managePostgresqlChannel' relies on when it
-- calls 'toPathPiece' to obtain the channel name.
-- NOTE(review): the exact text produced by @camelToPathPiece' 1@ is defined
-- elsewhere in the project — confirm before depending on concrete names.
nullaryPathPiece ''PostgresqlChannel $ camelToPathPiece' 1
|
|
|
|
|
|
-- | The set of callbacks driving 'managePostgresqlChannel'.
--
-- @m@ is the monad all callbacks run in; @a@ is the value handed back to the
-- caller once the loop terminates.
data PostgresqlChannelManager m a = PostgresqlChannelManager
  { -- | Expected to block; used within 'race'.  As soon as it yields, the
    -- loop stops and the yielded value becomes the overall result.
    pgcTerminate :: m a
    -- | Handler applied to the payload of each accepted notification.
    -- With 'Nothing' the channel is never listened on.
  , pgcOnInput :: Maybe (ByteString -> m ())
    -- | Expected to block; used within 'race'.  Every value it yields is
    -- sent out as a notification payload.
  , pgcGenOutput :: m ByteString
  }
|
|
|
|
-- | Run a notification loop for one channel on a dedicated PostgreSQL
-- connection until 'pgcTerminate' yields.
--
-- * A fresh connection is opened from the connection string of the given
--   'PostgresConf'; 'bracket' guarantees it is closed again when the loop
--   ends or an exception escapes.
-- * If 'pgcOnInput' is present, @LISTEN@ is issued for the channel and the
--   payload of every accepted notification is passed to the handler.
--   Notifications that originate from this connection's own backend, or
--   that carry a different channel name, are skipped.
-- * Every value produced by 'pgcGenOutput' is sent with @NOTIFY@ on the same
--   channel.
--
-- The result is whatever 'pgcTerminate' returned.
managePostgresqlChannel :: forall m a.
     ( MonadUnliftIO m
     , MonadLogger m
     )
  => PostgresConf
  -> PostgresqlChannel
  -> PostgresqlChannelManager m a
  -> m a
managePostgresqlChannel dbConf channel PostgresqlChannelManager{..} =
  bracket openConnection closeConnection $ \pgConn -> do
    -- Remember our own backend so that we can ignore our own notifications.
    ownPid <- liftIO $ PG.getBackendPID pgConn

    -- Only subscribe when there is a handler interested in the input.
    when listening $
      void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only (PG.Identifier chan))

    let
      -- Block until a notification arrives that was sent by another backend
      -- on our channel; anything else is dropped and we wait again.
      awaitForeign = do
        notif <- liftIO $ PG.getNotification pgConn
        let fromSelf     = PG.notificationPid notif == ownPid
            otherChannel = PG.notificationChannel notif /= encodeUtf8 chan
        if fromSelf || otherChannel
          then awaitForeign
          else return notif

      -- Next thing to react to: incoming notification (Left) or outgoing
      -- payload (Right).  Without a handler only output is ever produced.
      nextEvent
        | listening = awaitForeign `race` pgcGenOutput
        | otherwise = Right <$> pgcGenOutput

    foreverBreak $ \terminate -> do
      event <- lift $ pgcTerminate `race` nextEvent

      case event of
        Left result -> do
          $logDebugS "PGChannel" "Terminating..."
          terminate result
        Right (Left notif) -> do
          $logDebugS "PGChannel" $ "Got input: " <> tshow notif
          lift $ maybe (return ()) ($ PG.notificationData notif) pgcOnInput
        Right (Right payload) -> do
          void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier chan, payload)
          $logDebugS "PGChannel" $ "Sent output: " <> tshow payload
  where
    -- Channel name as used in the LISTEN / NOTIFY statements.
    chan = toPathPiece channel

    -- Whether incoming notifications are of interest at all.
    listening = is _Just pgcOnInput

    openConnection = liftIO . PG.connectPostgreSQL $ pgConnStr dbConf

    closeConnection conn = liftIO $ PG.close conn
|