fix(jobs): use more read only/deferrable transactions

This commit is contained in:
Gregor Kleen 2021-02-20 11:16:47 +01:00
parent ca22061e72
commit db48bbb776
6 changed files with 63 additions and 36 deletions

View File

@ -11,6 +11,7 @@ import qualified Control.Retry as Retry
import GHC.IO.Exception (IOErrorType(OtherError))
import Database.Persist.Sql (runSqlPool, SqlReadBackend(..))
import Database.Persist.Sql.Raw.QQ (executeQQ)
runSqlPoolRetry :: forall m a backend.
@ -43,4 +44,4 @@ runSqlPoolRetry action pool = do
runDBRead :: ReaderT SqlReadBackend (HandlerFor UniWorX) a -> (HandlerFor UniWorX) a
runDBRead action = do
$logDebugS "YesodPersist" "runDBRead"
runSqlPoolRetry (withReaderT SqlReadBackend action) . appConnPool =<< getYesod
runSqlPoolRetry (withReaderT SqlReadBackend $ [executeQQ|SET TRANSACTION READ ONLY|] *> action) . appConnPool =<< getYesod

View File

@ -339,8 +339,8 @@ execCrontab :: RWST JobState () (HashMap JobCtl (Max UTCTime)) (HandlerFor UniWo
-- seen, wait for the time of the next job and fire it
execCrontab = do
let
mergeState :: MonadResource m => RWST _ () _ (ReaderT SqlBackend m) ()
mergeState = do
mergeState :: (MonadResource m, BackendCompatible SqlReadBackend backend) => RWST _ () _ (ReaderT backend m) ()
mergeState = mapRWST (withReaderT $ projectBackend @SqlReadBackend) $ do
let
mergeLastExec (Entity _leId CronLastExec{..})
| Just job <- Aeson.parseMaybe parseJSON cronLastExecJob
@ -353,7 +353,7 @@ execCrontab = do
| otherwise = return ()
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec
runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued
mapRWST (liftHandler . runDB . setSerializableBatch) mergeState
mapRWST (liftHandler . runDBRead . setSerializableReadOnlyBatch) mergeState
refT <- liftIO getCurrentTime
settings <- getsYesod appSettings'
@ -555,16 +555,21 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
JobHandlerException act -> do
act & withJobWorkerState wNum (JobWorkerExecJob content)
runDB $ setSerializableBatch cleanup
JobHandlerAtomicWithFinalizer act fin -> do
res <- runDBJobs . setSerializableBatch $ do
res <- act & withJobWorkerState wNum (JobWorkerExecJob content)
hoist lift cleanup
return res
fin res
JobHandlerAtomicWithFinalizer act fin ->
withJobWorkerState wNum (JobWorkerExecJob content) $ do
fin <=< runDBJobs . setSerializableBatch $
act <* hoist lift cleanup
JobHandlerAtomicDeferrableWithFinalizer act fin -> do
withJobWorkerState wNum (JobWorkerExecJob content) $
fin =<< runDBRead (setSerializableDeferrableBatch act)
runDB $ setSerializableBatch cleanup
handleCmd JobCtlDetermineCrontab = do
$logDebugS logIdent "DetermineCrontab..."
newCTab <- liftHandler . runDB $ setSerializableBatch determineCrontab'
newCTab <- liftHandler . runDBRead $ setSerializableReadOnlyBatch determineCrontab
$logInfoS logIdent "DetermineCrontab"
$logDebugS logIdent "PruneLastExecs..."
liftHandler . runDB $ pruneLastExecs newCTab
$logInfoS logIdent "PruneLastExecs"
-- logDebugS logIdent $ tshow newCTab
mapReaderT (liftIO . atomically) $
lift . flip writeTVar newCTab =<< asks jobCrontab
@ -713,9 +718,6 @@ pruneLastExecs crontab = do
| otherwise
-> return mempty
determineCrontab' :: DB (Crontab JobCtl)
determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab
performJob :: Job -> JobHandler UniWorX
performJob = $(dispatchTH ''Job)

View File

@ -33,7 +33,7 @@ import Data.List (iterate)
prewarmCacheIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
prewarmCacheIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
determineCrontab :: DB (Crontab JobCtl)
determineCrontab :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Crontab JobCtl)
-- ^ Extract all future jobs from the database (sheet deadlines, ...)
determineCrontab = execWriterT $ do
UniWorX{ appSettings' = AppSettings{..} } <- getYesod
@ -61,7 +61,7 @@ determineCrontab = execWriterT $ do
Nothing -> mempty
let
tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (YesodDB UniWorX) ()
tellPrewarmJobs :: JobCtlPrewarmSource -> UTCTime -> WriterT (Crontab JobCtl) (ReaderT SqlReadBackend (HandlerFor UniWorX)) ()
tellPrewarmJobs jcPrewarmSource jcTargetTime = maybeT (return ()) $ do
PrewarmCacheConf{..} <- hoistMaybe appFileSourcePrewarmConf
@ -361,7 +361,7 @@ determineCrontab = execWriterT $ do
}
let
correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) DB ()
correctorNotifications :: Map (UserId, SheetId) (Max UTCTime) -> WriterT (Crontab JobCtl) (ReaderT SqlReadBackend (HandlerFor UniWorX)) ()
correctorNotifications = (tell .) . Map.foldMapWithKey $ \(nUser, nSheet) (Max time) -> HashMap.singleton
(JobCtlQueue $ JobQueueNotification NotificationCorrectionsAssigned { nUser, nSheet } )
Cron

View File

@ -36,8 +36,6 @@ import System.Clock
import qualified Data.Set as Set
import qualified Data.Sequence as Seq
import Jobs.Queue (YesodJobDB)
import Jobs.Handler.Intervals.Utils
import Data.IntervalMap.Strict (IntervalMap)
@ -86,10 +84,10 @@ workflowFileReferences = Map.fromList $ over (traverse . _1) nameToPathPiece
dispatchJobDetectMissingFiles :: JobHandler UniWorX
dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin
dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin
where
act :: YesodJobDB UniWorX (Map Text (NonNull (Set FileContentReference)))
act = hoist lift $ do
act :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Map Text (NonNull (Set FileContentReference)))
act = do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
missingDb <- runConduit . execStateC Map.empty $ do
@ -105,7 +103,7 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin
transPipe lift (E.selectSource fileReferencesQuery) .| C.mapMaybe E.unValue .| C.mapM_ (insertRef refKind)
iforM_ workflowFileReferences $ \refKind refSource ->
transPipe lift (refSource .| C.filterM (\ref -> not <$> exists [FileContentEntryHash ==. ref])) .| C.mapM_ (insertRef refKind)
transPipe (lift . withReaderT projectBackend) (refSource .| C.filterM (\ref -> not <$> exists [FileContentEntryHash ==. ref])) .| C.mapM_ (insertRef refKind)
let allMissingDb :: Set Minio.Object
allMissingDb = setOf (folded . folded . re minioFileReference) missingDb

View File

@ -241,6 +241,7 @@ data JobHandler site
= JobHandlerAtomic (YesodJobDB site ())
| JobHandlerException (HandlerFor site ())
| forall a. JobHandlerAtomicWithFinalizer (YesodJobDB site a) (a -> HandlerFor site ())
| forall a. JobHandlerAtomicDeferrableWithFinalizer (ReaderT SqlReadBackend (HandlerFor site) a) (a -> HandlerFor site ())
deriving (Typeable)
makePrisms ''JobHandler

View File

@ -1,5 +1,7 @@
module Utils.Sql
( setSerializable, setSerializableBatch, setSerializable'
( setSerializable
, setSerializableBatch, setSerializableReadOnlyBatch, setSerializableDeferrableBatch
, SerializableMode(..), setSerializable'
, catchSql, handleSql
, isUniqueConstraintViolation
, catchIfSql, handleIfSql
@ -13,7 +15,9 @@ import Database.PostgreSQL.Simple (SqlError(..))
import Database.PostgreSQL.Simple.Errors (isSerializationError)
import Control.Monad.Catch
import Database.Persist.Sql
import Database.Persist.Sql hiding (IsolationLevel(..))
import qualified Database.Persist.Sql as Persist (IsolationLevel(..))
import Database.Persist.Sql.Types.Instances ()
import Database.Persist.Sql.Raw.QQ
import qualified Data.ByteString as ByteString
@ -29,6 +33,10 @@ import Text.Shakespeare.Text (st)
import Control.Concurrent.Async (ExceptionInLinkedThread(..))
import Data.Universe
import Control.Monad.Trans.Reader (withReaderT)
fromExceptionWrapped :: Exception exc => SomeException -> Maybe exc
fromExceptionWrapped (fromException -> Just exc) = Just exc
@ -36,43 +44,60 @@ fromExceptionWrapped (fromException >=> \(ExceptionInLinkedThread _ exc') -> fro
fromExceptionWrapped _ = Nothing
data SerializableMode = Serializable
| SerializableReadOnly
| SerializableReadOnlyDeferrable
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
deriving anyclass (Universe, Finite)
setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a
setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
setSerializable = setSerializable' Serializable $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
setSerializableBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a
setSerializableBatch = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6
setSerializableBatch = setSerializable' Serializable $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6
setSerializableReadOnlyBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (ReaderT SqlReadBackend m)) => ReaderT SqlReadBackend m a -> ReaderT SqlReadBackend m a
setSerializableReadOnlyBatch = setSerializable' SerializableReadOnly $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6
setSerializable' :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => RetryPolicyM (SqlPersistT m) -> SqlPersistT m a -> ReaderT SqlBackend m a
setSerializable' policy act = do
setSerializableDeferrableBatch :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (ReaderT SqlReadBackend m)) => ReaderT SqlReadBackend m a -> ReaderT SqlReadBackend m a
setSerializableDeferrableBatch = setSerializable' SerializableReadOnlyDeferrable $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 3600e6
setSerializable' :: forall backend m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (ReaderT backend m), BackendCompatible SqlBackend backend) => SerializableMode -> RetryPolicyM (ReaderT backend m) -> ReaderT backend m a -> ReaderT backend m a
setSerializable' mode policy act = do
LogSettings{logSerializableTransactionRetryLimit} <- readLogSettings
didCommit <- newTVarIO False
recovering policy (skipAsyncExceptions `snoc` logRetries suggestRetry (logRetry logSerializableTransactionRetryLimit)) $ act' didCommit
where
suggestRetry :: SomeException -> ReaderT SqlBackend m Bool
suggestRetry :: SomeException -> ReaderT backend m Bool
suggestRetry = return . maybe False isSerializationError . fromExceptionWrapped
logRetry :: Maybe Natural
-> Bool -- ^ Will retry
-> SomeException
-> RetryStatus
-> ReaderT SqlBackend m ()
-> ReaderT backend m ()
logRetry _ shouldRetry@False err status = $logErrorS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status
logRetry (Just limit) shouldRetry err status
| fromIntegral limit <= rsIterNumber status = $logInfoS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status
logRetry _ shouldRetry err status = $logDebugS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status
act' :: TVar Bool -> RetryStatus -> ReaderT SqlBackend m a
(setTransactionLevel, beginTransactionLevel) = case mode of
Serializable -> ([executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|], [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE|] )
SerializableReadOnly -> ([executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY|], [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY|] )
SerializableReadOnlyDeferrable -> ([executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE|], [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE|])
act' :: TVar Bool -> RetryStatus -> ReaderT backend m a
act' didCommit RetryStatus{..} = do
prevCommited <- atomically $ swapTVar didCommit False
$logDebugS "SQL.setSerializable" $ "prevCommited = " <> tshow prevCommited <> "; rsIterNumber = " <> tshow rsIterNumber
if
| rsIterNumber == 0 -> [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act''
| prevCommited -> [executeQQ|BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act''
| otherwise -> transactionUndoWithIsolation Serializable *> act''
| rsIterNumber == 0 -> setTransactionLevel *> act''
| prevCommited -> beginTransactionLevel *> act''
| otherwise -> withReaderT projectBackend transactionUndo *> setTransactionLevel *> act''
where act'' = do
res <- act
atomically $ writeTVar didCommit True
transactionSaveWithIsolation ReadCommitted
withReaderT projectBackend $ transactionSaveWithIsolation Persist.ReadCommitted
return res
catchSql :: forall e m a. (MonadCatch m, MonadIO m, Exception e) => SqlPersistT m a -> (e -> SqlPersistT m a) -> SqlPersistT m a