From db48bbb7765604aaab8f8d5c540793b1ceaff16a Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Sat, 20 Feb 2021 11:16:47 +0100 Subject: [PATCH] fix(jobs): use more read only/deferrable transactions --- src/Foundation/DB.hs | 3 ++- src/Jobs.hs | 28 +++++++++++---------- src/Jobs/Crontab.hs | 6 ++--- src/Jobs/Handler/Files.hs | 10 +++----- src/Jobs/Types.hs | 1 + src/Utils/Sql.hs | 51 +++++++++++++++++++++++++++++---------- 6 files changed, 63 insertions(+), 36 deletions(-) diff --git a/src/Foundation/DB.hs b/src/Foundation/DB.hs index 5261af6a2..f871cc026 100644 --- a/src/Foundation/DB.hs +++ b/src/Foundation/DB.hs @@ -11,6 +11,7 @@ import qualified Control.Retry as Retry import GHC.IO.Exception (IOErrorType(OtherError)) import Database.Persist.Sql (runSqlPool, SqlReadBackend(..)) +import Database.Persist.Sql.Raw.QQ (executeQQ) runSqlPoolRetry :: forall m a backend. @@ -43,4 +44,4 @@ runSqlPoolRetry action pool = do runDBRead :: ReaderT SqlReadBackend (HandlerFor UniWorX) a -> (HandlerFor UniWorX) a runDBRead action = do $logDebugS "YesodPersist" "runDBRead" - runSqlPoolRetry (withReaderT SqlReadBackend action) . appConnPool =<< getYesod + runSqlPoolRetry (withReaderT SqlReadBackend $ [executeQQ|SET TRANSACTION READ ONLY|] *> action) . appConnPool =<< getYesod diff --git a/src/Jobs.hs b/src/Jobs.hs index b6de74e24..b61556caa 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -339,8 +339,8 @@ execCrontab :: RWST JobState () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWo -- seen, wait for the time of the next job and fire it execCrontab = do let - mergeState :: MonadResource m => RWST _ () _ (ReaderT SqlBackend m) () - mergeState = do + mergeState :: (MonadResource m, BackendCompatible SqlReadBackend backend) => RWST _ () _ (ReaderT backend m) () + mergeState = mapRWST (withReaderT $ projectBackend @SqlReadBackend) $ do let mergeLastExec (Entity _leId CronLastExec{..}) | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob @@ -353,7 +353,7 @@ execCrontab = do | otherwise = return () runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued - mapRWST (liftHandler . runDB . setSerializableBatch) mergeState + mapRWST (liftHandler . runDBRead . setSerializableReadOnlyBatch) mergeState refT <- liftIO getCurrentTime settings <- getsYesod appSettings' @@ -555,16 +555,21 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker JobHandlerException act -> do act & withJobWorkerState wNum (JobWorkerExecJob content) runDB $ setSerializableBatch cleanup - JobHandlerAtomicWithFinalizer act fin -> do - res <- runDBJobs . setSerializableBatch $ do - res <- act & withJobWorkerState wNum (JobWorkerExecJob content) - hoist lift cleanup - return res - fin res + JobHandlerAtomicWithFinalizer act fin -> + withJobWorkerState wNum (JobWorkerExecJob content) $ do + fin <=< runDBJobs . setSerializableBatch $ + act <* hoist lift cleanup + JobHandlerAtomicDeferrableWithFinalizer act fin -> do + withJobWorkerState wNum (JobWorkerExecJob content) $ + fin =<< runDBRead (setSerializableDeferrableBatch act) + runDB $ setSerializableBatch cleanup handleCmd JobCtlDetermineCrontab = do $logDebugS logIdent "DetermineCrontab..." - newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab' + newCTab <- liftHandler . runDBRead $ setSerializableReadOnlyBatch determineCrontab $logInfoS logIdent "DetermineCrontab" + $logDebugS logIdent "PruneLastExecs..." + liftHandler . runDB $ pruneLastExecs newCTab + $logInfoS logIdent "PruneLastExecs" -- logDebugS logIdent $ tshow newCTab mapReaderT (liftIO . atomically) $ lift . flip writeTVar newCTab =<< asks jobCrontab @@ -713,9 +718,6 @@ pruneLastExecs crontab = do | otherwise -> return mempty -determineCrontab' :: DB (Crontab JobCtl) -determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab - performJob :: Job -> JobHandler UniWorX performJob = $(dispatchTH ''Job) diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index f29d5e03d..d96247af0 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -33,7 +33,7 @@ import Data.List (iterate) prewarmCacheIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)]) prewarmCacheIntervalsCache = unsafePerformIO $ newTVarIO Map.empty -determineCrontab :: DB (Crontab JobCtl) +determineCrontab :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Crontab JobCtl) -- ^ Extract all future jobs from the database (sheet deadlines, ...) determineCrontab = execWriterT $ do UniWorX{ appSettings' = AppSettings{..} } <- getYesod @@ -61,7 +61,7 @@ determineCrontab = execWriterT $ do Nothing -> mempty let - tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (YesodDB UniWorX) () + tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (ReaderT SqlReadBackend (HandlerFor UniWorX)) () tellPrewarmJobs jcPrewarmSource jcTargetTime = maybeT (return ()) $ do PrewarmCacheConf{..} <- hoistMaybe appFileSourcePrewarmConf @@ -361,7 +361,7 @@ determineCrontab = execWriterT $ do } let - correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB () + correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) (ReaderT SqlReadBackend (HandlerFor UniWorX)) () correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton (JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } ) Cron diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 0427b001d..f4e06a475 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -36,8 +36,6 @@ import System.Clock import qualified Data.Set as Set import qualified Data.Sequence as Seq -import Jobs.Queue (YesodJobDB) - import Jobs.Handler.Intervals.Utils import Data.IntervalMap.Strict (IntervalMap) @@ -86,10 +84,10 @@ workflowFileReferences = Map.fromList $ over (traverse . _1) nameToPathPiece dispatchJobDetectMissingFiles :: JobHandler UniWorX -dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin +dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin where - act :: YesodJobDB UniWorX (Map Text (NonNull (Set FileContentReference))) - act = hoist lift $ do + act :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Map Text (NonNull (Set FileContentReference))) + act = do uploadBucket <- getsYesod $ view _appUploadCacheBucket missingDb <- runConduit . execStateC Map.empty $ do @@ -105,7 +103,7 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin transPipe lift (E.selectSource fileReferencesQuery) .| C.mapMaybe E.unValue .| C.mapM_ (insertRef refKind) iforM_ workflowFileReferences $ \refKind refSource -> - transPipe lift (refSource .| C.filterM (\ref -> not <$> exists [FileContentEntryHash ==. ref])) .| C.mapM_ (insertRef refKind) + transPipe (lift . withReaderT projectBackend) (refSource .| C.filterM (\ref -> not <$> exists [FileContentEntryHash ==. ref])) .| C.mapM_ (insertRef refKind) let allMissingDb :: Set Minio.Object allMissingDb = setOf (folded . folded . re minioFileReference) missingDb diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index f83311d34..7ebb4bf4c 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -241,6 +241,7 @@ data JobHandler site = JobHandlerAtomic (YesodJobDB site ()) | JobHandlerException (HandlerFor site ()) | forall a. JobHandlerAtomicWithFinalizer (YesodJobDB site a) (a -> HandlerFor site ()) + | forall a. JobHandlerAtomicDeferrableWithFinalizer (ReaderT SqlReadBackend (HandlerFor site) a) (a -> HandlerFor site ()) deriving (Typeable) makePrisms ''JobHandler diff --git a/src/Utils/Sql.hs b/src/Utils/Sql.hs index c892e56fa..dfc09d8ac 100644 --- a/src/Utils/Sql.hs +++ b/src/Utils/Sql.hs @@ -1,5 +1,7 @@ module Utils.Sql - ( setSerializable, setSerializableBatch, setSerializable' + ( setSerializable + , setSerializableBatch, setSerializableReadOnlyBatch, setSerializableDeferrableBatch + , SerializableMode(..), setSerializable' , catchSql, handleSql , isUniqueConstraintViolation , catchIfSql, handleIfSql @@ -13,7 +15,9 @@ import Database.PostgreSQL.Simple (SqlError(..)) import Database.PostgreSQL.Simple.Errors (isSerializationError) import Control.Monad.Catch -import Database.Persist.Sql +import Database.Persist.Sql hiding (IsolationLevel(..)) +import qualified Database.Persist.Sql as Persist (IsolationLevel(..)) +import Database.Persist.Sql.Types.Instances () import Database.Persist.Sql.Raw.QQ import qualified Data.ByteString as ByteString @@ -29,6 +33,10 @@ import Text.Shakespeare.Text (st) import Control.Concurrent.Async (ExceptionInLinkedThread(..)) +import Data.Universe + +import Control.Monad.Trans.Reader (withReaderT) + fromExceptionWrapped :: Exception exc => SomeException -> Maybe exc fromExceptionWrapped (fromException -> Just exc) = Just exc @@ -36,43 +44,60 @@ fromExceptionWrapped (fromException >=> \(ExceptionInLinkedThread _ exc') -> fro fromExceptionWrapped _ = Nothing +data SerializableMode = Serializable + | SerializableReadOnly + | SerializableReadOnlyDeferrable + deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) + deriving anyclass (Universe, Finite) + setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a -setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 +setSerializable = setSerializable' Serializable $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 setSerializableBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a -setSerializableBatch = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6 +setSerializableBatch = setSerializable' Serializable $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6 + +setSerializableReadOnlyBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (ReaderT SqlReadBackend m)) => ReaderT SqlReadBackend m a -> ReaderT SqlReadBackend m a +setSerializableReadOnlyBatch = setSerializable' SerializableReadOnly $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6 -setSerializable' :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => RetryPolicyM (SqlPersistT m) -> SqlPersistT m a -> ReaderT SqlBackend m a -setSerializable' policy act = do +setSerializableDeferrableBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (ReaderT SqlReadBackend m)) => ReaderT SqlReadBackend m a -> ReaderT SqlReadBackend m a +setSerializableDeferrableBatch = setSerializable' SerializableReadOnlyDeferrable $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6 + +setSerializable' :: forall backend m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (ReaderT backend m), BackendCompatible SqlBackend backend) => SerializableMode -> RetryPolicyM (ReaderT backend m) -> ReaderT backend m a -> ReaderT backend m a +setSerializable' mode policy act = do LogSettings{logSerializableTransactionRetryLimit} <- readLogSettings didCommit <- newTVarIO False recovering policy (skipAsyncExceptions `snoc` logRetries suggestRetry (logRetry logSerializableTransactionRetryLimit)) $ act' didCommit where - suggestRetry :: SomeException -> ReaderT SqlBackend m Bool + suggestRetry :: SomeException -> ReaderT backend m Bool suggestRetry = return . maybe False isSerializationError . fromExceptionWrapped logRetry :: Maybe Natural -> Bool -- ^ Will retry -> SomeException -> RetryStatus - -> ReaderT SqlBackend m () + -> ReaderT backend m () logRetry _ shouldRetry@False err status = $logErrorS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status logRetry (Just limit) shouldRetry err status | fromIntegral limit <= rsIterNumber status = $logInfoS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status logRetry _ shouldRetry err status = $logDebugS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status - act' :: TVar Bool -> RetryStatus -> ReaderT SqlBackend m a + (setTransactionLevel, beginTransactionLevel) = case mode of + Serializable -> ([executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|], [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE|] ) + SerializableReadOnly -> ([executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY|], [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY|] ) + SerializableReadOnlyDeferrable -> ([executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE|], [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE|]) + + act' :: TVar Bool -> RetryStatus -> ReaderT backend m a act' didCommit RetryStatus{..} = do prevCommited <- atomically $ swapTVar didCommit False $logDebugS "SQL.setSerializable" $ "prevCommited = " <> tshow prevCommited <> "; rsIterNumber = " <> tshow rsIterNumber if - | rsIterNumber == 0 -> [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act'' - | prevCommited -> [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act'' - | otherwise -> transactionUndoWithIsolation Serializable *> act'' + | rsIterNumber == 0 -> setTransactionLevel *> act'' + | prevCommited -> beginTransactionLevel *> act'' + | otherwise -> withReaderT projectBackend transactionUndo *> setTransactionLevel *> act'' where act'' = do res <- act atomically $ writeTVar didCommit True - transactionSaveWithIsolation ReadCommitted + withReaderT projectBackend $ transactionSaveWithIsolation Persist.ReadCommitted return res catchSql :: forall e m a. (MonadCatch m, MonadIO m, Exception e) => SqlPersistT m a -> (e -> SqlPersistT m a) -> SqlPersistT m a