From 2ca024b9351df800b57d3235c4a00776cd669952 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 22 Sep 2020 13:43:58 +0200 Subject: [PATCH] fix(files): don't inject serializable --- src/Jobs/Handler/Files.hs | 12 ++++++------ src/Utils.hs | 2 +- src/Utils/Sql.hs | 19 ++++++++++++++----- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 69f969538..fa1eec303 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -211,7 +211,7 @@ dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtom observeDeletedUnreferencedFiles deletedEntries $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|] observeDeletedUnreferencedChunks deletedChunks deletedChunkSize - $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{tshow deletedChunkSize} bytes)|] + $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{textBytes deletedChunkSize})|] dispatchJobInjectFiles :: JobHandler UniWorX @@ -229,7 +229,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do let obj = Minio.oiObject objInfo sz = fromIntegral $ Minio.oiSize objInfo - fRef' <- runDB . setSerializable $ do + fRef' <- runDB $ do chunkVar <- newEmptyTMVarIO dbAsync <- allocateLinkedAsync $ do atomically $ isEmptyTMVar chunkVar >>= guard . not @@ -251,12 +251,12 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do p = realToFrac $ (toInteger sz' % toInteger sz) * 100 !c' = succ c eta :: Integer - eta = ceiling $ (toRational (currT - startT) / fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz) + eta = ceiling $ ((toRational currT - toRational startT) / toRational accsz) * toRational (sz - fromIntegral accsz) !lastReport' | currT - fromMaybe startT lastReport > 5e9 = Just currT | otherwise = lastReport when (lastReport' /= lastReport) $ - runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|] + runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|] atomically . putTMVar chunkVar $ Just chunk go c' sz' lastReport' startT lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks @@ -282,7 +282,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do .| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeRechunkedFiles inj sz) .| C.fold - $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{tshow injectedSize} bytes)|] + $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|] data RechunkFileException @@ -330,4 +330,4 @@ dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin return (rechunkedFiles, rechunkedSize) fin (rechunkedFiles, rechunkedSize) = do observeRechunkedFiles rechunkedFiles rechunkedSize - $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{tshow rechunkedSize} bytes)|] + $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{textBytes rechunkedSize} bytes)|] diff --git a/src/Utils.hs b/src/Utils.hs index 2aa71f0db..4e0a169a5 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -307,7 +307,7 @@ textDuration n' = view _2 $ foldr acc (toInteger n', "") units , (1, "s") ] acc (mult, unit) (n, t) - | unitCount > 0 = (unitRem, t <> tshow unitCount <> tshow unit) + | unitCount > 0 = (unitRem, t <> tshow unitCount <> unit) | otherwise = (n, t) where (unitCount, unitRem) = n `divMod` mult diff --git a/src/Utils/Sql.hs b/src/Utils/Sql.hs index c0470ba30..19d9eda9f 100644 --- a/src/Utils/Sql.hs +++ b/src/Utils/Sql.hs @@ -27,6 +27,14 @@ import Control.Monad.Random.Class (MonadRandom(getRandom)) import Text.Shakespeare.Text (st) +import Control.Concurrent.Async (ExceptionInLinkedThread(..)) + + +fromExceptionWrapped :: Exception exc => SomeException -> Maybe exc +fromExceptionWrapped (fromException -> Just exc) = Just exc +fromExceptionWrapped ((fromException >=> \(ExceptionInLinkedThread _ exc') -> fromExceptionWrapped exc') -> Just exc) = Just exc +fromExceptionWrapped _ = Nothing + setSerializable :: forall m a. (MonadLogger m, MonadMask m, MonadIO m, ReadLogSettings (SqlPersistT m)) => SqlPersistT m a -> SqlPersistT m a setSerializable = setSerializable' $ fullJitterBackoff 1e3 & limitRetriesByCumulativeDelay 10e6 @@ -40,12 +48,12 @@ setSerializable' policy act = do didCommit <- newTVarIO False recovering policy (skipAsyncExceptions `snoc` logRetries suggestRetry (logRetry logSerializableTransactionRetryLimit)) $ act' didCommit where - suggestRetry :: SqlError -> ReaderT SqlBackend m Bool - suggestRetry = return . isSerializationError + suggestRetry :: SomeException -> ReaderT SqlBackend m Bool + suggestRetry = return . maybe False isSerializationError . fromExceptionWrapped logRetry :: Maybe Natural -> Bool -- ^ Will retry - -> SqlError + -> SomeException -> RetryStatus -> ReaderT SqlBackend m () logRetry _ shouldRetry@False err status = $logErrorS "SQL.setSerializable" . pack $ defaultLogMsg shouldRetry err status @@ -74,10 +82,11 @@ handleSql :: forall m a. (MonadCatch m, MonadIO m) => (SqlError -> SqlPersistT m handleSql recover act = do savepointName <- liftIO $ UUID.toString <$> getRandom - let recover' :: SqlError -> SqlPersistT m a - recover' exc = do + let recover' :: SomeException -> SqlPersistT m a + recover' (fromExceptionWrapped -> Just exc) = do rawExecute [st|ROLLBACK TO SAVEPOINT "#{savepointName}"|] [] recover exc + recover' exc = throwM exc handle recover' $ do rawExecute [st|SAVEPOINT "#{savepointName}"|] []