261 lines
8.9 KiB
Haskell
261 lines
8.9 KiB
Haskell
{-# OPTIONS_GHC -Wno-error=unused-top-binds #-}
|
|
|
|
module Utils.Pool
|
|
( Pool', hoistPool
|
|
, PoolResourceIdent'
|
|
, Pool, PoolResourceIdent
|
|
, getPoolAvailableCount, getPoolInUseCount, getPoolUsesCount
|
|
, getPoolIdleTime, getPoolMaxAvailable
|
|
, createPool, createPool'
|
|
, purgePool
|
|
, withResource, withResource'
|
|
, destroyResources
|
|
, takeResource, releaseResource
|
|
, takeResource'
|
|
) where
|
|
|
|
import ClassyPrelude
|
|
|
|
import qualified Data.IntMap.Strict as IntMap
|
|
|
|
import UnliftIO.Async.Utils
|
|
import UnliftIO.Resource (MonadResource, register, release)
|
|
import UnliftIO.Concurrent (forkIO)
|
|
|
|
import Data.Fixed
|
|
|
|
import System.Clock
|
|
import Data.Time.Clock (DiffTime)
|
|
|
|
import Control.Concurrent.STM.Delay
|
|
import Control.Concurrent.STM.TVar (stateTVar)
|
|
|
|
import Control.Monad.Writer.Strict (runWriter)
|
|
import Control.Monad.Writer.Class (MonadWriter(..))
|
|
|
|
import Data.Semigroup (First(..))
|
|
|
|
import Utils.NTop
|
|
|
|
-- <https://hackage.haskell.org/package/ex-pool-0.2.1/docs/src/Data-Pool.html>
|
|
|
|
|
|
-- | Handle for a resource that has been taken from a pool.
--
-- The 'Int' is the key under which the pool tracks the resource as in use;
-- the @c@ component is the value returned by the pool's @onUse@ hook when the
-- resource was handed out.
data PoolResourceIdent' c = PoolResourceIdent Int c
  deriving (Eq, Ord, Show, Typeable)
|
|
|
|
|
|
-- | A pool of resources of type @a@ whose hooks run in the monad @m@.
--
-- @c'@ is the argument given to the 'onUse' hook when a resource is taken;
-- @c@ is that hook's result, which is stored in the 'PoolResourceIdent'' and
-- handed back to 'onRelease'.
data Pool' m c' c a = Pool
  { create       :: m a
    -- ^ Produce a fresh resource when none is available for reuse.
  , destroy      :: a -> m ()
    -- ^ Dispose of a resource that leaves the pool.
  , onUse        :: c' -> a -> m c
    -- ^ Hook run each time a resource is handed out.
  , onRelease    :: c -> a -> m ()
    -- ^ Hook run each time a resource is given back.
  , idleTime     :: !(Maybe Int)
    -- ^ Idle timeout in seconds; 'Nothing' means no reaper is started.
  , maxAvailable :: !(Maybe Int)
    -- ^ Limit on the number of idle resources kept for reuse.
  , resources    :: !(TVar (PoolResources c a))
    -- ^ Shared bookkeeping of in-use and available resources.
  , aliveRef     :: !(IORef ())
    -- ^ Carrier for the weak-reference finalizer installed by 'createPool''.
  }
|
|
|
|
-- | Mutable bookkeeping of a pool, kept inside a 'TVar'.
data PoolResources c a = PoolResources
  { inUseCount     :: !Int
    -- ^ Number of resources currently handed out.
  , availableCount :: !Int
    -- ^ Number of idle resources counted as available for reuse.
  , inUse          :: !(IntMap a)
    -- ^ Handed-out resources, keyed by the tick at which they were taken.
  , available      :: !(IntMap [a])
    -- ^ Idle resources, bucketed by the monotonic second at which they were
    -- returned.
  , inUseTick      :: !Int
    -- ^ Key to assign to the next resource handed out; incremented on every
    -- take.
  } deriving (Functor)
|
|
|
|
-- | A pool whose use/release hooks neither take nor produce any data.
type Pool m a = Pool' m () () a

-- | Resource handle belonging to a plain 'Pool' (no hook data attached).
type PoolResourceIdent = PoolResourceIdent' ()
|
|
|
|
|
|
-- | Change the monad in which a pool's hooks run by applying a natural
-- transformation to @create@, @destroy@, @onUse@ and @onRelease@.
--
-- All remaining fields are copied unchanged, so the result shares its
-- resource state with the original pool.
hoistPool :: (forall b. m b -> n b) -> Pool' m c' c a -> Pool' n c' c a
hoistPool nat Pool{..} = Pool
  { create    = nat create
  , destroy   = \res -> nat (destroy res)
  , onUse     = \st res -> nat (onUse st res)
  , onRelease = \hook res -> nat (onRelease hook res)
  , ..
  }
|
|
|
|
-- | Counters read from the pool's shared state: the number of idle resources,
-- the number of resources currently handed out, and the value of the take
-- counter (incremented every time a resource is handed out), respectively.
getPoolAvailableCount, getPoolInUseCount, getPoolUsesCount :: Pool' m c' c a -> STM Int
getPoolAvailableCount = fmap availableCount . readTVar . resources
getPoolInUseCount     = fmap inUseCount . readTVar . resources
getPoolUsesCount      = fmap inUseTick . readTVar . resources
|
|
|
|
-- | The pool's idle timeout, converted from whole seconds to a 'DiffTime'.
--
-- 'Nothing' when the pool was created without an idle timeout.
getPoolIdleTime :: Pool' m c' c a -> Maybe DiffTime
-- Read 'idleTime', not 'maxAvailable': the latter is a resource count and has
-- its own accessor ('getPoolMaxAvailable').
getPoolIdleTime = fmap realToFrac . idleTime
|
|
-- | The configured limit on idle resources kept by the pool, if any.
getPoolMaxAvailable :: Pool' m c' c a -> Maybe Int
getPoolMaxAvailable pool = maxAvailable pool
|
|
|
|
-- | Whole-second component of a 'TimeSpec' as an 'Int'.
toSecond :: TimeSpec -> Int
toSecond spec = fromIntegral (sec spec)
|
|
|
|
-- | Current reading of the monotonic clock, truncated to whole seconds.
currentSecond :: MonadIO m => m Int
currentSecond = toSecond <$> liftIO (getTime Monotonic)
|
|
|
|
|
|
-- | Create a pool without use/release hooks.
--
-- Delegates to 'createPool'' with hooks that do nothing; the idle timeout and
-- the limit on available resources are passed through unchanged.
createPool :: (MonadResource m, MonadUnliftIO m, MonadUnliftIO m')
           => (forall b. m' b -> m b)
           -> m' a -- ^ Create
           -> (a -> m' ()) -- ^ Destroy
           -> Maybe Int -- ^ Timeout in seconds
           -> Maybe Int -- ^ Max available
           -> m (Pool m' a)
createPool nat mkResource freeResource = createPool' nat mkResource freeResource noHook noHook
  where
    -- Serves as both the @onUse@ and the @onRelease@ hook.
    noHook () _ = return ()
|
|
|
|
-- | Create a pool with use/release hooks.
--
-- Negative values for the timeout and for the limit on available resources
-- are clamped to @0@.  When a timeout is given, a 'reaper' is started via
-- 'allocateLinkedAsync'.  A cleanup action that cancels the reaper (if any)
-- and purges the pool is registered with the resource monad, and a
-- weak-reference finalizer on the pool's @aliveRef@ releases that
-- registration.
createPool' :: (MonadResource m, MonadUnliftIO m, MonadUnliftIO m')
            => (forall b. m' b -> m b)
            -> m' a -- ^ Create
            -> (a -> m' ()) -- ^ Destroy
            -> (c' -> a -> m' c) -- ^ onUse
            -> (c -> a -> m' ()) -- ^ onRelease
            -> Maybe Int -- ^ Timeout in seconds
            -> Maybe Int -- ^ Max available
            -> m (Pool' m' c' c a)
createPool' nat create destroy onUse onRelease rawIdleTime rawMaxAvailable = do
  let idleTime     = max 0 <$> rawIdleTime
      maxAvailable = max 0 <$> rawMaxAvailable

  aliveRef  <- newIORef ()
  -- A new pool starts with nothing in use and nothing available.
  resources <- newTVarIO PoolResources
    { inUseCount     = 0
    , availableCount = 0
    , inUse          = IntMap.empty
    , available      = IntMap.empty
    , inUseTick      = 0
    }
  let pool = Pool{..}

  -- Only pools with an idle timeout get a reaper.
  reaper' <- for idleTime $ \timeout ->
    allocateLinkedAsync (nat (reaper destroy resources timeout))
  relKey <- withRunInIO $ \runInIO -> runInIO . register . runInIO $ do
    traverse_ cancel reaper'
    nat $ purgePool pool
  void . mkWeakIORef aliveRef $ release relKey

  return pool
|
|
|
|
-- | Destroy every resource that is currently available in the pool.
--
-- Resources that are handed out at the time of the call are not affected.
purgePool :: MonadUnliftIO m => Pool' m c' c a -> m ()
purgePool pool = destroyResources (\_ -> True) pool
|
|
|
|
-- | Background loop that evicts idle resources.
--
-- Once per second, every available resource that was returned more than @t@
-- seconds ago (according to 'currentSecond') is removed from the pool state
-- and then passed to the given destroy action.  The loop never terminates on
-- its own.
reaper :: MonadUnliftIO m => (a -> m ()) -> TVar (PoolResources c a) -> Int -> m ()
reaper destroy' resources' t = forever $ do
  -- Sleep for one second between sweeps.
  atomically . waitDelay =<< liftIO (newDelay i)

  cutoff <- subtract t <$> currentSecond
  expired <- atomically $ do
    res@PoolResources{..} <- readTVar resources'
    -- Buckets strictly older than the cutoff expire; a bucket exactly at the
    -- cutoff is put back.
    let (expiredBuckets, pivot, available'') = IntMap.splitLookup cutoff available
        available' = maybe id (IntMap.insert cutoff) pivot available''
        expired = foldMap id expiredBuckets
    writeTVar resources' res
      { available = available'
        -- Count resources, not buckets: one bucket can hold several resources
        -- that were returned within the same second.
      , availableCount = availableCount - length expired
      }
    return expired
  forM_ expired destroy'
  where
    -- One second expressed in microseconds, the unit expected by 'newDelay'.
    MkFixed (fromIntegral -> i) = 1 :: Micro
|
|
|
|
-- | Take a resource from a hook-less pool; see 'takeResource''.
takeResource :: MonadIO m => Pool m a -> m (a, PoolResourceIdent' ())
takeResource pool = pool `takeResource'` ()
|
|
|
|
-- | Take a resource out of the pool.
--
-- The most recently returned available resource is reused if there is one;
-- otherwise a new resource is created.  Either way the resource is recorded
-- as in use under the current tick, the @onUse@ hook is run with the given
-- argument, and the resource is returned together with a handle carrying the
-- tick and the hook's result.
--
-- NOTE(review): nothing here removes the in-use entry again if @onUse@
-- throws; callers wanting exception safety should go through 'withResource''.
takeResource' :: MonadIO m => Pool' m c' c a -> c' -> m (a, PoolResourceIdent' c)
takeResource' Pool{..} stateInit = do
  reused <- atomically $ do
    PoolResources{..} <- readTVar resources
    case IntMap.maxViewWithKey available of
      Just ((bucket, picked : others), olderBuckets) -> do
        writeTVar resources PoolResources
          { inUseCount     = succ inUseCount
          , availableCount = pred availableCount
            -- Drop the bucket entirely once its last resource is taken.
          , available      = if null others
                               then olderBuckets
                               else IntMap.insert bucket others olderBuckets
          , inUseTick      = succ inUseTick
          , inUse          = IntMap.insert inUseTick picked inUse
          }
        return $ Just (picked, inUseTick)
      _other -> return Nothing

  (resource, resTick) <- case reused of
    Just taken -> return taken
    Nothing -> do
      -- Nothing to reuse: create outside of STM, then register as in use.
      newResource <- create
      tick <- atomically . stateTVar resources $ \res@PoolResources{..} ->
        ( inUseTick
        , res { inUseCount = succ inUseCount
              , inUse      = IntMap.insert inUseTick newResource inUse
              , inUseTick  = succ inUseTick
              }
        )
      return (newResource, tick)

  hookData <- onUse stateInit resource
  return (resource, PoolResourceIdent resTick hookData)
|
|
|
|
-- | Give a previously taken resource back to the pool.
--
-- When the flag is set, the resource is removed from the pool's bookkeeping
-- and destroyed in a newly forked thread; otherwise it is handed back to the
-- pool for reuse.
releaseResource :: MonadUnliftIO m
                => Bool -- ^ Destroy resource and don't return to pool?
                -> Pool' m c' c a
                -> (a, PoolResourceIdent' c)
                -> m ()
releaseResource isLost p@Pool{..} (x, ident) =
  if isLost
    then do
      markResourceLost p ident
      void . forkIO $ destroy x
    else markResourceAvailable p ident
|
|
|
|
-- | Remove a resource from the in-use set, either making it available for
-- reuse ('markResourceAvailable') or not ('markResourceLost').
markResourceAvailable, markResourceLost :: MonadUnliftIO m => Pool' m c' c a -> PoolResourceIdent' c -> m ()
markResourceAvailable pool ident = returnResource True pool ident
markResourceLost pool ident = returnResource False pool ident
|
|
|
|
-- | Remove the identified resource from the pool's in-use set.
--
-- If the handle does not refer to a resource that is currently in use,
-- nothing happens.  Otherwise the @onRelease@ hook is run with the hook data
-- stored in the handle, and:
--
-- * with the flag set, the resource is added to the available resources
--   under the current second, unless the pool already holds its maximum
--   number of available resources, in which case the resource is destroyed
--   in a newly forked thread;
--
-- * with the flag unset, the resource is only dropped from the bookkeeping.
--   It is never destroyed here, since the caller ('releaseResource') does
--   that itself.
returnResource :: MonadUnliftIO m
               => Bool -- ^ return to available
               -> Pool' m c' c a
               -> PoolResourceIdent' c
               -> m ()
returnResource toAvailable Pool{..} (PoolResourceIdent inUseKey hookData) = do
  -- A timestamp is only needed when the resource goes back into the pool.
  now <- if toAvailable
    then Just <$> currentSecond
    else return Nothing
  (toDestroy, released) <- atomically . stateTVar resources $ \res@PoolResources{..} ->
    case deleteView inUseKey inUse of
      Nothing -> ((Nothing, Nothing), res)
      Just (u, us)
        -- The capacity check applies only to resources that are being
        -- returned; applying it to lost resources as well would destroy them
        -- twice (here and in 'releaseResource').
        -- NOTE(review): presumably 'NTop' orders 'Nothing' as the greatest
        -- value, so a pool without a limit is never full; confirm in
        -- "Utils.NTop".
        | toAvailable, NTop (Just availableCount) >= NTop maxAvailable
        -> ( (Just u, Just u)
           , res { inUse = us
                 , inUseCount = pred inUseCount
                 }
           )
        | otherwise
        -> ( (Nothing, Just u)
           , PoolResources
               { inUse = us
               , inUseCount = pred inUseCount
               , availableCount = bool id succ toAvailable availableCount
               , available = maybe id (IntMap.alter $ Just . (u :) . fromMaybe []) now available
               , inUseTick = inUseTick
               }
           )

  forM_ released $ \u -> onRelease hookData u
  forM_ toDestroy $ void . forkIO . destroy
  where
    -- Delete a key, returning the value it had together with the remaining
    -- map, or 'Nothing' if the key was absent.
    deleteView :: Int -> IntMap a -> Maybe (a, IntMap a)
    deleteView k vs = (, vs') <$> fmap getFirst fv
      where (vs', fv) = runWriter $ IntMap.alterF (\old -> Nothing <$ tell (First <$> old)) k vs
|
|
|
|
|
|
-- | Run an action with a resource from a hook-less pool; see 'withResource''.
withResource :: forall b m a. MonadUnliftIO m => Pool m a -> (a -> m b) -> m b
withResource pool act = withResource' pool () act
|
|
|
|
-- | Take a resource, run the action on it, and give the resource back.
--
-- After the action completes normally the resource is released for reuse.
-- If an exception escapes (from the action or from that release), the
-- resource is released as lost instead, i.e. destroyed rather than reused.
withResource' :: forall b m c' c a. MonadUnliftIO m => Pool' m c' c a -> c' -> (a -> m b) -> m b
withResource' p stateInit act = bracketOnError acquire discard use
  where
    acquire = takeResource' p stateInit
    discard = releaseResource True p
    use taken@(x, _) = act x <* releaseResource False p taken
|
|
|
|
-- | Destroy all available resources that satisfy the predicate.
--
-- Matching resources are removed from the pool state in a single
-- transaction, and each is then destroyed in its own newly forked thread.
-- Resources that are currently handed out are not considered.
destroyResources :: MonadUnliftIO m => (a -> Bool) -> Pool' m c' c a -> m ()
destroyResources p Pool{..} = do
  doomed <- atomically . stateTVar resources $ \res@PoolResources{..} ->
    let split = fmap (partition p) available
        selected = foldMap fst split
        -- Buckets left without any resource are dropped from the map.
        nonEmptyRest (_, kept)
          | null kept = Nothing
          | otherwise = Just kept
        survivors = IntMap.mapMaybe nonEmptyRest split
     in ( selected
        , res { availableCount = availableCount - length selected
              , available = survivors
              }
        )

  forM_ doomed $ void . forkIO . destroy
|