feat(db): provide our own implementation of connection pooling
Also allows monitoring pool status (available/in use connections,
total number of takes from pool)
This reverts commit 35ac503bf9.
This commit is contained in:
parent
e48124a8b8
commit
50fdcb4540
@ -17,8 +17,10 @@ module Application
|
||||
) where
|
||||
|
||||
import Control.Monad.Logger (liftLoc, LoggingT(..), MonadLoggerIO(..))
|
||||
import Database.Persist.Postgresql (createPostgresqlPool, pgConnStr,
|
||||
pgPoolSize, runSqlPool, ConnectionPool, runSqlConn, withPostgresqlConn)
|
||||
import Database.Persist.Postgresql ( openSimpleConn, pgConnStr, connClose, pgPoolIdleTimeout
|
||||
, pgPoolSize
|
||||
)
|
||||
import qualified Database.PostgreSQL.Simple as PG
|
||||
import Import hiding (cancel, respond)
|
||||
import Language.Haskell.TH.Syntax (qLocation)
|
||||
import Network.Wai (Middleware)
|
||||
@ -108,6 +110,8 @@ import qualified Prometheus
|
||||
|
||||
import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
-- Import all relevant handler modules here.
|
||||
-- (HPack takes care to add new modules to our cabal file nowadays.)
|
||||
import Handler.News
|
||||
@ -203,15 +207,14 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
-- temporary foundation without a real connection pool, get a log function
|
||||
-- from there, and then create the real foundation.
|
||||
let
|
||||
mkFoundation :: _ -> _ -> (forall backend m a. (MonadUnliftIO m, BackendCompatible backend SqlBackend, MonadLogger m) => ReaderT backend m a -> m a) -> _
|
||||
mkFoundation appSettings' appDatabaseConnPool appDatabaseAccess appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey = UniWorX {..}
|
||||
mkFoundation :: _ -> (forall m. MonadIO m => Custom.Pool m SqlBackend) -> _
|
||||
mkFoundation appSettings' appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey = UniWorX {..}
|
||||
-- The UniWorX {..} syntax is an example of record wild cards. For more
|
||||
-- information, see:
|
||||
-- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html
|
||||
tempFoundation = mkFoundation
|
||||
(error "appSettings' forced in tempFoundation")
|
||||
(error "databaseConnPool forced in tempFoundation")
|
||||
(error "databaseAccess forced in tempFoundation")
|
||||
(error "connPool forced in tempFoundation")
|
||||
(error "smtpPool forced in tempFoundation")
|
||||
(error "ldapPool forced in tempFoundation")
|
||||
(error "cryptoIDKey forced in tempFoundation")
|
||||
@ -240,14 +243,22 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
|
||||
-- Create the database connection pool
|
||||
$logDebugS "setup" "PostgreSQL-Pool"
|
||||
appDatabaseConnPool <- createPostgresqlPool
|
||||
(pgConnStr appDatabaseConf)
|
||||
(pgPoolSize appDatabaseConf)
|
||||
let
|
||||
appDatabaseAccess :: forall backend m a. (MonadUnliftIO m, BackendCompatible backend SqlBackend, MonadLogger m) => ReaderT backend m a -> m a
|
||||
appDatabaseAccess
|
||||
| appDatabasePool = flip runSqlPool appDatabaseConnPool . withReaderT projectBackend
|
||||
| otherwise = withPostgresqlConn (pgConnStr appDatabaseConf) . runSqlConn . withReaderT projectBackend
|
||||
logFunc <- askLoggerIO
|
||||
sqlPool' <-
|
||||
let create = do
|
||||
$logDebugS "SqlPool" "Opening connection..."
|
||||
conn <- liftIO . PG.connectPostgreSQL $ pgConnStr appDatabaseConf
|
||||
backend <- liftIO $ openSimpleConn logFunc conn
|
||||
$logInfoS "SqlPool" "Opened connection"
|
||||
return backend
|
||||
destroy conn = do
|
||||
$logDebugS "SqlPool" "Closing connection..."
|
||||
liftIO $ connClose conn
|
||||
$logInfoS "SqlPool" "Closed connection"
|
||||
in Custom.createPool (liftIO . flip runLoggingT logFunc) create destroy (Just . fromIntegral $ pgPoolIdleTimeout appDatabaseConf) (Just $ pgPoolSize appDatabaseConf)
|
||||
let sqlPool :: forall m. MonadIO m => Custom.Pool m SqlBackend
|
||||
sqlPool = Custom.hoistPool (liftIO . flip runLoggingT logFunc) sqlPool'
|
||||
void . Prometheus.register . poolMetrics PoolDatabaseConnections $ sqlPool @IO
|
||||
|
||||
ldapPool <- traverse mkFailoverLabeled <=< forOf (traverse . traverse) appLdapConf $ \conf@LdapConf{..} -> do
|
||||
let ldapLabel = case ldapHost of
|
||||
@ -262,33 +273,33 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
if
|
||||
| appAutoDbMigrate -> do
|
||||
$logDebugS "setup" "Migration"
|
||||
appDatabaseAccess migrateAll
|
||||
| otherwise -> whenM (appDatabaseAccess requiresMigration) $ do
|
||||
migrateAll `customRunSqlPool` sqlPool
|
||||
| otherwise -> whenM (requiresMigration `customRunSqlPool` sqlPool) $ do
|
||||
$logErrorS "setup" "Migration required"
|
||||
liftIO . exitWith $ ExitFailure 130
|
||||
|
||||
$logDebugS "setup" "Cluster-Config"
|
||||
appCryptoIDKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterCryptoIDKey
|
||||
appSecretBoxKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterSecretBoxKey
|
||||
appJSONWebKeySet <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterJSONWebKeySet
|
||||
appClusterID <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterId
|
||||
appVerpSecret <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterVerpSecret
|
||||
appAuthKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterAuthKey
|
||||
appCryptoIDKey <- clusterSetting (Proxy :: Proxy 'ClusterCryptoIDKey) `customRunSqlPool` sqlPool
|
||||
appSecretBoxKey <- clusterSetting (Proxy :: Proxy 'ClusterSecretBoxKey) `customRunSqlPool` sqlPool
|
||||
appJSONWebKeySet <- clusterSetting (Proxy :: Proxy 'ClusterJSONWebKeySet) `customRunSqlPool` sqlPool
|
||||
appClusterID <- clusterSetting (Proxy :: Proxy 'ClusterId) `customRunSqlPool` sqlPool
|
||||
appVerpSecret <- clusterSetting (Proxy :: Proxy 'ClusterVerpSecret) `customRunSqlPool` sqlPool
|
||||
appAuthKey <- clusterSetting (Proxy :: Proxy 'ClusterAuthKey) `customRunSqlPool` sqlPool
|
||||
|
||||
needsRechunk <- appDatabaseAccess @SqlReadBackend $ exists [FileContentChunkContentBased !=. True]
|
||||
needsRechunk <- exists [FileContentChunkContentBased !=. True] `customRunSqlPool` sqlPool
|
||||
let appSettings' = appSettings''
|
||||
& _appRechunkFiles %~ guardOnM needsRechunk
|
||||
|
||||
appMemcached <- for appMemcachedConf $ \memcachedConf -> do
|
||||
$logDebugS "setup" "Memcached"
|
||||
memcachedKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterMemcachedKey
|
||||
memcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterMemcachedKey) `customRunSqlPool` sqlPool
|
||||
memcached <- createMemcached memcachedConf
|
||||
when appClearCache $ do
|
||||
$logWarnS "setup" "Clearing memcached"
|
||||
liftIO $ Memcached.flushAll memcached
|
||||
return (memcachedKey, memcached)
|
||||
|
||||
appSessionStore <- appDatabaseAccess $ mkSessionStore appSettings'' appDatabaseConnPool
|
||||
appSessionStore <- mkSessionStore appSettings'' sqlPool `customRunSqlPool` sqlPool
|
||||
|
||||
appUploadCache <- for appUploadCacheConf $ \minioConf -> liftIO $ do
|
||||
conn <- Minio.connect minioConf
|
||||
@ -301,7 +312,7 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
|
||||
$logDebugS "Runtime configuration" $ tshow appSettings'
|
||||
|
||||
let foundation = mkFoundation appSettings' appDatabaseConnPool appDatabaseAccess smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey
|
||||
let foundation = mkFoundation appSettings' sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey
|
||||
|
||||
-- Return the foundation
|
||||
$logDebugS "setup" "Done"
|
||||
@ -318,7 +329,9 @@ mkSessionStore :: forall m.
|
||||
, MonadThrow m
|
||||
, MonadResource m
|
||||
)
|
||||
=> AppSettings -> ConnectionPool -> ReaderT SqlBackend m SomeSessionStorage
|
||||
=> AppSettings
|
||||
-> (forall m'. MonadIO m' => Custom.Pool m' SqlBackend)
|
||||
-> ReaderT SqlBackend m SomeSessionStorage
|
||||
mkSessionStore AppSettings{..} mcdSqlConnPool
|
||||
| Just mcdConf@MemcachedConf{..} <- appSessionMemcachedConf = do
|
||||
mcdSqlMemcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterServerSessionKey)
|
||||
@ -652,7 +665,7 @@ shutdownApp :: (MonadIO m, MonadUnliftIO m) => UniWorX -> m ()
|
||||
shutdownApp app = do
|
||||
stopJobCtl app
|
||||
liftIO $ do
|
||||
destroyAllResources $ appDatabaseConnPool app
|
||||
Custom.purgePool $ appConnPool app
|
||||
for_ (appSmtpPool app) destroyAllResources
|
||||
for_ (appLdapPool app) . mapFailover $ views _2 destroyAllResources
|
||||
for_ (appWidgetMemcached app) Memcached.close
|
||||
|
||||
@ -13,15 +13,17 @@ import GHC.IO.Exception (IOErrorType(OtherError))
|
||||
import Database.Persist.Sql (SqlReadBackend(..))
|
||||
import Database.Persist.Sql.Raw.QQ (executeQQ)
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
|
||||
runSqlPoolRetry :: forall m a backend.
|
||||
( MonadUnliftIO m
|
||||
( MonadUnliftIO m, BackendCompatible SqlBackend backend
|
||||
, MonadLogger m, MonadMask m
|
||||
)
|
||||
=> (ReaderT backend m a -> m a)
|
||||
-> ReaderT backend m a
|
||||
=> ReaderT backend m a
|
||||
-> Custom.Pool m backend
|
||||
-> m a
|
||||
runSqlPoolRetry dbAccess action = do
|
||||
runSqlPoolRetry action pool = do
|
||||
let policy = Retry.fullJitterBackoff 1e3 & Retry.limitRetriesByCumulativeDelay 10e6
|
||||
handlers = Retry.skipAsyncExceptions `snoc` Retry.logRetries suggestRetry logRetry
|
||||
where suggestRetry :: IOException -> m Bool
|
||||
@ -39,10 +41,9 @@ runSqlPoolRetry dbAccess action = do
|
||||
|
||||
Retry.recovering policy handlers $ \Retry.RetryStatus{..} -> do
|
||||
$logDebugS "runSqlPoolRetry" $ "rsIterNumber = " <> tshow rsIterNumber
|
||||
dbAccess action
|
||||
customRunSqlPool action pool
|
||||
|
||||
runDBRead :: ReaderT SqlReadBackend (HandlerFor UniWorX) a -> (HandlerFor UniWorX) a
|
||||
runDBRead action = do
|
||||
$logDebugS "YesodPersist" "runDBRead"
|
||||
dbAccess <- getsYesod appDatabaseAccess
|
||||
runSqlPoolRetry dbAccess . withReaderT SqlReadBackend $ [executeQQ|SET TRANSACTION READ ONLY|] *> action
|
||||
runSqlPoolRetry (withReaderT SqlReadBackend $ [executeQQ|SET TRANSACTION READ ONLY|] *> action) . appConnPool =<< getYesod
|
||||
|
||||
@ -6,7 +6,7 @@ module Foundation.Type
|
||||
, SomeSessionStorage(..)
|
||||
, _SessionStorageMemcachedSql, _SessionStorageAcid
|
||||
, SMTPPool
|
||||
, _appSettings', _appStatic, _appDatabaseConnPool, _appDatabaseAccess, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport, _appMemcached, _appUploadCache, _appVerpSecret, _appAuthKey
|
||||
, _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport, _appMemcached, _appUploadCache, _appVerpSecret, _appAuthKey
|
||||
, DB, Form, MsgRenderer, MailM, DBFile
|
||||
) where
|
||||
|
||||
@ -26,6 +26,8 @@ import Network.Minio (MinioConn)
|
||||
|
||||
import Data.IntervalMap.Strict (IntervalMap)
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
|
||||
type SMTPPool = Pool SMTPConnection
|
||||
|
||||
@ -42,8 +44,7 @@ makePrisms ''SomeSessionStorage
|
||||
data UniWorX = UniWorX
|
||||
{ appSettings' :: AppSettings
|
||||
, appStatic :: EmbeddedStatic -- ^ Settings for static file serving.
|
||||
, appDatabaseConnPool :: Pool SqlBackend
|
||||
, appDatabaseAccess :: forall backend m a. (MonadUnliftIO m, BackendCompatible backend SqlBackend, MonadLogger m) => ReaderT backend m a -> m a
|
||||
, appConnPool :: forall m. MonadIO m => Custom.Pool m SqlBackend -- ^ Database connection pool.
|
||||
, appSmtpPool :: Maybe SMTPPool
|
||||
, appLdapPool :: Maybe (Failover (LdapConf, LdapPool))
|
||||
, appWidgetMemcached :: Maybe Memcached.Connection -- ^ Actually a proper pool
|
||||
|
||||
@ -10,6 +10,11 @@ import Foundation.DB
|
||||
import Foundation.Authorization
|
||||
|
||||
import Database.Persist.Sql (transactionUndo)
|
||||
import qualified Database.Persist.Sql as SQL
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
import UnliftIO.Resource (allocate, unprotect)
|
||||
|
||||
|
||||
runDB :: ( YesodPersistBackend UniWorX ~ SqlBackend
|
||||
@ -25,15 +30,34 @@ runDB action = do
|
||||
| dryRun = action <* transactionUndo
|
||||
| otherwise = action
|
||||
|
||||
dbAccess <- getsYesod appDatabaseAccess
|
||||
runSqlPoolRetry dbAccess action'
|
||||
runSqlPoolRetry action' . appConnPool =<< getYesod
|
||||
|
||||
getDBRunner :: ( YesodPersistBackend UniWorX ~ SqlBackend
|
||||
, BearerAuthSite UniWorX
|
||||
)
|
||||
=> HandlerFor UniWorX (DBRunner UniWorX, HandlerFor UniWorX ())
|
||||
getDBRunner = do
|
||||
(DBRunner{..}, cleanup) <- defaultGetDBRunner appDatabaseConnPool
|
||||
pool <- getsYesod appConnPool
|
||||
UnliftIO{..} <- askUnliftIO
|
||||
let withPrep conn f = f (persistBackend conn) (SQL.getStmtConn $ persistBackend conn)
|
||||
(relKey, (conn, ident)) <- allocate
|
||||
(do
|
||||
(conn, ident) <- unliftIO $ Custom.takeResource pool
|
||||
withPrep conn (\c f -> SQL.connBegin c f Nothing)
|
||||
return (conn, ident)
|
||||
)
|
||||
(\(conn, ident) -> do
|
||||
withPrep conn SQL.connRollback
|
||||
unliftIO $ Custom.releaseResource True pool (conn, ident)
|
||||
)
|
||||
|
||||
let cleanup = liftIO $ do
|
||||
withPrep conn SQL.connCommit
|
||||
unliftIO $ Custom.releaseResource True pool (conn, ident)
|
||||
void $ unprotect relKey
|
||||
runDBRunner :: forall a. YesodDB UniWorX a -> HandlerFor UniWorX a
|
||||
runDBRunner = flip runReaderT conn
|
||||
|
||||
return . (, cleanup) $ DBRunner
|
||||
(\action -> do
|
||||
dryRun <- isDryRun
|
||||
|
||||
@ -90,7 +90,6 @@ data AppSettings = AppSettings
|
||||
, appWellKnownDir :: FilePath
|
||||
, appWellKnownLinkFile :: FilePath
|
||||
, appDatabaseConf :: PostgresConf
|
||||
, appDatabasePool :: Bool
|
||||
-- ^ Configuration settings for accessing the database.
|
||||
, appAutoDbMigrate :: Bool
|
||||
, appLdapConf :: Maybe (PointedList LdapConf)
|
||||
@ -517,7 +516,6 @@ instance FromJSON AppSettings where
|
||||
appWellKnownLinkFile <- o .: "well-known-link-file"
|
||||
appWebpackEntrypoints <- o .: "webpack-manifest"
|
||||
appDatabaseConf <- o .: "database"
|
||||
appDatabasePool <- o .:? "database-pool" .!= True
|
||||
appAutoDbMigrate <- o .: "auto-db-migrate"
|
||||
let nonEmptyHost LdapConf{..} = case ldapHost of
|
||||
Ldap.Tls host _ -> not $ null host
|
||||
|
||||
@ -14,6 +14,10 @@ import Control.Lens.Extras (is)
|
||||
|
||||
import Control.Monad.Catch
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
import Database.Persist.Sql (runSqlConn)
|
||||
|
||||
|
||||
emptyOrIn :: PersistField typ
|
||||
=> E.SqlExpr (E.Value typ) -> Set typ -> E.SqlExpr (E.Value Bool)
|
||||
@ -154,3 +158,6 @@ selectMaybe fltrs opts = listToMaybe <$> selectList fltrs (LimitTo 1 : opts')
|
||||
LimitTo _ -> True
|
||||
_other -> False
|
||||
|
||||
|
||||
customRunSqlPool :: (MonadUnliftIO m, BackendCompatible SqlBackend backend) => ReaderT backend m a -> Custom.Pool m backend -> m a
|
||||
customRunSqlPool act p = Custom.withResource p $ runSqlConn act
|
||||
|
||||
@ -20,6 +20,8 @@ module Utils.Metrics
|
||||
, LRUMetrics, LRULabel(..)
|
||||
, lruMetrics
|
||||
, InjectInhibitMetrics, injectInhibitMetrics
|
||||
, PoolMetrics, PoolLabel(..)
|
||||
, poolMetrics
|
||||
) where
|
||||
|
||||
import Import.NoModel hiding (Vector, Info)
|
||||
@ -50,6 +52,8 @@ import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
import qualified Data.Foldable as F
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
{-# ANN module ("HLint: ignore Use even" :: String) #-}
|
||||
|
||||
|
||||
@ -328,6 +332,45 @@ injectInhibitMetrics tvar = Metric $ return (InjectInhibitMetrics, collectInject
|
||||
"Number of distinct time intervals in which we don't transfer some files from upload cache to db"
|
||||
hashesInfo = Info "uni2work_inject_inhibited_hashes_count"
|
||||
"Number of files which we don't transfer from upload cache to db during some interval"
|
||||
|
||||
data PoolMetrics = PoolMetrics
|
||||
|
||||
data PoolLabel = PoolDatabaseConnections
|
||||
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||
deriving anyclass (Universe, Finite)
|
||||
|
||||
nullaryPathPiece ''PoolLabel $ camelToPathPiece' 1
|
||||
|
||||
poolMetrics :: PoolLabel
|
||||
-> Custom.Pool m a
|
||||
-> Metric PoolMetrics
|
||||
poolMetrics lbl pool = Metric $ return (PoolMetrics, collectPoolMetrics)
|
||||
where
|
||||
labelPool = relabel "pool" $ toPathPiece lbl
|
||||
|
||||
collectPoolMetrics = map labelPool <$> do
|
||||
(available, inUse, usesCount) <- atomically $ (,,)
|
||||
<$> Custom.getPoolAvailableCount pool
|
||||
<*> Custom.getPoolInUseCount pool
|
||||
<*> Custom.getPoolUsesCount pool
|
||||
return
|
||||
[ SampleGroup availableInfo GaugeType
|
||||
[ Sample "uni2work_pool_available_count" [] . encodeUtf8 $ tshow available
|
||||
]
|
||||
, SampleGroup inUseInfo GaugeType
|
||||
[ Sample "uni2work_pool_in_use_count" [] . encodeUtf8 $ tshow inUse
|
||||
]
|
||||
, SampleGroup usesInfo CounterType
|
||||
[ Sample "uni2work_pool_uses_count" [] . encodeUtf8 $ tshow usesCount
|
||||
]
|
||||
]
|
||||
|
||||
availableInfo = Info "uni2work_pool_available_count"
|
||||
"Number of open resources available for taking"
|
||||
inUseInfo = Info "uni2work_pool_in_use_count"
|
||||
"Number of resources currently in use"
|
||||
usesInfo = Info "uni2work_pool_uses_count"
|
||||
"Number of takes executed against the pool"
|
||||
|
||||
|
||||
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
||||
|
||||
223
src/Utils/Pool.hs
Normal file
223
src/Utils/Pool.hs
Normal file
@ -0,0 +1,223 @@
|
||||
{-# OPTIONS_GHC -Wno-error=unused-top-binds #-}
|
||||
|
||||
module Utils.Pool
|
||||
( Pool, hoistPool
|
||||
, getPoolAvailableCount, getPoolInUseCount, getPoolUsesCount
|
||||
, createPool
|
||||
, purgePool
|
||||
, withResource
|
||||
, destroyResources
|
||||
, takeResource, releaseResource
|
||||
) where
|
||||
|
||||
import ClassyPrelude
|
||||
|
||||
import qualified Data.IntMap.Strict as IntMap
|
||||
|
||||
import UnliftIO.Async.Utils
|
||||
import UnliftIO.Resource (MonadResource, register, release)
|
||||
import UnliftIO.Concurrent (forkIO)
|
||||
|
||||
import Data.Fixed
|
||||
|
||||
import System.Clock
|
||||
|
||||
import Control.Concurrent.STM.Delay
|
||||
import Control.Concurrent.STM.TVar (stateTVar)
|
||||
|
||||
import Control.Monad.Writer.Strict (runWriter)
|
||||
import Control.Monad.Writer.Class (MonadWriter(..))
|
||||
|
||||
import Data.Semigroup (First(..))
|
||||
|
||||
import Utils.NTop
|
||||
|
||||
-- <https://hackage.haskell.org/package/ex-pool-0.2.1/docs/src/Data-Pool.html>
|
||||
|
||||
|
||||
newtype PoolResourceIdent = PoolResourceIdent Int
|
||||
deriving (Eq, Ord, Show, Typeable)
|
||||
|
||||
|
||||
data Pool m a = Pool
|
||||
{ create :: m a
|
||||
, destroy :: a -> m ()
|
||||
, idleTime :: !(Maybe Int)
|
||||
, maxAvailable :: !(Maybe Int)
|
||||
, resources :: !(TVar (PoolResources a))
|
||||
, aliveRef :: !(IORef ())
|
||||
}
|
||||
|
||||
data PoolResources a = PoolResources
|
||||
{ inUseCount, availableCount :: !Int
|
||||
, inUse :: !(IntMap a)
|
||||
, available :: !(IntMap [a])
|
||||
, inUseTick :: !Int
|
||||
} deriving (Functor)
|
||||
|
||||
|
||||
hoistPool :: (forall b. m b -> n b) -> Pool m a -> Pool n a
|
||||
hoistPool nat Pool{..} = Pool
|
||||
{ create = nat create
|
||||
, destroy = nat . destroy
|
||||
, ..
|
||||
}
|
||||
|
||||
getPoolAvailableCount, getPoolInUseCount, getPoolUsesCount :: Pool m a -> STM Int
|
||||
getPoolAvailableCount Pool{..} = availableCount <$> readTVar resources
|
||||
getPoolInUseCount Pool{..} = inUseCount <$> readTVar resources
|
||||
getPoolUsesCount Pool{..} = inUseTick <$> readTVar resources
|
||||
|
||||
|
||||
toSecond :: TimeSpec -> Int
|
||||
toSecond = fromIntegral . sec
|
||||
|
||||
currentSecond :: MonadIO m => m Int
|
||||
currentSecond = liftIO $ toSecond <$> getTime Monotonic
|
||||
|
||||
|
||||
createPool :: (MonadResource m, MonadUnliftIO m, MonadUnliftIO m')
|
||||
=> (forall b. m' b -> m b)
|
||||
-> m' a -- ^ Create
|
||||
-> (a -> m' ()) -- ^ Destroy
|
||||
-> Maybe Int -- ^ Timeout in seconds
|
||||
-> Maybe Int -- ^ Max available
|
||||
-> m (Pool m' a)
|
||||
createPool nat create destroy (fmap $ max 0 -> idleTime) (fmap $ max 0 -> maxAvailable) = do
|
||||
let
|
||||
inUseCount = 0
|
||||
availableCount = 0
|
||||
inUseTick = 0
|
||||
inUse = IntMap.empty
|
||||
available = IntMap.empty
|
||||
aliveRef <- newIORef ()
|
||||
resources <- newTVarIO PoolResources{..}
|
||||
let pool = Pool{..}
|
||||
|
||||
reaper' <- for idleTime $ allocateLinkedAsync . nat . reaper destroy resources
|
||||
relKey <- withRunInIO $ \runInIO -> runInIO . register . runInIO $ do
|
||||
traverse_ cancel reaper'
|
||||
nat $ purgePool pool
|
||||
void . mkWeakIORef aliveRef $ release relKey
|
||||
|
||||
return pool
|
||||
|
||||
purgePool :: MonadUnliftIO m => Pool m a -> m ()
|
||||
purgePool = destroyResources $ const True
|
||||
|
||||
reaper :: MonadUnliftIO m => (a -> m ()) -> TVar (PoolResources a) -> Int -> m ()
|
||||
reaper destroy' resources' t = forever $ do
|
||||
atomically . waitDelay =<< liftIO (newDelay i)
|
||||
|
||||
cutoff <- subtract t <$> currentSecond
|
||||
toDestroy <- atomically $ do
|
||||
res@PoolResources{..} <- readTVar resources'
|
||||
let (toDestroy, pivot, available'') = IntMap.splitLookup cutoff available
|
||||
available' = maybe id (IntMap.insert cutoff) pivot available''
|
||||
writeTVar resources' res
|
||||
{ available = available'
|
||||
, availableCount = availableCount - IntMap.size toDestroy
|
||||
}
|
||||
return toDestroy
|
||||
forM_ toDestroy . mapM_ $ void . destroy'
|
||||
|
||||
where
|
||||
MkFixed (fromIntegral -> i) = 1 :: Micro
|
||||
|
||||
takeResource :: MonadIO m => Pool m a -> m (a, PoolResourceIdent)
|
||||
takeResource Pool{..} = do
|
||||
takenAvailable <- atomically $ do
|
||||
PoolResources{..} <- readTVar resources
|
||||
case IntMap.maxViewWithKey available of
|
||||
Just ((t, av : avs), available') -> do
|
||||
let available''
|
||||
| null avs = available'
|
||||
| otherwise = IntMap.insert t avs available'
|
||||
availableCount' = pred availableCount
|
||||
inUse' = IntMap.insert inUseTick av inUse
|
||||
inUseCount' = succ inUseCount
|
||||
inUseTick' = succ inUseTick
|
||||
writeTVar resources PoolResources
|
||||
{ inUseCount = inUseCount'
|
||||
, availableCount = availableCount'
|
||||
, available = available''
|
||||
, inUseTick = inUseTick'
|
||||
, inUse = inUse'
|
||||
}
|
||||
return $ Just (av, inUseTick)
|
||||
_other -> return Nothing
|
||||
case takenAvailable of
|
||||
Just (av, resTick) -> return (av, PoolResourceIdent resTick)
|
||||
Nothing -> do
|
||||
newResource <- create
|
||||
resTick <- atomically . stateTVar resources $ \res@PoolResources{..} ->
|
||||
let inUseTick' = succ inUseTick
|
||||
inUseCount' = succ inUseCount
|
||||
inUse' = IntMap.insert inUseTick newResource inUse
|
||||
in ( inUseTick
|
||||
, res{ inUseCount = inUseCount', inUse = inUse', inUseTick = inUseTick' }
|
||||
)
|
||||
return (newResource, PoolResourceIdent resTick)
|
||||
|
||||
releaseResource :: MonadUnliftIO m
|
||||
=> Bool -- ^ Destroy resource and don't return to pool?
|
||||
-> Pool m a
|
||||
-> (a, PoolResourceIdent)
|
||||
-> m ()
|
||||
releaseResource isLost p@Pool{..} (x, ident)
|
||||
| isLost = do
|
||||
markResourceLost p ident
|
||||
void . forkIO $ destroy x
|
||||
| otherwise
|
||||
= markResourceAvailable p ident
|
||||
|
||||
markResourceAvailable, markResourceLost :: MonadUnliftIO m => Pool m a -> PoolResourceIdent -> m ()
|
||||
markResourceAvailable = returnResource True
|
||||
markResourceLost = returnResource False
|
||||
|
||||
returnResource :: MonadUnliftIO m
|
||||
=> Bool -- ^ return to available
|
||||
-> Pool m a
|
||||
-> PoolResourceIdent
|
||||
-> m ()
|
||||
returnResource toAvailable Pool{..} (PoolResourceIdent inUseKey) = do
|
||||
now <- if | toAvailable -> Just <$> currentSecond
|
||||
| otherwise -> return Nothing
|
||||
toDestroy <- atomically . stateTVar resources $ \res@PoolResources{..} -> case deleteView inUseKey inUse of
|
||||
Nothing -> (Nothing, res)
|
||||
Just (u, us) | NTop (Just availableCount) >= NTop maxAvailable
|
||||
-> (Just u,) res
|
||||
{ inUse = us
|
||||
, inUseCount = pred inUseCount
|
||||
}
|
||||
Just (u, us)
|
||||
-> (Nothing, ) PoolResources
|
||||
{ inUse = us
|
||||
, inUseCount = pred inUseCount
|
||||
, availableCount = bool id succ toAvailable availableCount
|
||||
, available = maybe id (IntMap.alter $ Just . (u :) . fromMaybe []) now available
|
||||
, inUseTick
|
||||
}
|
||||
|
||||
forM_ toDestroy $ void . forkIO . destroy
|
||||
where
|
||||
deleteView :: Int -> IntMap a -> Maybe (a, IntMap a)
|
||||
deleteView k vs = (, vs') <$> fmap getFirst fv
|
||||
where (vs', fv) = runWriter $ IntMap.alterF (\old -> Nothing <$ tell (First <$> old)) k vs
|
||||
|
||||
|
||||
withResource :: forall b m a. MonadUnliftIO m => Pool m a -> (a -> m b) -> m b
|
||||
withResource p act = bracketOnError (takeResource p) (releaseResource True p) (\x'@(x, _) -> act x <* releaseResource False p x')
|
||||
|
||||
destroyResources :: MonadUnliftIO m => (a -> Bool) -> Pool m a -> m ()
|
||||
destroyResources p Pool{..} = do
|
||||
toDestroy <- atomically . stateTVar resources $ \res@PoolResources{..}
|
||||
-> let partitioned = partition p <$> available
|
||||
toDel = foldMap fst partitioned
|
||||
toKeep = IntMap.mapMaybe (\(_, toKeep') -> toKeep' <$ guard (not $ null toKeep')) partitioned
|
||||
in (toDel, ) res
|
||||
{ availableCount = availableCount - length toDel
|
||||
, available = toKeep
|
||||
}
|
||||
|
||||
forM_ toDestroy $ void . forkIO . destroy
|
||||
@ -14,7 +14,7 @@ import Utils.Lens
|
||||
|
||||
import Web.ServerSession.Core
|
||||
|
||||
import Database.Persist.Sql (ConnectionPool, runSqlPool)
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
import qualified Data.Binary as Binary
|
||||
|
||||
@ -46,7 +46,7 @@ share [mkPersist sqlSettings, mkMigrate "migrateMemcachedSqlStorage"]
|
||||
|
||||
|
||||
data MemcachedSqlStorage sess = MemcachedSqlStorage
|
||||
{ mcdSqlConnPool :: ConnectionPool
|
||||
{ mcdSqlConnPool :: forall m. MonadIO m => Custom.Pool m SqlBackend
|
||||
, mcdSqlMemcached :: Memcached.Connection
|
||||
, mcdSqlMemcachedKey :: AEAD.Key
|
||||
, mcdSqlMemcachedExpiration :: Maybe NominalDiffTime
|
||||
@ -108,7 +108,7 @@ instance (IsSessionData sess, Binary (Decomposed sess)) => Storage (MemcachedSql
|
||||
type SessionData (MemcachedSqlStorage sess) = sess
|
||||
type TransactionM (MemcachedSqlStorage sess) = SqlPersistT IO
|
||||
|
||||
runTransactionM MemcachedSqlStorage{..} = flip runSqlPool mcdSqlConnPool
|
||||
runTransactionM MemcachedSqlStorage{..} = flip customRunSqlPool mcdSqlConnPool
|
||||
|
||||
getSession MemcachedSqlStorage{..} sessId = exceptT (maybe (return Nothing) throwM) (return . Just) $ do
|
||||
encSession <- catchIfExceptT (const Nothing) Memcached.isKeyNotFound . liftIO . fmap LBS.toStrict $ Memcached.getAndTouch_ expiry (memcachedSqlSessionId # sessId) mcdSqlMemcached
|
||||
|
||||
@ -7,8 +7,6 @@ module Database
|
||||
import "uniworx" Import hiding (Option(..), getArgs)
|
||||
import "uniworx" Application (db', getAppSettings)
|
||||
|
||||
import UnliftIO.Pool (destroyAllResources)
|
||||
|
||||
import Database.Persist.Postgresql
|
||||
import Control.Monad.Logger
|
||||
|
||||
@ -21,6 +19,8 @@ import Database.Persist.Sql.Raw.QQ
|
||||
|
||||
import Database.Fill (fillDb)
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
|
||||
data DBAction = DBClear
|
||||
| DBTruncate
|
||||
@ -48,7 +48,7 @@ main = do
|
||||
[executeQQ|drop owned by current_user|] :: ReaderT SqlBackend _ ()
|
||||
DBTruncate -> db' $ do
|
||||
foundation <- getYesod
|
||||
liftIO . destroyAllResources $ appDatabaseConnPool foundation
|
||||
Custom.purgePool $ appConnPool foundation
|
||||
truncateDb
|
||||
DBMigrate -> db' $ return ()
|
||||
DBFill -> db' $ fillDb
|
||||
|
||||
@ -13,7 +13,6 @@ import ClassyPrelude as X
|
||||
)
|
||||
import Database.Persist as X hiding (get)
|
||||
import Database.Persist.Sql as X (SqlPersistM)
|
||||
import Database.Persist.Sql (runSqlPersistMPool)
|
||||
import Foundation as X
|
||||
import Model as X
|
||||
import Test.Hspec as X
|
||||
@ -71,6 +70,9 @@ import Utils.Parameters (GlobalPostParam(PostLoginDummy))
|
||||
|
||||
import Control.Monad.Morph as X (generalize)
|
||||
|
||||
import Control.Monad.Logger (runNoLoggingT)
|
||||
import Utils.DB (customRunSqlPool)
|
||||
|
||||
|
||||
runDB :: SqlPersistM a -> YesodExample UniWorX a
|
||||
runDB query = do
|
||||
@ -78,7 +80,7 @@ runDB query = do
|
||||
liftIO $ runDBWithApp app query
|
||||
|
||||
runDBWithApp :: MonadIO m => UniWorX -> SqlPersistM a -> m a
|
||||
runDBWithApp app query = liftIO $ runSqlPersistMPool query (appDatabaseConnPool app)
|
||||
runDBWithApp app query = liftIO . runResourceT . runNoLoggingT . customRunSqlPool query $ appConnPool app
|
||||
|
||||
runHandler :: Handler a -> YesodExample UniWorX a
|
||||
runHandler handler = do
|
||||
|
||||
Loading…
Reference in New Issue
Block a user