152 lines
6.1 KiB
Haskell
152 lines
6.1 KiB
Haskell
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
|
|
|
module Ldap.Client.Pool
|
|
( LdapPool
|
|
, LdapExecutor, Ldap, LdapError
|
|
, LdapPoolError(..)
|
|
, withLdap, withLdapFailover, withLdapFailoverReTest
|
|
, createLdapPool
|
|
) where
|
|
|
|
import ClassyPrelude hiding (Handler, catches, try)
|
|
|
|
import Utils.Failover
|
|
|
|
import Control.Lens
|
|
|
|
import Ldap.Client (Ldap, LdapError)
|
|
import qualified Ldap.Client as Ldap
|
|
|
|
import UnliftIO.Pool
|
|
import UnliftIO.Async.Utils
|
|
|
|
import Control.Monad.Logger
|
|
import Data.Time.Clock (NominalDiffTime)
|
|
|
|
import Data.Dynamic
|
|
|
|
import Control.Monad.Trans.Resource (MonadResource)
|
|
import Control.Monad.Catch
|
|
|
|
import Control.Monad.Trans.Except (throwE)
|
|
import Data.Fixed (Nano)
|
|
|
|
import Network.Connection
|
|
|
|
|
|
-- | A pool of 'LdapExecutor's.
type LdapPool = Pool LdapExecutor
|
|
-- | One pooled LDAP connection together with the means to talk to the worker
-- thread that owns it.
data LdapExecutor = LdapExecutor
  { ldapExec
      :: forall a m. (Typeable a, MonadUnliftIO m)
      => (Ldap -> m a)
      -> m (Either LdapPoolError a)
    -- ^ Run an action against the executor's 'Ldap' handle, reporting pool
    -- and LDAP failures as 'Left'.
  , ldapDestroy :: TMVar ()
    -- ^ Filled to ask the worker to terminate.
  , ldapAsync :: Async ()
    -- ^ The worker thread; waited on when the executor is destroyed.
  }
|
|
|
|
-- | Failures reported by the LDAP connection pool.
data LdapPoolError
  = LdapPoolTimeout
    -- ^ An operation did not finish within the action timeout.
  | LdapError LdapError
    -- ^ An error reported by the LDAP client library.
  | LdapLineTooLong
    -- ^ Translation of the connection layer's @LineTooLong@ exception.
  | LdapHostNotResolved String
    -- ^ Translation of the connection layer's @HostNotResolved@ exception.
  | LdapHostCannotConnect String [IOException]
    -- ^ Translation of the connection layer's @HostCannotConnect@ exception.
  deriving (Eq, Show, Generic, Typeable)
  deriving anyclass (Exception)
|
|
|
|
|
|
-- | Borrow an executor from the pool and run the given action on its LDAP
-- connection.
--
-- 'LdapPoolError's thrown while acquiring an executor are returned as 'Left',
-- just like the errors reported by the executor itself.
--
-- The worker of an executor stops after any failure of an action that is not
-- a @ResponseErrorCode@ response, and after a timeout it may still be busy
-- with the abandoned action.  Such an executor must not be handed back to the
-- pool, since later requests on it would only run into 'LdapPoolTimeout'.
-- The error is therefore rethrown inside 'withResource' (which destroys the
-- resource when its action throws) and recovered by the surrounding 'try';
-- the caller still receives the very same 'Left'.
withLdap :: (MonadUnliftIO m, MonadCatch m, Typeable a) => LdapPool -> (Ldap -> m a) -> m (Either LdapPoolError a)
withLdap pool act = fmap join . try . withResource pool $ \LdapExecutor{..} ->
  ldapExec act >>= \case
    Left err
      | executorSurvives err -> return $ Left err
      | otherwise            -> throwM err
    res -> return res
  where
    -- Mirrors the one failure the executor's worker loop tolerates without
    -- shutting down.
    executorSurvives :: LdapPoolError -> Bool
    executorSurvives = \case
      LdapError (Ldap.ResponseError Ldap.ResponseErrorCode{}) -> True
      _other                                                   -> False
|
|
|
|
-- | Like 'withLdap', but the pool is selected through 'withFailover'.
--
-- The lens locates the 'LdapPool' inside a failover target; the action
-- receives that target with the pool replaced by the borrowed 'Ldap' handle.
-- A 'Left' from 'withLdap' is passed to 'withFailover' via 'throwE';
-- 'LdapPoolError's thrown out of 'withFailover' are returned as 'Left'.
withLdapFailover :: (MonadUnliftIO m, MonadMask m, Typeable a, MonadLogger m) => Lens p p' LdapPool Ldap -> Failover p -> FailoverMode -> (p' -> m a) -> m (Either LdapPoolError a)
withLdapFailover l pool' mode act =
  try $ withFailover pool' mode (either throwE return) $ \target ->
    withLdap (poolOf target) $ \conn ->
      act (target & l .~ conn)
  where
    -- Getter half of the (type-changing) lens.
    poolOf = withLens l const
|
|
|
|
-- | Like 'withLdap', but the pool is selected through 'withFailoverReTest',
-- which additionally receives the given @Nano -> Bool@ predicate.
--
-- The lens locates the 'LdapPool' inside a failover target; the action
-- receives that target with the pool replaced by the borrowed 'Ldap' handle.
-- A 'Left' from 'withLdap' is passed on via 'throwE'; 'LdapPoolError's
-- thrown out of 'withFailoverReTest' are returned as 'Left'.
withLdapFailoverReTest :: (MonadUnliftIO m, MonadMask m, Typeable a, MonadLogger m) => Lens p p' LdapPool Ldap -> Failover p -> (Nano -> Bool) -> FailoverMode -> (p' -> m a) -> m (Either LdapPoolError a)
withLdapFailoverReTest l pool' doTest mode act =
  try $ withFailoverReTest pool' doTest mode (either throwE return) $ \target ->
    withLdap (poolOf target) $ \conn ->
      act (target & l .~ conn)
  where
    -- Getter half of the (type-changing) lens.
    poolOf = withLens l const
|
|
|
|
|
|
-- | Create a pool of 'LdapExecutor's for the given LDAP server.
--
-- Every executor runs a worker thread inside 'Ldap.with'.  Callers hand
-- their actions to that worker through a shared 'TMVar' and wait for the
-- outcome on a per-request 'TMVar'; results cross the thread boundary as
-- 'Dynamic' values and are cast back on the caller's side.
--
-- The action timeout (given in seconds, scaled to microseconds for
-- 'timeout') bounds the executor setup, the caller's wait for a result and
-- the execution of the action on the worker.  Exceeding it raises
-- 'LdapPoolTimeout'.
--
-- Stripes, connection timeout and limit are passed to 'createPool'
-- unchanged.
createLdapPool :: forall m.
                  ( MonadLogger m
                  , MonadResource m, MonadUnliftIO m
                  , MonadCatch m
                  )
               => Ldap.Host
               -> Ldap.PortNumber
               -> Int -- ^ Stripes
               -> NominalDiffTime -- ^ Connection Timeout
               -> NominalDiffTime -- ^ Action Timeout
               -> Int -- ^ Limit
               -> m LdapPool
createLdapPool host port stripes timeoutConn (round . (* 1e6) -> timeoutAct) limit = do
  let
    -- Build a single executor: start its worker and wait until the worker
    -- reports that it is ready (or that setup failed).  Failures are logged
    -- and rethrown; connection-layer exceptions are translated by
    -- 'convertErrors' first.
    mkExecutor :: m LdapExecutor
    mkExecutor = handleAny (\e -> $logErrorS "LdapExecutor" (tshow e) >> throwM e) . (`catches` convertErrors) $ do
      ldapDestroy <- liftIO newEmptyTMVarIO
      ldapAct <- liftIO newEmptyTMVarIO

      let
        -- Caller side: enqueue the action for the worker and wait for its
        -- answer, all within the action timeout.
        ldapExec :: forall a m'. (Typeable a, MonadUnliftIO m') => (Ldap -> m' a) -> m' (Either LdapPoolError a)
        ldapExec act = withRunInIO $ \runInIO -> convertErrors' . withTimeout $ do
            ldapAnswer <- newEmptyTMVarIO :: IO (TMVar (Either SomeException Dynamic))
            atomically $ putTMVar ldapAct (runInIO . fmap toDyn . act, ldapAnswer)
            -- The 'Dynamic' was produced by 'toDyn' at type @a@ just above,
            -- so the fallback is not expected to be reached.
            either throwIO (return . Right . flip fromDyn (error "Could not cast dynamic")) =<< atomically (takeTMVar ldapAnswer)
          where
            -- Exceptions of the types wrapped by 'LdapError', and
            -- 'LdapPoolError's themselves, become 'Left'; anything else
            -- propagates to the caller.
            convertErrors' = flip catches
              [ Handler $ return . Left . LdapError . Ldap.ParseError
              , Handler $ return . Left . LdapError . Ldap.ResponseError
              , Handler $ return . Left . LdapError . Ldap.IOError
              , Handler $ return . Left . LdapError . Ldap.DisconnectError
              , Handler $ return . Left . (id :: LdapPoolError -> LdapPoolError)
              ]

        -- Worker loop.  On the first iteration @waiting@ holds the setup
        -- variable, which is filled with 'Nothing' to signal readiness.
        go :: Maybe (TMVar (Maybe a)) -> Ldap -> m ()
        go waiting ldap = do
          $logDebugS "LdapExecutor" "Waiting"
          for_ waiting $ atomically . flip putTMVar Nothing
          -- A destroy request is preferred over a pending action.
          instruction <- atomically $ (Nothing <$ takeTMVar ldapDestroy) <|> (Just <$> takeTMVar ldapAct)
          case instruction of
            Nothing -> $logDebugS "LdapExecutor" "Terminating"
            Just (act, returnRes) -> do
              $logDebugS "LdapExecutor" $ "Executing " <> tshow (host, port)
              res <- try . withTimeout . liftIO $ act ldap
              didReturn <- atomically $ tryPutTMVar returnRes res
              unless didReturn $
                $logErrorS "LdapExecutor" "Could not return result"
              -- Rethrow the action's failure so that the worker stops,
              -- unless it is caught below as a response error code.
              either throwM (const $ return ()) res
                `catches`
                  [ Handler $ \case
                      Ldap.ResponseError Ldap.ResponseErrorCode{}
                        -> return ()
                      other
                        -> throwM other
                  ]
              go Nothing ldap

      ldapAsync <- withTimeout $ do
        -- Filled with 'Nothing' once the worker is ready, or with the
        -- exception that made setup fail.
        setup <- liftIO newEmptyTMVarIO

        ldapAsync <- allocateLinkedAsync . handleAny (atomically . void . tryPutTMVar setup . Just) $ do
          $logDebugS "LdapExecutor" $ "Starting " <> tshow (host, port)
          res <- withRunInIO $ \runInIO ->
            Ldap.with host port $ runInIO . go (Just setup)
          case res of
            Left exc -> do
              $logWarnS "LdapExecutor" $ tshow exc
              atomically . void . tryPutTMVar setup . Just $ toException exc
            Right res' -> return res'

        maybe (return ()) throwM =<< atomically (takeTMVar setup)

        return ldapAsync

      return LdapExecutor{..}

    -- Ask the worker to terminate and wait for it to finish.
    delExecutor :: LdapExecutor -> IO ()
    delExecutor LdapExecutor{..} = do
      atomically . void $ tryPutTMVar ldapDestroy ()
      wait ldapAsync

  withRunInIO $ \runInIO ->
    createPool (runInIO mkExecutor) delExecutor stripes timeoutConn limit
  where
    -- Run an action under the action timeout, throwing 'LdapPoolTimeout'
    -- when it does not finish in time.
    withTimeout :: forall m' a. (MonadUnliftIO m', MonadThrow m') => m' a -> m' a
    withTimeout = maybe (throwM LdapPoolTimeout) return <=< timeout timeoutAct

    -- Translate connection-layer exceptions into 'LdapPoolError's.
    convertErrors =
      [ Handler $ \LineTooLong -> throwM LdapLineTooLong
      , Handler $ \(HostNotResolved h) -> throwM $ LdapHostNotResolved h
      , Handler $ \(HostCannotConnect h es) -> throwM $ LdapHostCannotConnect h es
      ]
|