From e5acdad134a7c4d8457c1cf8755d69306a52dd9e Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 12 May 2020 10:24:23 +0200 Subject: [PATCH] fix(sql): fix transaction behaviour of setSerializable Fixes #535 --- src/Jobs.hs | 19 +++++++++++++++++-- src/Utils/Sql.hs | 15 +++++++++------ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/Jobs.hs b/src/Jobs.hs index 65e0335e2..a794b16a2 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -37,6 +37,7 @@ import Control.Monad.Writer.Class (MonadWriter(..)) import Control.Monad.Trans.Cont (ContT(..), callCC) import Control.Monad.Random.Lazy (evalRandTIO, mapRandT) import Control.Monad.Logger +import qualified Control.Monad.Catch as Exc import Data.Time.Zones @@ -63,6 +64,8 @@ import Jobs.Handler.PruneFiles import Jobs.HealthReport +import Control.Exception.Base (AsyncException) + data JobQueueException = JInvalid QueuedJobId QueuedJob | JLocked QueuedJobId InstanceId UTCTime @@ -355,7 +358,7 @@ mkLogIdent wId = "Job-Executor " <> showWorkerId wId handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) () handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do $logDebugS logIdent $ tshow jctl - res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl + res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try' $ handleCmd jctl sentRes <- mapReaderT (liftIO . atomically) $ do resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm) lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars) @@ -365,6 +368,18 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do _other -> return () where logIdent = mkLogIdent wNum + + try' = flip catches + [ Exc.Handler $ \(e :: AsyncException) -> throwM e + , Exc.Handler $ \(e :: SomeAsyncException) -> throwM e +#ifdef DEVELOPMENT + , Exc.Handler $ \case + MailNotAvailable -> return $ Right () + e -> return . Left $ SomeException e + , Exc.Handler $ \SynchroniseLdapNoLdap -> return $ Right () +#endif + , Exc.Handler $ \(e :: SomeException) -> return $ Left e + ] . fmap Right handleQueueException :: MonadLogger m => JobQueueException -> m () handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j @@ -446,7 +461,7 @@ jLocked jId act = do | otherwise -> throwM $ JLocked jId lockInstance lockTime when hadStale . - $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow qj + $logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj) val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID' , QueuedJobLockTime =. Just now ] diff --git a/src/Utils/Sql.hs b/src/Utils/Sql.hs index 97805b159..fabac30ec 100644 --- a/src/Utils/Sql.hs +++ b/src/Utils/Sql.hs @@ -4,7 +4,8 @@ module Utils.Sql import ClassyPrelude.Yesod -import Database.PostgreSQL.Simple (SqlError(SqlError), sqlErrorHint) +import Database.PostgreSQL.Simple (SqlError) +import Database.PostgreSQL.Simple.Errors (isSerializationError) import Control.Monad.Catch (MonadMask) import Database.Persist.Sql @@ -16,21 +17,23 @@ import Control.Lens ((&)) setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m) => ReaderT SqlBackend m a -> ReaderT SqlBackend m a -setSerializable act = recovering policy [logRetries suggestRetry logRetry] act' +setSerializable act = recovering policy (skipAsyncExceptions `snoc` logRetries suggestRetry logRetry) act' where policy :: RetryPolicyM (ReaderT SqlBackend m) policy = fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 suggestRetry :: SqlError -> ReaderT SqlBackend m Bool - suggestRetry SqlError{sqlErrorHint} = return $ "The transaction might succeed if retried." `isInfixOf` sqlErrorHint + suggestRetry = return . isSerializationError logRetry :: Bool -- ^ Will retry -> SqlError -> RetryStatus -> ReaderT SqlBackend m () - logRetry shouldRetry err status = $logDebugS "SQL" . pack $ defaultLogMsg shouldRetry err status + logRetry shouldRetry@False err status = $logErrorS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status + logRetry shouldRetry@True err status = $logDebugS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status act' :: RetryStatus -> ReaderT SqlBackend m a act' RetryStatus{..} - | rsIterNumber == 0 = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act - | otherwise = transactionUndoWithIsolation Serializable *> act + | rsIterNumber == 0 = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act'' + | otherwise = [executeQQ|ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act'' + where act'' = act <* transactionSaveWithIsolation ReadCommitted