diff --git a/src/Application.hs b/src/Application.hs index 03d798600..d32a05f0e 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -17,8 +17,10 @@ module Application ) where import Control.Monad.Logger (liftLoc, LoggingT(..), MonadLoggerIO(..)) -import Database.Persist.Postgresql (createPostgresqlPool, pgConnStr, - pgPoolSize, runSqlPool, ConnectionPool, runSqlConn, withPostgresqlConn) +import Database.Persist.Postgresql ( openSimpleConn, pgConnStr, connClose, pgPoolIdleTimeout + , pgPoolSize + ) +import qualified Database.PostgreSQL.Simple as PG import Import hiding (cancel, respond) import Language.Haskell.TH.Syntax (qLocation) import Network.Wai (Middleware) @@ -108,6 +110,8 @@ import qualified Prometheus import qualified Data.IntervalMap.Strict as IntervalMap +import qualified Utils.Pool as Custom + -- Import all relevant handler modules here. -- (HPack takes care to add new modules to our cabal file nowadays.) import Handler.News @@ -203,15 +207,14 @@ makeFoundation appSettings''@AppSettings{..} = do -- temporary foundation without a real connection pool, get a log function -- from there, and then create the real foundation. let - mkFoundation :: _ -> _ -> (forall backend m a. (MonadUnliftIO m, BackendCompatible backend SqlBackend, MonadLogger m) => ReaderT backend m a -> m a) -> _ - mkFoundation appSettings' appDatabaseConnPool appDatabaseAccess appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey = UniWorX {..} + mkFoundation :: _ -> (forall m. MonadIO m => Custom.Pool m SqlBackend) -> _ + mkFoundation appSettings' appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey = UniWorX {..} -- The UniWorX {..} syntax is an example of record wild cards. For more -- information, see: -- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html tempFoundation = mkFoundation (error "appSettings' forced in tempFoundation") - (error "databaseConnPool forced in tempFoundation") - (error "databaseAccess forced in tempFoundation") + (error "connPool forced in tempFoundation") (error "smtpPool forced in tempFoundation") (error "ldapPool forced in tempFoundation") (error "cryptoIDKey forced in tempFoundation") @@ -240,14 +243,22 @@ makeFoundation appSettings''@AppSettings{..} = do -- Create the database connection pool $logDebugS "setup" "PostgreSQL-Pool" - appDatabaseConnPool <- createPostgresqlPool - (pgConnStr appDatabaseConf) - (pgPoolSize appDatabaseConf) - let - appDatabaseAccess :: forall backend m a. (MonadUnliftIO m, BackendCompatible backend SqlBackend, MonadLogger m) => ReaderT backend m a -> m a - appDatabaseAccess - | appDatabasePool = flip runSqlPool appDatabaseConnPool . withReaderT projectBackend - | otherwise = withPostgresqlConn (pgConnStr appDatabaseConf) . runSqlConn . withReaderT projectBackend + logFunc <- askLoggerIO + sqlPool' <- + let create = do + $logDebugS "SqlPool" "Opening connection..." + conn <- liftIO . PG.connectPostgreSQL $ pgConnStr appDatabaseConf + backend <- liftIO $ openSimpleConn logFunc conn + $logInfoS "SqlPool" "Opened connection" + return backend + destroy conn = do + $logDebugS "SqlPool" "Closing connection..." + liftIO $ connClose conn + $logInfoS "SqlPool" "Closed connection" + in Custom.createPool (liftIO . flip runLoggingT logFunc) create destroy (Just . fromIntegral $ pgPoolIdleTimeout appDatabaseConf) (Just $ pgPoolSize appDatabaseConf) + let sqlPool :: forall m. MonadIO m => Custom.Pool m SqlBackend + sqlPool = Custom.hoistPool (liftIO . flip runLoggingT logFunc) sqlPool' + void . Prometheus.register . poolMetrics PoolDatabaseConnections $ sqlPool @IO ldapPool <- traverse mkFailoverLabeled <=< forOf (traverse . traverse) appLdapConf $ \conf@LdapConf{..} -> do let ldapLabel = case ldapHost of @@ -262,33 +273,33 @@ makeFoundation appSettings''@AppSettings{..} = do if | appAutoDbMigrate -> do $logDebugS "setup" "Migration" - appDatabaseAccess migrateAll - | otherwise -> whenM (appDatabaseAccess requiresMigration) $ do + migrateAll `customRunSqlPool` sqlPool + | otherwise -> whenM (requiresMigration `customRunSqlPool` sqlPool) $ do $logErrorS "setup" "Migration required" liftIO . exitWith $ ExitFailure 130 $logDebugS "setup" "Cluster-Config" - appCryptoIDKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterCryptoIDKey - appSecretBoxKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterSecretBoxKey - appJSONWebKeySet <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterJSONWebKeySet - appClusterID <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterId - appVerpSecret <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterVerpSecret - appAuthKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterAuthKey + appCryptoIDKey <- clusterSetting (Proxy :: Proxy 'ClusterCryptoIDKey) `customRunSqlPool` sqlPool + appSecretBoxKey <- clusterSetting (Proxy :: Proxy 'ClusterSecretBoxKey) `customRunSqlPool` sqlPool + appJSONWebKeySet <- clusterSetting (Proxy :: Proxy 'ClusterJSONWebKeySet) `customRunSqlPool` sqlPool + appClusterID <- clusterSetting (Proxy :: Proxy 'ClusterId) `customRunSqlPool` sqlPool + appVerpSecret <- clusterSetting (Proxy :: Proxy 'ClusterVerpSecret) `customRunSqlPool` sqlPool + appAuthKey <- clusterSetting (Proxy :: Proxy 'ClusterAuthKey) `customRunSqlPool` sqlPool - needsRechunk <- appDatabaseAccess @SqlReadBackend $ exists [FileContentChunkContentBased !=. True] + needsRechunk <- exists [FileContentChunkContentBased !=. True] `customRunSqlPool` sqlPool let appSettings' = appSettings'' & _appRechunkFiles %~ guardOnM needsRechunk appMemcached <- for appMemcachedConf $ \memcachedConf -> do $logDebugS "setup" "Memcached" - memcachedKey <- appDatabaseAccess . clusterSetting $ Proxy @'ClusterMemcachedKey + memcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterMemcachedKey) `customRunSqlPool` sqlPool memcached <- createMemcached memcachedConf when appClearCache $ do $logWarnS "setup" "Clearing memcached" liftIO $ Memcached.flushAll memcached return (memcachedKey, memcached) - appSessionStore <- appDatabaseAccess $ mkSessionStore appSettings'' appDatabaseConnPool + appSessionStore <- mkSessionStore appSettings'' sqlPool `customRunSqlPool` sqlPool appUploadCache <- for appUploadCacheConf $ \minioConf -> liftIO $ do conn <- Minio.connect minioConf @@ -301,7 +312,7 @@ makeFoundation appSettings''@AppSettings{..} = do $logDebugS "Runtime configuration" $ tshow appSettings' - let foundation = mkFoundation appSettings' appDatabaseConnPool appDatabaseAccess smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey + let foundation = mkFoundation appSettings' sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey -- Return the foundation $logDebugS "setup" "Done" @@ -318,7 +329,9 @@ mkSessionStore :: forall m. , MonadThrow m , MonadResource m ) - => AppSettings -> ConnectionPool -> ReaderT SqlBackend m SomeSessionStorage + => AppSettings + -> (forall m'. MonadIO m' => Custom.Pool m' SqlBackend) + -> ReaderT SqlBackend m SomeSessionStorage mkSessionStore AppSettings{..} mcdSqlConnPool | Just mcdConf@MemcachedConf{..} <- appSessionMemcachedConf = do mcdSqlMemcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterServerSessionKey) @@ -652,7 +665,7 @@ shutdownApp :: (MonadIO m, MonadUnliftIO m) => UniWorX -> m () shutdownApp app = do stopJobCtl app liftIO $ do - destroyAllResources $ appDatabaseConnPool app + Custom.purgePool $ appConnPool app for_ (appSmtpPool app) destroyAllResources for_ (appLdapPool app) . mapFailover $ views _2 destroyAllResources for_ (appWidgetMemcached app) Memcached.close diff --git a/src/Foundation/DB.hs b/src/Foundation/DB.hs index 9730c3a5a..4d1e4d02f 100644 --- a/src/Foundation/DB.hs +++ b/src/Foundation/DB.hs @@ -13,15 +13,17 @@ import GHC.IO.Exception (IOErrorType(OtherError)) import Database.Persist.Sql (SqlReadBackend(..)) import Database.Persist.Sql.Raw.QQ (executeQQ) +import qualified Utils.Pool as Custom + runSqlPoolRetry :: forall m a backend. - ( MonadUnliftIO m + ( MonadUnliftIO m, BackendCompatible SqlBackend backend , MonadLogger m, MonadMask m ) - => (ReaderT backend m a -> m a) - -> ReaderT backend m a + => ReaderT backend m a + -> Custom.Pool m backend -> m a -runSqlPoolRetry dbAccess action = do +runSqlPoolRetry action pool = do let policy = Retry.fullJitterBackoff 1e3 & Retry.limitRetriesByCumulativeDelay 10e6 handlers = Retry.skipAsyncExceptions `snoc` Retry.logRetries suggestRetry logRetry where suggestRetry :: IOException -> m Bool @@ -39,10 +41,9 @@ runSqlPoolRetry dbAccess action = do Retry.recovering policy handlers $ \Retry.RetryStatus{..} -> do $logDebugS "runSqlPoolRetry" $ "rsIterNumber = " <> tshow rsIterNumber - dbAccess action + customRunSqlPool action pool runDBRead :: ReaderT SqlReadBackend (HandlerFor UniWorX) a -> (HandlerFor UniWorX) a runDBRead action = do $logDebugS "YesodPersist" "runDBRead" - dbAccess <- getsYesod appDatabaseAccess - runSqlPoolRetry dbAccess . withReaderT SqlReadBackend $ [executeQQ|SET TRANSACTION READ ONLY|] *> action + runSqlPoolRetry (withReaderT SqlReadBackend $ [executeQQ|SET TRANSACTION READ ONLY|] *> action) . appConnPool =<< getYesod diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs index 3e70ec2ad..72f07d08f 100644 --- a/src/Foundation/Type.hs +++ b/src/Foundation/Type.hs @@ -6,7 +6,7 @@ module Foundation.Type , SomeSessionStorage(..) , _SessionStorageMemcachedSql, _SessionStorageAcid , SMTPPool - , _appSettings', _appStatic, _appDatabaseConnPool, _appDatabaseAccess, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport, _appMemcached, _appUploadCache, _appVerpSecret, _appAuthKey + , _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport, _appMemcached, _appUploadCache, _appVerpSecret, _appAuthKey , DB, Form, MsgRenderer, MailM, DBFile ) where @@ -26,6 +26,8 @@ import Network.Minio (MinioConn) import Data.IntervalMap.Strict (IntervalMap) +import qualified Utils.Pool as Custom + type SMTPPool = Pool SMTPConnection @@ -42,8 +44,7 @@ makePrisms ''SomeSessionStorage data UniWorX = UniWorX { appSettings' :: AppSettings , appStatic :: EmbeddedStatic -- ^ Settings for static file serving. - , appDatabaseConnPool :: Pool SqlBackend - , appDatabaseAccess :: forall backend m a. (MonadUnliftIO m, BackendCompatible backend SqlBackend, MonadLogger m) => ReaderT backend m a -> m a + , appConnPool :: forall m. MonadIO m => Custom.Pool m SqlBackend -- ^ Database connection pool. , appSmtpPool :: Maybe SMTPPool , appLdapPool :: Maybe (Failover (LdapConf, LdapPool)) , appWidgetMemcached :: Maybe Memcached.Connection -- ^ Actually a proper pool diff --git a/src/Foundation/Yesod/Persist.hs b/src/Foundation/Yesod/Persist.hs index e467da4fe..dc2c515aa 100644 --- a/src/Foundation/Yesod/Persist.hs +++ b/src/Foundation/Yesod/Persist.hs @@ -10,6 +10,11 @@ import Foundation.DB import Foundation.Authorization import Database.Persist.Sql (transactionUndo) +import qualified Database.Persist.Sql as SQL + +import qualified Utils.Pool as Custom + +import UnliftIO.Resource (allocate, unprotect) runDB :: ( YesodPersistBackend UniWorX ~ SqlBackend @@ -25,15 +30,34 @@ runDB action = do | dryRun = action <* transactionUndo | otherwise = action - dbAccess <- getsYesod appDatabaseAccess - runSqlPoolRetry dbAccess action' + runSqlPoolRetry action' . appConnPool =<< getYesod getDBRunner :: ( YesodPersistBackend UniWorX ~ SqlBackend , BearerAuthSite UniWorX ) => HandlerFor UniWorX (DBRunner UniWorX, HandlerFor UniWorX ()) getDBRunner = do - (DBRunner{..}, cleanup) <- defaultGetDBRunner appDatabaseConnPool + pool <- getsYesod appConnPool + UnliftIO{..} <- askUnliftIO + let withPrep conn f = f (persistBackend conn) (SQL.getStmtConn $ persistBackend conn) + (relKey, (conn, ident)) <- allocate + (do + (conn, ident) <- unliftIO $ Custom.takeResource pool + withPrep conn (\c f -> SQL.connBegin c f Nothing) + return (conn, ident) + ) + (\(conn, ident) -> do + withPrep conn SQL.connRollback + unliftIO $ Custom.releaseResource True pool (conn, ident) + ) + + let cleanup = liftIO $ do + withPrep conn SQL.connCommit + unliftIO $ Custom.releaseResource True pool (conn, ident) + void $ unprotect relKey + runDBRunner :: forall a. YesodDB UniWorX a -> HandlerFor UniWorX a + runDBRunner = flip runReaderT conn + return . (, cleanup) $ DBRunner (\action -> do dryRun <- isDryRun diff --git a/src/Settings.hs b/src/Settings.hs index 893170a89..9006adba0 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -90,7 +90,6 @@ data AppSettings = AppSettings , appWellKnownDir :: FilePath , appWellKnownLinkFile :: FilePath , appDatabaseConf :: PostgresConf - , appDatabasePool :: Bool -- ^ Configuration settings for accessing the database. , appAutoDbMigrate :: Bool , appLdapConf :: Maybe (PointedList LdapConf) @@ -517,7 +516,6 @@ instance FromJSON AppSettings where appWellKnownLinkFile <- o .: "well-known-link-file" appWebpackEntrypoints <- o .: "webpack-manifest" appDatabaseConf <- o .: "database" - appDatabasePool <- o .:? "database-pool" .!= True appAutoDbMigrate <- o .: "auto-db-migrate" let nonEmptyHost LdapConf{..} = case ldapHost of Ldap.Tls host _ -> not $ null host diff --git a/src/Utils/DB.hs b/src/Utils/DB.hs index 5a95de49f..5d80d7c00 100644 --- a/src/Utils/DB.hs +++ b/src/Utils/DB.hs @@ -14,6 +14,10 @@ import Control.Lens.Extras (is) import Control.Monad.Catch +import qualified Utils.Pool as Custom + +import Database.Persist.Sql (runSqlConn) + emptyOrIn :: PersistField typ => E.SqlExpr (E.Value typ) -> Set typ -> E.SqlExpr (E.Value Bool) @@ -154,3 +158,6 @@ selectMaybe fltrs opts = listToMaybe <$> selectList fltrs (LimitTo 1 : opts') LimitTo _ -> True _other -> False + +customRunSqlPool :: (MonadUnliftIO m, BackendCompatible SqlBackend backend) => ReaderT backend m a -> Custom.Pool m backend -> m a +customRunSqlPool act p = Custom.withResource p $ runSqlConn act diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 19f51277b..738be7aaf 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -20,6 +20,8 @@ module Utils.Metrics , LRUMetrics, LRULabel(..) , lruMetrics , InjectInhibitMetrics, injectInhibitMetrics + , PoolMetrics, PoolLabel(..) + , poolMetrics ) where import Import.NoModel hiding (Vector, Info) @@ -50,6 +52,8 @@ import qualified Data.IntervalMap.Strict as IntervalMap import qualified Data.Foldable as F +import qualified Utils.Pool as Custom + {-# ANN module ("HLint: ignore Use even" :: String) #-} @@ -328,6 +332,45 @@ injectInhibitMetrics tvar = Metric $ return (InjectInhibitMetrics, collectInject "Number of distinct time intervals in which we don't transfer some files from upload cache to db" hashesInfo = Info "uni2work_inject_inhibited_hashes_count" "Number of files which we don't transfer from upload cache to db during some interval" + +data PoolMetrics = PoolMetrics + +data PoolLabel = PoolDatabaseConnections + deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) + deriving anyclass (Universe, Finite) + +nullaryPathPiece ''PoolLabel $ camelToPathPiece' 1 + +poolMetrics :: PoolLabel + -> Custom.Pool m a + -> Metric PoolMetrics +poolMetrics lbl pool = Metric $ return (PoolMetrics, collectPoolMetrics) + where + labelPool = relabel "pool" $ toPathPiece lbl + + collectPoolMetrics = map labelPool <$> do + (available, inUse, usesCount) <- atomically $ (,,) + <$> Custom.getPoolAvailableCount pool + <*> Custom.getPoolInUseCount pool + <*> Custom.getPoolUsesCount pool + return + [ SampleGroup availableInfo GaugeType + [ Sample "uni2work_pool_available_count" [] . encodeUtf8 $ tshow available + ] + , SampleGroup inUseInfo GaugeType + [ Sample "uni2work_pool_in_use_count" [] . encodeUtf8 $ tshow inUse + ] + , SampleGroup usesInfo CounterType + [ Sample "uni2work_pool_uses_count" [] . encodeUtf8 $ tshow usesCount + ] + ] + + availableInfo = Info "uni2work_pool_available_count" + "Number of open resources available for taking" + inUseInfo = Info "uni2work_pool_in_use_count" + "Number of resources currently in use" + usesInfo = Info "uni2work_pool_uses_count" + "Number of takes executed against the pool" withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport diff --git a/src/Utils/Pool.hs b/src/Utils/Pool.hs new file mode 100644 index 000000000..3308a12ca --- /dev/null +++ b/src/Utils/Pool.hs @@ -0,0 +1,223 @@ +{-# OPTIONS_GHC -Wno-error=unused-top-binds #-} + +module Utils.Pool + ( Pool, hoistPool + , getPoolAvailableCount, getPoolInUseCount, getPoolUsesCount + , createPool + , purgePool + , withResource + , destroyResources + , takeResource, releaseResource + ) where + +import ClassyPrelude + +import qualified Data.IntMap.Strict as IntMap + +import UnliftIO.Async.Utils +import UnliftIO.Resource (MonadResource, register, release) +import UnliftIO.Concurrent (forkIO) + +import Data.Fixed + +import System.Clock + +import Control.Concurrent.STM.Delay +import Control.Concurrent.STM.TVar (stateTVar) + +import Control.Monad.Writer.Strict (runWriter) +import Control.Monad.Writer.Class (MonadWriter(..)) + +import Data.Semigroup (First(..)) + +import Utils.NTop + +-- + + +newtype PoolResourceIdent = PoolResourceIdent Int + deriving (Eq, Ord, Show, Typeable) + + +data Pool m a = Pool + { create :: m a + , destroy :: a -> m () + , idleTime :: !(Maybe Int) + , maxAvailable :: !(Maybe Int) + , resources :: !(TVar (PoolResources a)) + , aliveRef :: !(IORef ()) + } + +data PoolResources a = PoolResources + { inUseCount, availableCount :: !Int + , inUse :: !(IntMap a) + , available :: !(IntMap [a]) + , inUseTick :: !Int + } deriving (Functor) + + +hoistPool :: (forall b. m b -> n b) -> Pool m a -> Pool n a +hoistPool nat Pool{..} = Pool + { create = nat create + , destroy = nat . destroy + , .. + } + +getPoolAvailableCount, getPoolInUseCount, getPoolUsesCount :: Pool m a -> STM Int +getPoolAvailableCount Pool{..} = availableCount <$> readTVar resources +getPoolInUseCount Pool{..} = inUseCount <$> readTVar resources +getPoolUsesCount Pool{..} = inUseTick <$> readTVar resources + + +toSecond :: TimeSpec -> Int +toSecond = fromIntegral . sec + +currentSecond :: MonadIO m => m Int +currentSecond = liftIO $ toSecond <$> getTime Monotonic + + +createPool :: (MonadResource m, MonadUnliftIO m, MonadUnliftIO m') + => (forall b. m' b -> m b) + -> m' a -- ^ Create + -> (a -> m' ()) -- ^ Destroy + -> Maybe Int -- ^ Timeout in seconds + -> Maybe Int -- ^ Max available + -> m (Pool m' a) +createPool nat create destroy (fmap $ max 0 -> idleTime) (fmap $ max 0 -> maxAvailable) = do + let + inUseCount = 0 + availableCount = 0 + inUseTick = 0 + inUse = IntMap.empty + available = IntMap.empty + aliveRef <- newIORef () + resources <- newTVarIO PoolResources{..} + let pool = Pool{..} + + reaper' <- for idleTime $ allocateLinkedAsync . nat . reaper destroy resources + relKey <- withRunInIO $ \runInIO -> runInIO . register . runInIO $ do + traverse_ cancel reaper' + nat $ purgePool pool + void . mkWeakIORef aliveRef $ release relKey + + return pool + +purgePool :: MonadUnliftIO m => Pool m a -> m () +purgePool = destroyResources $ const True + +reaper :: MonadUnliftIO m => (a -> m ()) -> TVar (PoolResources a) -> Int -> m () +reaper destroy' resources' t = forever $ do + atomically . waitDelay =<< liftIO (newDelay i) + + cutoff <- subtract t <$> currentSecond + toDestroy <- atomically $ do + res@PoolResources{..} <- readTVar resources' + let (toDestroy, pivot, available'') = IntMap.splitLookup cutoff available + available' = maybe id (IntMap.insert cutoff) pivot available'' + writeTVar resources' res + { available = available' + , availableCount = availableCount - IntMap.size toDestroy + } + return toDestroy + forM_ toDestroy . mapM_ $ void . destroy' + + where + MkFixed (fromIntegral -> i) = 1 :: Micro + +takeResource :: MonadIO m => Pool m a -> m (a, PoolResourceIdent) +takeResource Pool{..} = do + takenAvailable <- atomically $ do + PoolResources{..} <- readTVar resources + case IntMap.maxViewWithKey available of + Just ((t, av : avs), available') -> do + let available'' + | null avs = available' + | otherwise = IntMap.insert t avs available' + availableCount' = pred availableCount + inUse' = IntMap.insert inUseTick av inUse + inUseCount' = succ inUseCount + inUseTick' = succ inUseTick + writeTVar resources PoolResources + { inUseCount = inUseCount' + , availableCount = availableCount' + , available = available'' + , inUseTick = inUseTick' + , inUse = inUse' + } + return $ Just (av, inUseTick) + _other -> return Nothing + case takenAvailable of + Just (av, resTick) -> return (av, PoolResourceIdent resTick) + Nothing -> do + newResource <- create + resTick <- atomically . stateTVar resources $ \res@PoolResources{..} -> + let inUseTick' = succ inUseTick + inUseCount' = succ inUseCount + inUse' = IntMap.insert inUseTick newResource inUse + in ( inUseTick + , res{ inUseCount = inUseCount', inUse = inUse', inUseTick = inUseTick' } + ) + return (newResource, PoolResourceIdent resTick) + +releaseResource :: MonadUnliftIO m + => Bool -- ^ Destroy resource and don't return to pool? + -> Pool m a + -> (a, PoolResourceIdent) + -> m () +releaseResource isLost p@Pool{..} (x, ident) + | isLost = do + markResourceLost p ident + void . forkIO $ destroy x + | otherwise + = markResourceAvailable p ident + +markResourceAvailable, markResourceLost :: MonadUnliftIO m => Pool m a -> PoolResourceIdent -> m () +markResourceAvailable = returnResource True +markResourceLost = returnResource False + +returnResource :: MonadUnliftIO m + => Bool -- ^ return to available + -> Pool m a + -> PoolResourceIdent + -> m () +returnResource toAvailable Pool{..} (PoolResourceIdent inUseKey) = do + now <- if | toAvailable -> Just <$> currentSecond + | otherwise -> return Nothing + toDestroy <- atomically . stateTVar resources $ \res@PoolResources{..} -> case deleteView inUseKey inUse of + Nothing -> (Nothing, res) + Just (u, us) | NTop (Just availableCount) >= NTop maxAvailable + -> (Just u,) res + { inUse = us + , inUseCount = pred inUseCount + } + Just (u, us) + -> (Nothing, ) PoolResources + { inUse = us + , inUseCount = pred inUseCount + , availableCount = bool id succ toAvailable availableCount + , available = maybe id (IntMap.alter $ Just . (u :) . fromMaybe []) now available + , inUseTick + } + + forM_ toDestroy $ void . forkIO . destroy + where + deleteView :: Int -> IntMap a -> Maybe (a, IntMap a) + deleteView k vs = (, vs') <$> fmap getFirst fv + where (vs', fv) = runWriter $ IntMap.alterF (\old -> Nothing <$ tell (First <$> old)) k vs + + +withResource :: forall b m a. MonadUnliftIO m => Pool m a -> (a -> m b) -> m b +withResource p act = bracketOnError (takeResource p) (releaseResource True p) (\x'@(x, _) -> act x <* releaseResource False p x') + +destroyResources :: MonadUnliftIO m => (a -> Bool) -> Pool m a -> m () +destroyResources p Pool{..} = do + toDestroy <- atomically . stateTVar resources $ \res@PoolResources{..} + -> let partitioned = partition p <$> available + toDel = foldMap fst partitioned + toKeep = IntMap.mapMaybe (\(_, toKeep') -> toKeep' <$ guard (not $ null toKeep')) partitioned + in (toDel, ) res + { availableCount = availableCount - length toDel + , available = toKeep + } + + forM_ toDestroy $ void . forkIO . destroy diff --git a/src/Web/ServerSession/Backend/Persistent/Memcached.hs b/src/Web/ServerSession/Backend/Persistent/Memcached.hs index b2a56a396..9b6753ce7 100644 --- a/src/Web/ServerSession/Backend/Persistent/Memcached.hs +++ b/src/Web/ServerSession/Backend/Persistent/Memcached.hs @@ -14,7 +14,7 @@ import Utils.Lens import Web.ServerSession.Core -import Database.Persist.Sql (ConnectionPool, runSqlPool) +import qualified Utils.Pool as Custom import qualified Data.Binary as Binary @@ -46,7 +46,7 @@ share [mkPersist sqlSettings, mkMigrate "migrateMemcachedSqlStorage"] data MemcachedSqlStorage sess = MemcachedSqlStorage - { mcdSqlConnPool :: ConnectionPool + { mcdSqlConnPool :: forall m. MonadIO m => Custom.Pool m SqlBackend , mcdSqlMemcached :: Memcached.Connection , mcdSqlMemcachedKey :: AEAD.Key , mcdSqlMemcachedExpiration :: Maybe NominalDiffTime @@ -108,7 +108,7 @@ instance (IsSessionData sess, Binary (Decomposed sess)) => Storage (MemcachedSql type SessionData (MemcachedSqlStorage sess) = sess type TransactionM (MemcachedSqlStorage sess) = SqlPersistT IO - runTransactionM MemcachedSqlStorage{..} = flip runSqlPool mcdSqlConnPool + runTransactionM MemcachedSqlStorage{..} = flip customRunSqlPool mcdSqlConnPool getSession MemcachedSqlStorage{..} sessId = exceptT (maybe (return Nothing) throwM) (return . Just) $ do encSession <- catchIfExceptT (const Nothing) Memcached.isKeyNotFound . liftIO . fmap LBS.toStrict $ Memcached.getAndTouch_ expiry (memcachedSqlSessionId # sessId) mcdSqlMemcached diff --git a/test/Database.hs b/test/Database.hs index 87f85aa84..72fe69707 100755 --- a/test/Database.hs +++ b/test/Database.hs @@ -7,8 +7,6 @@ module Database import "uniworx" Import hiding (Option(..), getArgs) import "uniworx" Application (db', getAppSettings) -import UnliftIO.Pool (destroyAllResources) - import Database.Persist.Postgresql import Control.Monad.Logger @@ -21,6 +19,8 @@ import Database.Persist.Sql.Raw.QQ import Database.Fill (fillDb) +import qualified Utils.Pool as Custom + data DBAction = DBClear | DBTruncate @@ -48,7 +48,7 @@ main = do [executeQQ|drop owned by current_user|] :: ReaderT SqlBackend _ () DBTruncate -> db' $ do foundation <- getYesod - liftIO . destroyAllResources $ appDatabaseConnPool foundation + Custom.purgePool $ appConnPool foundation truncateDb DBMigrate -> db' $ return () DBFill -> db' $ fillDb diff --git a/test/TestImport.hs b/test/TestImport.hs index 3cd9539d4..be362d41d 100644 --- a/test/TestImport.hs +++ b/test/TestImport.hs @@ -13,7 +13,6 @@ import ClassyPrelude as X ) import Database.Persist as X hiding (get) import Database.Persist.Sql as X (SqlPersistM) -import Database.Persist.Sql (runSqlPersistMPool) import Foundation as X import Model as X import Test.Hspec as X @@ -71,6 +70,9 @@ import Utils.Parameters (GlobalPostParam(PostLoginDummy)) import Control.Monad.Morph as X (generalize) +import Control.Monad.Logger (runNoLoggingT) +import Utils.DB (customRunSqlPool) + runDB :: SqlPersistM a -> YesodExample UniWorX a runDB query = do @@ -78,7 +80,7 @@ runDB query = do liftIO $ runDBWithApp app query runDBWithApp :: MonadIO m => UniWorX -> SqlPersistM a -> m a -runDBWithApp app query = liftIO $ runSqlPersistMPool query (appDatabaseConnPool app) +runDBWithApp app query = liftIO . runResourceT . runNoLoggingT . customRunSqlPool query $ appConnPool app runHandler :: Handler a -> YesodExample UniWorX a runHandler handler = do