parent
6626cecd3a
commit
e5acdad134
19
src/Jobs.hs
19
src/Jobs.hs
@ -37,6 +37,7 @@ import Control.Monad.Writer.Class (MonadWriter(..))
|
||||
import Control.Monad.Trans.Cont (ContT(..), callCC)
|
||||
import Control.Monad.Random.Lazy (evalRandTIO, mapRandT)
|
||||
import Control.Monad.Logger
|
||||
import qualified Control.Monad.Catch as Exc
|
||||
|
||||
import Data.Time.Zones
|
||||
|
||||
@ -63,6 +64,8 @@ import Jobs.Handler.PruneFiles
|
||||
|
||||
import Jobs.HealthReport
|
||||
|
||||
import Control.Exception.Base (AsyncException)
|
||||
|
||||
|
||||
data JobQueueException = JInvalid QueuedJobId QueuedJob
|
||||
| JLocked QueuedJobId InstanceId UTCTime
|
||||
@ -355,7 +358,7 @@ mkLogIdent wId = "Job-Executor " <> showWorkerId wId
|
||||
handleJobs' :: JobWorkerId -> ConduitT JobCtl Void (ReaderT JobContext Handler) ()
|
||||
handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
|
||||
$logDebugS logIdent $ tshow jctl
|
||||
res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try $ handleCmd jctl
|
||||
res <- fmap (either Just $ const Nothing) . withJobWorkerState wNum (JobWorkerExecJobCtl jctl) . try' $ handleCmd jctl
|
||||
sentRes <- mapReaderT (liftIO . atomically) $ do
|
||||
resVars <- HashMap.lookup jctl <$> (lift . readTVar =<< asks jobConfirm)
|
||||
lift $ foldrM (\resVar -> bool (tryPutTMVar resVar res) $ return True) False (maybe [] NonEmpty.toList resVars)
|
||||
@ -365,6 +368,18 @@ handleJobs' wNum = C.mapM_ $ \jctl -> withJobWorkerState wNum JobWorkerBusy $ do
|
||||
_other -> return ()
|
||||
where
|
||||
logIdent = mkLogIdent wNum
|
||||
|
||||
try' = flip catches
|
||||
[ Exc.Handler $ \(e :: AsyncException) -> throwM e
|
||||
, Exc.Handler $ \(e :: SomeAsyncException) -> throwM e
|
||||
#ifdef DEVELOPMENT
|
||||
, Exc.Handler $ \case
|
||||
MailNotAvailable -> return $ Right ()
|
||||
e -> return . Left $ SomeException e
|
||||
, Exc.Handler $ \SynchroniseLdapNoLdap -> return $ Right ()
|
||||
#endif
|
||||
, Exc.Handler $ \(e :: SomeException) -> return $ Left e
|
||||
] . fmap Right
|
||||
|
||||
handleQueueException :: MonadLogger m => JobQueueException -> m ()
|
||||
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
|
||||
@ -446,7 +461,7 @@ jLocked jId act = do
|
||||
| otherwise
|
||||
-> throwM $ JLocked jId lockInstance lockTime
|
||||
when hadStale .
|
||||
$logWarnS "Jobs" $ "Ignored stale lock: " <> tshow qj
|
||||
$logWarnS "Jobs" $ "Ignored stale lock: " <> tshow (Entity jId qj)
|
||||
val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID'
|
||||
, QueuedJobLockTime =. Just now
|
||||
]
|
||||
|
||||
@ -4,7 +4,8 @@ module Utils.Sql
|
||||
|
||||
import ClassyPrelude.Yesod
|
||||
|
||||
import Database.PostgreSQL.Simple (SqlError(SqlError), sqlErrorHint)
|
||||
import Database.PostgreSQL.Simple (SqlError)
|
||||
import Database.PostgreSQL.Simple.Errors (isSerializationError)
|
||||
import Control.Monad.Catch (MonadMask)
|
||||
|
||||
import Database.Persist.Sql
|
||||
@ -16,21 +17,23 @@ import Control.Lens ((&))
|
||||
|
||||
|
||||
setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m) => ReaderT SqlBackend m a -> ReaderT SqlBackend m a
|
||||
setSerializable act = recovering policy [logRetries suggestRetry logRetry] act'
|
||||
setSerializable act = recovering policy (skipAsyncExceptions `snoc` logRetries suggestRetry logRetry) act'
|
||||
where
|
||||
policy :: RetryPolicyM (ReaderT SqlBackend m)
|
||||
policy = fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6
|
||||
|
||||
suggestRetry :: SqlError -> ReaderT SqlBackend m Bool
|
||||
suggestRetry SqlError{sqlErrorHint} = return $ "The transaction might succeed if retried." `isInfixOf` sqlErrorHint
|
||||
suggestRetry = return . isSerializationError
|
||||
|
||||
logRetry :: Bool -- ^ Will retry
|
||||
-> SqlError
|
||||
-> RetryStatus
|
||||
-> ReaderT SqlBackend m ()
|
||||
logRetry shouldRetry err status = $logDebugS "SQL" . pack $ defaultLogMsg shouldRetry err status
|
||||
logRetry shouldRetry@False err status = $logErrorS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status
|
||||
logRetry shouldRetry@True err status = $logDebugS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status
|
||||
|
||||
act' :: RetryStatus -> ReaderT SqlBackend m a
|
||||
act' RetryStatus{..}
|
||||
| rsIterNumber == 0 = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act
|
||||
| otherwise = transactionUndoWithIsolation Serializable *> act
|
||||
| rsIterNumber == 0 = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act''
|
||||
| otherwise = [executeQQ|ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE|] *> act''
|
||||
where act'' = act <* transactionSaveWithIsolation ReadCommitted
|
||||
|
||||
Loading…
Reference in New Issue
Block a user