feat(files): further balance file jobs

This commit is contained in:
Gregor Kleen 2020-07-15 11:50:26 +02:00
parent 1380d9d21e
commit 1926917dd7
9 changed files with 102 additions and 50 deletions

View File

@ -156,7 +156,7 @@ upload-cache:
auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true"
disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
upload-cache-bucket: "uni2work-uploads"
inject-files: 60
inject-files: 10
server-sessions:
idle-timeout: 28807
@ -220,10 +220,10 @@ favourites-quick-actions-cache-ttl: 120 # s
token-buckets:
inject-files:
depth: 62914560 # 60MiB
depth: 20971520 # 20MiB
inv-rate: 9.5e-7 # 1MiB/s
initial-value: 0
prune-files:
depth: 1258291200 # 1200MiB
depth: 1572864000 # 1500MiB
inv-rate: 1.9e-6 # 2MiB/s
initial-value: 0

View File

@ -35,11 +35,7 @@ sourceFile FileReference{..} = do
let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
fmap Just . (hoistMaybe =<<) . runAppMinio . runMaybeT $ do
let isDoesNotExist :: HttpException -> Bool
isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
= responseStatus resp == notFound404
isDoesNotExist _ = False
objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
| fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent
-> throwM SourceFilesMismatchedHashes

View File

@ -1,5 +1,6 @@
module Handler.Utils.Minio
( runAppMinio
, minioIsDoesNotExist
) where
import Import.NoFoundation
@ -17,3 +18,8 @@ runAppMinio :: ( MonadHandler m, HandlerSite m ~ UniWorX
runAppMinio act = do
conn <- hoistMaybe =<< getsYesod appUploadCache
either throwM return <=< liftIO $ Minio.runMinioWith conn act
minioIsDoesNotExist :: HttpException -> Bool
minioIsDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
= responseStatus resp == notFound404
minioIsDoesNotExist _ = False

View File

@ -163,6 +163,7 @@ import Web.Cookie.Instances as Import ()
import Network.HTTP.Types.Method.Instances as Import ()
import Crypto.Random.Instances as Import ()
import Network.Minio.Instances as Import ()
import System.Clock.Instances as Import ()
import Crypto.Hash as Import (Digest, SHA3_256, SHA3_512)
import Crypto.Random as Import (ChaChaDRG, Seed)

View File

@ -46,8 +46,11 @@ fileReferences (E.just -> fHash)
dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
interval <- getsYesod $ view _appPruneUnreferencedFiles
Sum n <- runConduit $ getCandidates
.| C.mapAccumWhileM tbAccum ()
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
.| C.map (view $ _1 . _Value)
.| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef])
.| C.fold
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|]
@ -58,16 +61,11 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
, E.length_ $ fileContent E.^. FileContentContent
)
tbAccum :: (E.Value FileContentReference, E.Value Word64)
-> ()
-> DB (Either () ((), FileContentReference))
tbAccum (E.Value fRef, E.Value fSize) ()
= bool (Left ()) (Right ((), fRef)) <$> persistentTokenBucketTryAlloc' TokenBucketPruneFiles fSize
dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerAtomic . hoist lift . maybeT (return ()) $ do
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
interval <- getsYesod $ view _appInjectFiles
let
extractReference (Minio.ListItemObject oi)
@ -76,36 +74,29 @@ dispatchJobInjectFiles = JobHandlerAtomic . hoist lift . maybeT (return ()) $ do
= Just (oi, fRef)
extractReference _ = Nothing
tbAccum :: (Minio.ObjectInfo, FileContentReference)
-> ()
-> DB (Either () ((), (Minio.Object, FileContentReference)))
tbAccum (oi, fRef) ()
= bool (Left ()) (Right ((), (Minio.oiObject oi, fRef))) <$> persistentTokenBucketTryAlloc' TokenBucketInjectFiles (Minio.oiSize oi)
injectOrDelete :: (Minio.Object, FileContentReference)
-> DB (Sum Int64, Sum Int64, Sum Int64) -- ^ Deleted, Injected, Existed
-> Handler (Sum Int64, Sum Int64, Sum Int64) -- ^ Deleted, Injected, Existed
injectOrDelete (obj, fRef) = maybeT (return mempty) $ do
isReferenced <- lift . E.selectExists . E.where_ . E.any E.exists . fileReferences $ E.val fRef
res <- if | isReferenced -> do
alreadyInjected <- lift $ exists [ FileContentHash ==. fRef ]
if | alreadyInjected -> return (mempty, mempty, Sum 1)
| otherwise -> do
content <- (hoistMaybe =<<) . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
lift $ (mempty, Sum 1, mempty) <$ insert (FileContent fRef content)
| otherwise -> return (Sum 1, mempty, mempty)
runAppMinio . maybeT (return ()) . catchIfMaybeT isDoesNotExist $ Minio.removeObject uploadBucket obj
res <- hoist runDB $ do
isReferenced <- lift . E.selectExists . E.where_ . E.any E.exists . fileReferences $ E.val fRef
if | isReferenced -> do
alreadyInjected <- lift $ exists [ FileContentHash ==. fRef ]
if | alreadyInjected -> return (mempty, mempty, Sum 1)
| otherwise -> do
content <- (hoistMaybe =<<) . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
lift $ (mempty, Sum 1, mempty) <$ insert (FileContent fRef content)
| otherwise -> return (Sum 1, mempty, mempty)
runAppMinio . maybeT (return ()) . catchIfMaybeT minioIsDoesNotExist $ Minio.removeObject uploadBucket obj
return res
where isDoesNotExist :: HttpException -> Bool
isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
= responseStatus resp == notFound404
isDoesNotExist _ = False
(Sum del, Sum inj, Sum exc) <-
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
.| C.mapMaybe extractReference
.| transPipe lift (C.mapAccumWhileM tbAccum ())
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| transPipe (lift . runDB) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
.| C.map (over _1 Minio.oiObject)
.| transPipe lift (C.mapM injectOrDelete)
.| C.fold

View File

@ -0,0 +1,13 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
module System.Clock.Instances
(
) where
import ClassyPrelude
import System.Clock
import Data.Ratio ((%))
instance Real TimeSpec where
toRational TimeSpec{..} = fromIntegral sec + fromIntegral nsec % 1e9

View File

@ -840,6 +840,21 @@ yieldMMany = C.yieldMany <=< lift
eitherC :: Monad m => ConduitT l o m () -> ConduitT r o m () -> ConduitT (Either l r) o m ()
eitherC lC rC = void $ sequenceConduits [C.mapMaybe (preview _Left) .| lC, C.mapMaybe (preview _Right) .| rC]
takeWhileMC :: forall a m. Monad m => (a -> m Bool) -> ConduitT a a m ()
takeWhileMC f = loop
where loop = do
x <- await
whenIsJust x $ \x' ->
whenM (lift $ f x') $ yield x' *> loop
takeWhileTime :: forall a m. MonadIO m => NominalDiffTime -> ConduitT a a m ()
takeWhileTime maxT = do
sTime <- liftIO getCurrentTime
takeWhileMC . const $ do
now <- liftIO getCurrentTime
let tDelta = now `diffUTCTime` sTime
return $ tDelta < maxT
-----------------
-- Alternative --
-----------------

View File

@ -14,6 +14,7 @@ import Prometheus.Metric.GHC
import qualified Data.List as List
import System.Clock
import Data.Time.Clock.POSIX
import Network.Wai (Middleware)
@ -102,9 +103,9 @@ favouritesQuickActionsDuration = unsafeRegister $ histogram info buckets
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
withHealthReportMetrics act = do
before <- liftIO getPOSIXTime
before <- liftIO $ getTime Monotonic
report <- act
after <- liftIO getPOSIXTime
after <- liftIO $ getTime Monotonic
let checkVal = toPathPiece $ classifyHealthReport report
statusVal = toPathPiece $ healthReportStatus report
@ -121,9 +122,9 @@ registerGHCMetrics = void $ register ghcMetrics
observeHTTPRequestLatency :: forall site. ParseRoute site => (Route site -> String) -> Middleware
observeHTTPRequestLatency classifyHandler app req respond' = do
start <- getPOSIXTime
start <- getTime Monotonic
app req $ \res -> do
end <- getPOSIXTime
end <- getTime Monotonic
let method = decodeUtf8 $ Wai.requestMethod req
status = tshow . HTTP.statusCode $ Wai.responseStatus res
route :: Maybe (Route site)
@ -145,9 +146,9 @@ withJobWorkerStateLbls :: (MonadIO m, MonadMask m) => Label4 -> m a -> m a
withJobWorkerStateLbls newLbls act = do
liftIO $ withLabel jobWorkerStateTransitions newLbls incCounter
start <- liftIO getPOSIXTime
start <- liftIO $ getTime Monotonic
res <- handleAll (return . Left) $ Right <$> act
end <- liftIO getPOSIXTime
end <- liftIO $ getTime Monotonic
liftIO . withLabel jobWorkerStateDuration newLbls . flip observe . realToFrac $ end - start
@ -163,9 +164,9 @@ observeYesodCacheSize = do
observeFavouritesQuickActionsDuration :: (MonadIO m, MonadMask m) => m a -> m a
observeFavouritesQuickActionsDuration act = do
start <- liftIO getPOSIXTime
start <- liftIO $ getTime Monotonic
res <- handleAll (return . Left) $ Right <$> act
end <- liftIO getPOSIXTime
end <- liftIO $ getTime Monotonic
liftIO . observe favouritesQuickActionsDuration . realToFrac $ end - start

View File

@ -1,10 +1,12 @@
module Utils.PersistentTokenBucket
( TokenBucketSettings(..)
, persistentTokenBucketTryAlloc'
, persistentTokenBucketTryAlloc
, persistentTokenBucketTryAlloc', persistentTokenBucketTakeC'
, persistentTokenBucketTryAlloc, persistentTokenBucketTakeC
) where
import Import.NoFoundation
import qualified Data.Conduit.Combinators as C
data TokenBucketSettings = TokenBucketSettings
@ -54,3 +56,30 @@ persistentTokenBucketTryAlloc TokenBucketSettings{..} (fromIntegral -> tokens) =
| otherwise -> do
update (TokenBucketKey tbsIdent) [ TokenBucketLastValue =. currentValue - tokens, TokenBucketLastAccess =. addUTCTime (- deltaT') now ]
return True
persistentTokenBucketTakeC' :: forall i m a.
(MonadHandler m, HasAppSettings (HandlerSite m), Integral a)
=> TokenBucketIdent
-> (i -> a)
-> ConduitT i i (ReaderT SqlBackend m) ()
persistentTokenBucketTakeC' tbsIdent cTokens = do
TokenBucketConf{..} <- getsYesod $ views _appPersistentTokenBuckets ($ tbsIdent)
flip persistentTokenBucketTakeC cTokens TokenBucketSettings
{ tbsIdent
, tbsDepth = tokenBucketDepth
, tbsInvRate = tokenBucketInvRate
, tbsInitialValue = tokenBucketInitialValue
}
persistentTokenBucketTakeC :: forall i m a.
(MonadIO m, Integral a)
=> TokenBucketSettings
-> (i -> a)
-> ConduitT i i (ReaderT SqlBackend m) ()
persistentTokenBucketTakeC tbs cTokens = C.mapAccumWhileM tbAccum ()
where tbAccum :: i
-> ()
-> SqlPersistT m (Either () ((), i))
tbAccum x ()
= bool (Left ()) (Right ((), x)) <$> persistentTokenBucketTryAlloc tbs (cTokens x)