From 1926917dd7463a4ed11b9e7ee64fab6c8167de6f Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 15 Jul 2020 11:50:26 +0200 Subject: [PATCH] feat(files): further balance file jobs --- config/settings.yml | 6 ++-- src/Handler/Utils/Files.hs | 6 +--- src/Handler/Utils/Minio.hs | 6 ++++ src/Import/NoModel.hs | 1 + src/Jobs/Handler/Files.hs | 55 +++++++++++++----------------- src/System/Clock/Instances.hs | 13 +++++++ src/Utils.hs | 15 ++++++++ src/Utils/Metrics.hs | 17 ++++----- src/Utils/PersistentTokenBucket.hs | 33 ++++++++++++++++-- 9 files changed, 102 insertions(+), 50 deletions(-) create mode 100644 src/System/Clock/Instances.hs diff --git a/config/settings.yml b/config/settings.yml index 929188996..8f21d7277 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -156,7 +156,7 @@ upload-cache: auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true" disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false" upload-cache-bucket: "uni2work-uploads" -inject-files: 60 +inject-files: 10 server-sessions: idle-timeout: 28807 @@ -220,10 +220,10 @@ favourites-quick-actions-cache-ttl: 120 # s token-buckets: inject-files: - depth: 62914560 # 60MiB + depth: 20971520 # 20MiB inv-rate: 9.5e-7 # 1MiB/s initial-value: 0 prune-files: - depth: 1258291200 # 1200MiB + depth: 1572864000 # 1500MiB inv-rate: 1.9e-6 # 2MiB/s initial-value: 0 diff --git a/src/Handler/Utils/Files.hs b/src/Handler/Utils/Files.hs index 27c80c6d5..e8af2f9ad 100644 --- a/src/Handler/Utils/Files.hs +++ b/src/Handler/Utils/Files.hs @@ -35,11 +35,7 @@ sourceFile FileReference{..} = do let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket fmap Just . (hoistMaybe =<<) . runAppMinio . runMaybeT $ do - let isDoesNotExist :: HttpException -> Bool - isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _)) - = responseStatus resp == notFound404 - isDoesNotExist _ = False - objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions + objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions lift . runConduit $ Minio.gorObjectStream objRes .| C.fold | fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent -> throwM SourceFilesMismatchedHashes diff --git a/src/Handler/Utils/Minio.hs b/src/Handler/Utils/Minio.hs index 939e10b87..5d85ff633 100644 --- a/src/Handler/Utils/Minio.hs +++ b/src/Handler/Utils/Minio.hs @@ -1,5 +1,6 @@ module Handler.Utils.Minio ( runAppMinio + , minioIsDoesNotExist ) where import Import.NoFoundation @@ -17,3 +18,8 @@ runAppMinio :: ( MonadHandler m, HandlerSite m ~ UniWorX runAppMinio act = do conn <- hoistMaybe =<< getsYesod appUploadCache either throwM return <=< liftIO $ Minio.runMinioWith conn act + +minioIsDoesNotExist :: HttpException -> Bool +minioIsDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _)) + = responseStatus resp == notFound404 +minioIsDoesNotExist _ = False diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index 1fca376b5..9f26ad2cf 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -163,6 +163,7 @@ import Web.Cookie.Instances as Import () import Network.HTTP.Types.Method.Instances as Import () import Crypto.Random.Instances as Import () import Network.Minio.Instances as Import () +import System.Clock.Instances as Import () import Crypto.Hash as Import (Digest, SHA3_256, SHA3_512) import Crypto.Random as Import (ChaChaDRG, Seed) diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 4ed1d10d3..796c02c55 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -46,8 +46,11 @@ fileReferences (E.just -> fHash) dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do + interval <- getsYesod $ view _appPruneUnreferencedFiles Sum n <- runConduit $ getCandidates - .| C.mapAccumWhileM tbAccum () + .| maybe (C.map id) (takeWhileTime . (/ 2)) interval + .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64) + .| C.map (view $ _1 . _Value) .| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef]) .| C.fold $logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|] @@ -58,16 +61,11 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do , E.length_ $ fileContent E.^. FileContentContent ) - tbAccum :: (E.Value FileContentReference, E.Value Word64) - -> () - -> DB (Either () ((), FileContentReference)) - tbAccum (E.Value fRef, E.Value fSize) () - = bool (Left ()) (Right ((), fRef)) <$> persistentTokenBucketTryAlloc' TokenBucketPruneFiles fSize - dispatchJobInjectFiles :: JobHandler UniWorX -dispatchJobInjectFiles = JobHandlerAtomic . hoist lift . maybeT (return ()) $ do - uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket +dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do + uploadBucket <- getsYesod $ view _appUploadCacheBucket + interval <- getsYesod $ view _appInjectFiles let extractReference (Minio.ListItemObject oi) @@ -76,36 +74,29 @@ dispatchJobInjectFiles = JobHandlerAtomic . hoist lift . maybeT (return ()) $ do = Just (oi, fRef) extractReference _ = Nothing - tbAccum :: (Minio.ObjectInfo, FileContentReference) - -> () - -> DB (Either () ((), (Minio.Object, FileContentReference))) - tbAccum (oi, fRef) () - = bool (Left ()) (Right ((), (Minio.oiObject oi, fRef))) <$> persistentTokenBucketTryAlloc' TokenBucketInjectFiles (Minio.oiSize oi) - injectOrDelete :: (Minio.Object, FileContentReference) - -> DB (Sum Int64, Sum Int64, Sum Int64) -- ^ Deleted, Injected, Existed + -> Handler (Sum Int64, Sum Int64, Sum Int64) -- ^ Deleted, Injected, Existed injectOrDelete (obj, fRef) = maybeT (return mempty) $ do - isReferenced <- lift . E.selectExists . E.where_ . E.any E.exists . fileReferences $ E.val fRef - res <- if | isReferenced -> do - alreadyInjected <- lift $ exists [ FileContentHash ==. fRef ] - if | alreadyInjected -> return (mempty, mempty, Sum 1) - | otherwise -> do - content <- (hoistMaybe =<<) . runAppMinio . runMaybeT $ do - objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions - lift . runConduit $ Minio.gorObjectStream objRes .| C.fold - lift $ (mempty, Sum 1, mempty) <$ insert (FileContent fRef content) - | otherwise -> return (Sum 1, mempty, mempty) - runAppMinio . maybeT (return ()) . catchIfMaybeT isDoesNotExist $ Minio.removeObject uploadBucket obj + res <- hoist runDB $ do + isReferenced <- lift . E.selectExists . E.where_ . E.any E.exists . fileReferences $ E.val fRef + if | isReferenced -> do + alreadyInjected <- lift $ exists [ FileContentHash ==. fRef ] + if | alreadyInjected -> return (mempty, mempty, Sum 1) + | otherwise -> do + content <- (hoistMaybe =<<) . runAppMinio . runMaybeT $ do + objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions + lift . runConduit $ Minio.gorObjectStream objRes .| C.fold + lift $ (mempty, Sum 1, mempty) <$ insert (FileContent fRef content) + | otherwise -> return (Sum 1, mempty, mempty) + runAppMinio . maybeT (return ()) . catchIfMaybeT minioIsDoesNotExist $ Minio.removeObject uploadBucket obj return res - where isDoesNotExist :: HttpException -> Bool - isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _)) - = responseStatus resp == notFound404 - isDoesNotExist _ = False (Sum del, Sum inj, Sum exc) <- runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True) .| C.mapMaybe extractReference - .| transPipe lift (C.mapAccumWhileM tbAccum ()) + .| maybe (C.map id) (takeWhileTime . (/ 2)) interval + .| transPipe (lift . runDB) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize) + .| C.map (over _1 Minio.oiObject) .| transPipe lift (C.mapM injectOrDelete) .| C.fold diff --git a/src/System/Clock/Instances.hs b/src/System/Clock/Instances.hs new file mode 100644 index 000000000..16082a700 --- /dev/null +++ b/src/System/Clock/Instances.hs @@ -0,0 +1,13 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} + +module System.Clock.Instances + ( + ) where + +import ClassyPrelude +import System.Clock +import Data.Ratio ((%)) + + +instance Real TimeSpec where + toRational TimeSpec{..} = fromIntegral sec + fromIntegral nsec % 1e9 diff --git a/src/Utils.hs b/src/Utils.hs index 218f2fd1a..3181e52d5 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -840,6 +840,21 @@ yieldMMany = C.yieldMany <=< lift eitherC :: Monad m => ConduitT l o m () -> ConduitT r o m () -> ConduitT (Either l r) o m () eitherC lC rC = void $ sequenceConduits [C.mapMaybe (preview _Left) .| lC, C.mapMaybe (preview _Right) .| rC] +takeWhileMC :: forall a m. Monad m => (a -> m Bool) -> ConduitT a a m () +takeWhileMC f = loop + where loop = do + x <- await + whenIsJust x $ \x' -> + whenM (lift $ f x') $ yield x' *> loop + +takeWhileTime :: forall a m. MonadIO m => NominalDiffTime -> ConduitT a a m () +takeWhileTime maxT = do + sTime <- liftIO getCurrentTime + takeWhileMC . const $ do + now <- liftIO getCurrentTime + let tDelta = now `diffUTCTime` sTime + return $ tDelta < maxT + ----------------- -- Alternative -- ----------------- diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 8f2cfcfa9..7509c83fd 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -14,6 +14,7 @@ import Prometheus.Metric.GHC import qualified Data.List as List +import System.Clock import Data.Time.Clock.POSIX import Network.Wai (Middleware) @@ -102,9 +103,9 @@ favouritesQuickActionsDuration = unsafeRegister $ histogram info buckets withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport withHealthReportMetrics act = do - before <- liftIO getPOSIXTime + before <- liftIO $ getTime Monotonic report <- act - after <- liftIO getPOSIXTime + after <- liftIO $ getTime Monotonic let checkVal = toPathPiece $ classifyHealthReport report statusVal = toPathPiece $ healthReportStatus report @@ -121,9 +122,9 @@ registerGHCMetrics = void $ register ghcMetrics observeHTTPRequestLatency :: forall site. ParseRoute site => (Route site -> String) -> Middleware observeHTTPRequestLatency classifyHandler app req respond' = do - start <- getPOSIXTime + start <- getTime Monotonic app req $ \res -> do - end <- getPOSIXTime + end <- getTime Monotonic let method = decodeUtf8 $ Wai.requestMethod req status = tshow . HTTP.statusCode $ Wai.responseStatus res route :: Maybe (Route site) @@ -145,9 +146,9 @@ withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Label4 -> m a -> m a withJobWorkerStateLbls newLbls act = do liftIO $ withLabel jobWorkerStateTransitions newLbls incCounter - start <- liftIO getPOSIXTime + start <- liftIO $ getTime Monotonic res <- handleAll (return . Left) $ Right <$> act - end <- liftIO getPOSIXTime + end <- liftIO $ getTime Monotonic liftIO . withLabel jobWorkerStateDuration newLbls . flip observe . realToFrac $ end - start @@ -163,9 +164,9 @@ observeYesodCacheSize = do observeFavouritesQuickActionsDuration :: (MonadIO m, MonadMask m) => m a -> m a observeFavouritesQuickActionsDuration act = do - start <- liftIO getPOSIXTime + start <- liftIO $ getTime Monotonic res <- handleAll (return . Left) $ Right <$> act - end <- liftIO getPOSIXTime + end <- liftIO $ getTime Monotonic liftIO . observe favouritesQuickActionsDuration . realToFrac $ end - start diff --git a/src/Utils/PersistentTokenBucket.hs b/src/Utils/PersistentTokenBucket.hs index 3f6d3aa03..5cec01339 100644 --- a/src/Utils/PersistentTokenBucket.hs +++ b/src/Utils/PersistentTokenBucket.hs @@ -1,10 +1,12 @@ module Utils.PersistentTokenBucket ( TokenBucketSettings(..) - , persistentTokenBucketTryAlloc' - , persistentTokenBucketTryAlloc + , persistentTokenBucketTryAlloc', persistentTokenBucketTakeC' + , persistentTokenBucketTryAlloc, persistentTokenBucketTakeC ) where import Import.NoFoundation + +import qualified Data.Conduit.Combinators as C data TokenBucketSettings = TokenBucketSettings @@ -54,3 +56,30 @@ persistentTokenBucketTryAlloc TokenBucketSettings{..} (fromIntegral -> tokens) = | otherwise -> do update (TokenBucketKey tbsIdent) [ TokenBucketLastValue =. currentValue - tokens, TokenBucketLastAccess =. addUTCTime (- deltaT') now ] return True + + +persistentTokenBucketTakeC' :: forall i m a. + (MonadHandler m, HasAppSettings (HandlerSite m), Integral a) + => TokenBucketIdent + -> (i -> a) + -> ConduitT i i (ReaderT SqlBackend m) () +persistentTokenBucketTakeC' tbsIdent cTokens = do + TokenBucketConf{..} <- getsYesod $ views _appPersistentTokenBuckets ($ tbsIdent) + flip persistentTokenBucketTakeC cTokens TokenBucketSettings + { tbsIdent + , tbsDepth = tokenBucketDepth + , tbsInvRate = tokenBucketInvRate + , tbsInitialValue = tokenBucketInitialValue + } + +persistentTokenBucketTakeC :: forall i m a. + (MonadIO m, Integral a) + => TokenBucketSettings + -> (i -> a) + -> ConduitT i i (ReaderT SqlBackend m) () +persistentTokenBucketTakeC tbs cTokens = C.mapAccumWhileM tbAccum () + where tbAccum :: i + -> () + -> SqlPersistT m (Either () ((), i)) + tbAccum x () + = bool (Left ()) (Right ((), x)) <$> persistentTokenBucketTryAlloc tbs (cTokens x)