fradrive/src/Ldap/Client/Pool.hs
2020-04-27 16:17:00 +02:00

139 lines
5.5 KiB
Haskell

{-# OPTIONS_GHC -fno-warn-orphans #-}
module Ldap.Client.Pool
( LdapPool
, LdapExecutor, Ldap, LdapError
, LdapPoolError(..)
, withLdap, withLdapFailover, withLdapFailoverReTest
, createLdapPool
) where
import ClassyPrelude hiding (Handler, catches, try)
import Utils.Failover
import Control.Lens
import Ldap.Client (Ldap, LdapError)
import qualified Ldap.Client as Ldap
import UnliftIO.Pool
import UnliftIO.Async.Utils
import Control.Monad.Logger
import Data.Time.Clock (NominalDiffTime)
import Data.Dynamic
import Control.Monad.Trans.Resource (MonadResource)
import qualified Control.Monad.Trans.Resource as Resource
import Control.Monad.Catch
import Control.Monad.Trans.Except (throwE)
import Data.Fixed (Nano)
-- | A resource pool whose entries are 'LdapExecutor's.
type LdapPool = Pool LdapExecutor
-- | Handle to one pooled LDAP connection together with the worker thread that
-- runs actions on it.
data LdapExecutor = LdapExecutor
-- Runs an action against the connection and reports failures as 'Left'.
-- The result type must be 'Typeable'.
{ ldapExec :: forall a m. (Typeable a, MonadUnliftIO m) => (Ldap -> m a) -> m (Either LdapPoolError a)
-- Filled with @()@ to ask the worker to terminate.
, ldapDestroy :: TMVar ()
-- The worker thread; it is waited on when the executor is destroyed.
, ldapAsync :: Async ()
}
-- | Failures reported by the pool.
--
-- 'LdapPoolTimeout' is raised when an operation does not finish within the
-- action timeout; 'LdapError' wraps an error value from "Ldap.Client".
data LdapPoolError = LdapPoolTimeout | LdapError LdapError
deriving (Eq, Show, Generic, Typeable)
-- Lets 'LdapPoolError' values be thrown and caught as exceptions.
instance Exception LdapPoolError
-- | Take an executor from the pool and run the given action on its connection.
--
-- An 'LdapPoolError' thrown as an exception while the executor is obtained or
-- used is caught and reported as 'Left', alongside the 'Left' values that
-- 'ldapExec' returns itself.
withLdap :: (MonadUnliftIO m, MonadCatch m, Typeable a) => LdapPool -> (Ldap -> m a) -> m (Either LdapPoolError a)
withLdap pool act = do
  -- Outer 'Either': exceptions caught by 'try'; inner 'Either': result of 'ldapExec'.
  outcome <- try . withResource pool $ \LdapExecutor{..} -> ldapExec act
  return $ join outcome
-- | Run an LDAP action through 'withFailover'.
--
-- The lens focuses the 'LdapPool' inside a failover candidate. For the
-- candidate handed over by 'withFailover', the action runs via 'withLdap' and
-- receives the candidate with that pool replaced by the live 'Ldap'
-- connection. An 'LdapPoolError' escaping as an exception is returned as 'Left'.
withLdapFailover :: (MonadUnliftIO m, MonadCatch m, Typeable a) => Lens p p' LdapPool Ldap -> Failover p -> FailoverMode -> (p' -> m a) -> m (Either LdapPoolError a)
withLdapFailover l pool' mode act =
  try $ withFailover pool' mode (either throwE return) $ \candidate ->
    withLdap (poolOf candidate) $ \conn -> act (candidate & l .~ conn)
  where
    -- Getter half of the lens: extracts the pool from a candidate.
    poolOf = withLens l const
-- | Variant of 'withLdapFailover' built on 'withFailoverReTest'.
--
-- The @Nano -> Bool@ predicate is forwarded unchanged to 'withFailoverReTest'.
-- As in 'withLdapFailover', the action receives the candidate with its
-- 'LdapPool' replaced by a live 'Ldap' connection, and an 'LdapPoolError'
-- escaping as an exception is returned as 'Left'.
withLdapFailoverReTest :: (MonadUnliftIO m, MonadCatch m, Typeable a) => Lens p p' LdapPool Ldap -> Failover p -> (Nano -> Bool) -> FailoverMode -> (p' -> m a) -> m (Either LdapPoolError a)
withLdapFailoverReTest l pool' doTest mode act =
  try $ withFailoverReTest pool' doTest mode (either throwE return) $ \candidate ->
    withLdap (poolOf candidate) $ \conn -> act (candidate & l .~ conn)
  where
    -- Getter half of the lens: extracts the pool from a candidate.
    poolOf = withLens l const
-- | Create a pool of 'LdapExecutor's. Each executor owns one LDAP connection
-- opened with 'Ldap.with' and a worker thread that runs submitted actions on it.
--
-- The host and port are used for every new connection. The stripe count, the
-- connection timeout and the limit are handed unchanged to 'createPool'. The
-- action timeout bounds both the start-up of a worker and every action run on
-- a worker (see @withTimeout@ below).
--
-- The logging function is captured once with 'askLoggerIO' and reused by the
-- worker threads. An internal resource state is allocated in the calling
-- 'MonadResource' scope and every executor is built against it.
--
-- NOTE(review): the "Connection Timeout" only reaches 'createPool'; opening a
-- connection is bounded by the action timeout instead. Confirm this is intended.
createLdapPool :: ( MonadLoggerIO m, MonadResource m )
=> Ldap.Host
-> Ldap.PortNumber
-> Int -- ^ Stripes
-> NominalDiffTime -- ^ Connection Timeout
-> NominalDiffTime -- ^ Action Timeout
-> Int -- ^ Limit
-> m LdapPool
-- The view pattern scales the action timeout by 1e6 and rounds it; the result is
-- what @withTimeout@ passes to 'timeout' (presumably microseconds; confirm).
createLdapPool host port stripes timeoutConn (round . (* 1e6) -> timeoutAct) limit = do
-- Capture the caller's logging function so that worker threads can log on their own.
logFunc <- askLoggerIO
let
-- Build a single executor; the body runs against the given internal resource state.
mkExecutor :: Resource.InternalState -> IO LdapExecutor
mkExecutor rSt = Resource.runInternalState ?? rSt $ do
-- 'ldapDestroy' is filled to request termination; 'ldapAct' carries one pending job
-- (the action plus the variable to answer on).
ldapDestroy <- liftIO newEmptyTMVarIO
ldapAct <- liftIO newEmptyTMVarIO
let
-- Submit an action to the worker and block until the worker answers.
-- Results are wrapped with 'toDyn' on submission and unwrapped with 'fromDyn' here.
ldapExec :: forall a m. (Typeable a, MonadUnliftIO m) => (Ldap -> m a) -> m (Either LdapPoolError a)
ldapExec act = withRunInIO $ \runInIO -> do
ldapAnswer <- newEmptyTMVarIO :: IO (TMVar (Either SomeException Dynamic))
-- Hand over the job; this blocks while another job is still pending in 'ldapAct'.
atomically $ putTMVar ldapAct (runInIO . fmap toDyn . act, ldapAnswer)
-- Rethrow an exception reported by the worker; otherwise cast the result back.
either throwIO (return . Right . flip fromDyn (error "Could not cast dynamic")) =<< atomically (takeTMVar ldapAnswer)
`catches`
-- Each handler turns one kind of exception into a 'Left'. The first four wrap the
-- exception in the matching "Ldap.Client" error constructor; the last passes an
-- 'LdapPoolError' through unchanged.
[ Handler $ return . Left . LdapError . Ldap.ParseError
, Handler $ return . Left . LdapError . Ldap.ResponseError
, Handler $ return . Left . LdapError . Ldap.IOError
, Handler $ return . Left . LdapError . Ldap.DisconnectError
, Handler $ return . Left . (id :: LdapPoolError -> LdapPoolError)
]
-- Worker loop. When @waiting@ is present it is filled with 'Nothing' to report that
-- the connection is up; the recursive call passes 'Nothing', so this happens once.
go :: Maybe (TMVar (Maybe a)) -> Ldap -> LoggingT IO ()
go waiting ldap = do
$logDebugS "LdapExecutor" "Waiting"
for_ waiting $ atomically . flip putTMVar Nothing
-- Block until either termination is requested or a job arrives.
instruction <- atomically $ (Nothing <$ takeTMVar ldapDestroy) <|> (Just <$> takeTMVar ldapAct)
case instruction of
Nothing -> $logDebugS "LdapExecutor" "Terminating"
Just (act, returnRes) -> do
$logDebugS "LdapExecutor" "Executing"
-- Run the job under the action timeout and capture any exception for the submitter.
res <- try . withTimeout . liftIO $ act ldap
didReturn <- atomically $ tryPutTMVar returnRes res
unless didReturn $
$logErrorS "LdapExecutor" "Could not return result"
-- Rethrow a captured exception inside the worker, except for 'Ldap.ResponseError'
-- and 'Ldap.DisconnectError', which are dropped here.
either throwM (const $ return ()) res
`catches`
[ Handler $ \case
Ldap.ResponseError _ -> return ()
Ldap.DisconnectError _ -> return ()
other -> throwM other
]
-- NOTE(review): the loop only continues when nothing was rethrown above. What happens
-- to a pooled executor whose worker stopped on an exception is not visible in this file.
go Nothing ldap
-- Start the worker and wait, bounded by the action timeout, for its report on @setup@.
ldapAsync <- withTimeout $ do
setup <- liftIO newEmptyTMVarIO
-- 'allocateAsync' is defined elsewhere; presumably it registers the thread for cleanup.
ldapAsync <- allocateAsync . flip runLoggingT logFunc $ do
$logDebugS "LdapExecutor" "Starting"
-- Open the connection and run the worker loop on it; a 'Left' result is handled below.
res <- liftIO . Ldap.with host port $ flip runLoggingT logFunc . go (Just setup)
case res of
Left exc -> do
$logWarnS "LdapExecutor" $ tshow exc
-- 'tryPutTMVar' is used, so this has no effect when @setup@ is already filled.
atomically . void . tryPutTMVar setup $ Just exc
Right res' -> return res'
-- 'Nothing' means the worker loop has started; 'Just' carries the failure, which is thrown.
-- NOTE(review): the failure is thrown as a bare "Ldap.Client" error, whereas 'withLdap'
-- only catches 'LdapPoolError'. Confirm this is intended.
maybe (return ()) throwM =<< atomically (takeTMVar setup)
return ldapAsync
return LdapExecutor{..}
-- Request termination of the worker (no effect if already requested) and wait for its thread.
delExecutor :: LdapExecutor -> IO ()
delExecutor LdapExecutor{..} = do
atomically . void $ tryPutTMVar ldapDestroy ()
wait ldapAsync
-- Allocate the internal state in the caller's resource scope, with
-- 'Resource.closeInternalState' as its release action.
rSt <- view _2 <$> Resource.allocate Resource.createInternalState Resource.closeInternalState
liftIO $ createPool (mkExecutor rSt) delExecutor stripes timeoutConn limit
where
-- Run an action under 'timeout' with the action timeout; throw 'LdapPoolTimeout'
-- when 'timeout' yields 'Nothing'.
withTimeout :: forall m a. (MonadUnliftIO m, MonadThrow m) => m a -> m a
withTimeout = maybe (throwM LdapPoolTimeout) return <=< timeout timeoutAct