feat(files): move uploads from buffer to database

Rate limit pruning of unreferenced files
This commit is contained in:
Gregor Kleen 2020-07-14 17:57:52 +02:00
parent d9e9179a52
commit 9a2cba5c0a
13 changed files with 246 additions and 44 deletions

View File

@ -156,6 +156,7 @@ upload-cache:
auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true"
disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
upload-cache-bucket: "uni2work-uploads"
inject-files: 300
server-sessions:
idle-timeout: 28807
@ -215,3 +216,14 @@ favourites-quick-actions-burstsize: 40
favourites-quick-actions-avg-inverse-rate: 50e3 # µs/token
favourites-quick-actions-timeout: 40e-3 # s
favourites-quick-actions-cache-ttl: 120 # s
token-buckets:
inject-files:
depth: 2097152 # 2MiB
inv-rate: 9.5e-7 # 1MiB/s
initial-value: 0
prune-files:
depth: 10485760 # 10MiB
inv-rate: 1.9e-6 # 2MiB/s
initial-value: 0

View File

@ -22,4 +22,11 @@ SentNotification
content Value
user UserId
time UTCTime
instance InstanceId
instance InstanceId
TokenBucket
ident TokenBucketIdent
lastValue Int64
lastAccess UTCTime
Primary ident

View File

@ -208,6 +208,7 @@ default-extensions:
- RecursiveDo
- TypeFamilyDependencies
- QuantifiedConstraints
- EmptyDataDeriving
ghc-options:
- -Wall

View File

@ -39,7 +39,7 @@ import Data.Type.Equality (TestEquality(..))
import qualified Data.HashMap.Strict as HashMap
import Control.Concurrent.TokenBucket (TokenBucket, newTokenBucket, tokenBucketTryAlloc)
import qualified Control.Concurrent.TokenBucket as Concurrent (TokenBucket, newTokenBucket, tokenBucketTryAlloc)
import System.IO.Unsafe (unsafePerformIO)
@ -235,7 +235,7 @@ hashableDynamic :: forall a.
=> a -> HashableDynamic
hashableDynamic v = HashableDynamic (typeOf v) v
memcachedLimit :: TVar (HashMap HashableDynamic TokenBucket)
memcachedLimit :: TVar (HashMap HashableDynamic Concurrent.TokenBucket)
memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty
{-# NOINLINE memcachedLimit #-}
@ -258,13 +258,13 @@ memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate t
bucket <- case mBucket of
Just bucket -> return bucket
Nothing -> liftIO $ do
bucket <- newTokenBucket
bucket <- Concurrent.newTokenBucket
atomically $ do
hm <- readTVar memcachedLimit
let hm' = HashMap.insertWith (flip const) lK bucket hm
writeTVar memcachedLimit $! hm'
return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm'
sufficientTokens <- liftIO $ tokenBucketTryAlloc bucket burst rate tokens
sufficientTokens <- liftIO $ Concurrent.tokenBucketTryAlloc bucket burst rate tokens
$logDebugS "memcachedLimitedWith" $ "Sufficient tokens: " <> tshow sufficientTokens
guard sufficientTokens

View File

@ -8,5 +8,6 @@ import Import.NoFoundation as Import
import Utils.SystemMessage as Import
import Utils.Metrics as Import
import Utils.Files as Import
import Utils.PersistentTokenBucket as Import
import Jobs.Types as Import (JobHandler(..))

View File

@ -60,7 +60,7 @@ import Jobs.Handler.TransactionLog
import Jobs.Handler.SynchroniseLdap
import Jobs.Handler.PruneInvitations
import Jobs.Handler.ChangeUserDisplayEmail
import Jobs.Handler.PruneFiles
import Jobs.Handler.Files
import Jobs.HealthReport

View File

@ -78,6 +78,16 @@ determineCrontab = execWriterT $ do
, cronNotAfter = Right CronNotScheduled
}
whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval ->
tell $ HashMap.singleton
(JobCtlQueue JobInjectFiles)
Cron
{ cronInitial = CronAsap
, cronRepeat = CronRepeatScheduled CronAsap
, cronRateLimit = iInterval
, cronNotAfter = Right CronNotScheduled
}
tell . flip foldMap universeF $ \kind ->
case appHealthCheckInterval kind of
Just int -> HashMap.singleton

117
src/Jobs/Handler/Files.hs Normal file
View File

@ -0,0 +1,117 @@
module Jobs.Handler.Files
( dispatchJobPruneSessionFiles
, dispatchJobPruneUnreferencedFiles
, dispatchJobInjectFiles
) where
import Import hiding (matching)
import Database.Persist.Sql (deleteWhereCount)
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe)
import Handler.Utils.Minio
import qualified Network.Minio as Minio
import qualified Crypto.Hash as Crypto
import qualified Data.ByteString.Base64.URL as Base64
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
now <- liftIO getCurrentTime
expires <- getsYesod $ view _appSessionFilesExpire
n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
$logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
fileReferences :: E.SqlExpr (E.Value FileContentReference) -> [E.SqlQuery ()]
fileReferences (E.just -> fHash)
= [ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash
, E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash
, E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash
, E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash
, E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash
, E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash
, E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
, E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
]
dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
Sum n <- runConduit $ getCandidates
.| C.mapAccumWhileM tbAccum ()
.| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef])
.| C.fold
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|]
where
getCandidates = E.selectSource . E.from $ \fileContent -> do
E.where_ . E.not_ . E.any E.exists $ fileReferences (fileContent E.^. FileContentHash)
return $ ( fileContent E.^. FileContentHash
, E.length_ $ fileContent E.^. FileContentContent
)
tbAccum :: (E.Value FileContentReference, E.Value Word64)
-> ()
-> DB (Either () ((), FileContentReference))
tbAccum (E.Value fRef, E.Value fSize) ()
= bool (Left ()) (Right ((), fRef)) <$> persistentTokenBucketTryAlloc' TokenBucketPruneFiles fSize
dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerAtomic . hoist lift . maybeT (return ()) $ do
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
let
extractReference (Minio.ListItemObject oi)
| Right bs <- Base64.decodeUnpadded . encodeUtf8 $ Minio.oiObject oi
, Just fRef <- Crypto.digestFromByteString bs
= Just (oi, fRef)
extractReference _ = Nothing
tbAccum :: (Minio.ObjectInfo, FileContentReference)
-> ()
-> DB (Either () ((), (Minio.Object, FileContentReference)))
tbAccum (oi, fRef) ()
= bool (Left ()) (Right ((), (Minio.oiObject oi, fRef))) <$> persistentTokenBucketTryAlloc' TokenBucketInjectFiles (Minio.oiSize oi)
injectOrDelete :: (Minio.Object, FileContentReference)
-> DB (Sum Int64, Sum Int64, Sum Int64) -- ^ Deleted, Injected, Existed
injectOrDelete (obj, fRef) = maybeT (return mempty) $ do
isReferenced <- lift . E.selectExists . E.where_ . E.any E.exists . fileReferences $ E.val fRef
res <- if | isReferenced -> do
alreadyInjected <- lift $ exists [ FileContentHash ==. fRef ]
if | alreadyInjected -> return (mempty, mempty, Sum 1)
| otherwise -> do
content <- (hoistMaybe =<<) . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
lift $ (mempty, Sum 1, mempty) <$ insert (FileContent fRef content)
| otherwise -> return (Sum 1, mempty, mempty)
runAppMinio . maybeT (return ()) . catchIfMaybeT isDoesNotExist $ Minio.removeObject uploadBucket obj
return res
where isDoesNotExist :: HttpException -> Bool
isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
= responseStatus resp == notFound404
isDoesNotExist _ = False
(Sum del, Sum inj, Sum exc) <-
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
.| C.mapMaybe extractReference
.| transPipe lift (C.mapAccumWhileM tbAccum ())
.| transPipe lift (C.mapM injectOrDelete)
.| C.fold
when (del > 0) $
$logInfoS "InjectFiles" [st|Deleted #{del} unreferenced files from upload cache|]
when (exc > 0) $
$logInfoS "InjectFiles" [st|Deleted #{exc} files from upload cache because they were already referenced|]
when (inj > 0) $
$logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]

View File

@ -1,38 +0,0 @@
module Jobs.Handler.PruneFiles
( dispatchJobPruneSessionFiles
, dispatchJobPruneUnreferencedFiles
) where
import Import hiding (matching)
import Database.Persist.Sql (deleteWhereCount)
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
now <- liftIO getCurrentTime
expires <- getsYesod $ view _appSessionFilesExpire
n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
$logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
n <- E.deleteCount . E.from $ \fileContent ->
E.where_ . E.not_ . E.any E.exists $ references fileContent
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|]
where
references :: E.SqlExpr (Entity FileContent) -> [E.SqlQuery ()]
references (E.just . (E.^. FileContentHash) -> fHash) =
[ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash
, E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash
, E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash
, E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash
, E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash
, E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash
, E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
, E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
]

View File

@ -80,6 +80,7 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica
}
| JobPruneSessionFiles
| JobPruneUnreferencedFiles
| JobInjectFiles
deriving (Eq, Ord, Show, Read, Generic, Typeable)
data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
| NotificationSheetActive { nSheet :: SheetId }

View File

@ -11,6 +11,7 @@ module Model.Types.Misc
) where
import Import.NoModel
import Model.Types.TH.PathPiece
import Data.Maybe (fromJust)
@ -28,6 +29,8 @@ import Database.Persist.Sql (PersistFieldSql(..))
import Utils.Lens.TH
import Web.HttpApiData
data StudyFieldType = FieldPrimary | FieldSecondary
deriving (Eq, Ord, Enum, Show, Read, Bounded, Generic)
@ -257,3 +260,19 @@ instance Csv.ToField Sex where
toField = Csv.toField . toPathPiece
instance Csv.FromField Sex where
parseField = maybe (fail "Could not parse Field of type Sex") return . fromPathPiece <=< Csv.parseField
data TokenBucketIdent = TokenBucketInjectFiles
| TokenBucketPruneFiles
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
deriving anyclass (Universe, Finite, Hashable)
nullaryPathPiece ''TokenBucketIdent $ camelToPathPiece' 2
pathPieceJSON ''TokenBucketIdent
pathPieceJSONKey ''TokenBucketIdent
derivePersistFieldPathPiece ''TokenBucketIdent
instance ToHttpApiData TokenBucketIdent where
toUrlPiece = toPathPiece
instance FromHttpApiData TokenBucketIdent where
parseUrlPiece = maybe (Left "Could not parse TokenBucketIdent") Right . fromPathPiece

View File

@ -168,12 +168,15 @@ data AppSettings = AppSettings
, appUploadCacheConf :: Maybe Minio.ConnectInfo
, appUploadCacheBucket :: Minio.Bucket
, appInjectFiles :: Maybe NominalDiffTime
, appFavouritesQuickActionsBurstsize
, appFavouritesQuickActionsAvgInverseRate :: Word64
, appFavouritesQuickActionsTimeout :: DiffTime
, appFavouritesQuickActionsCacheTTL :: Maybe DiffTime
, appPersistentTokenBuckets :: TokenBucketIdent -> TokenBucketConf
, appInitialInstanceID :: Maybe (Either FilePath UUID)
, appRibbon :: Maybe Text
} deriving Show
@ -293,6 +296,16 @@ data SmtpAuthConf = SmtpAuthConf
, smtpAuthPassword :: HaskellNet.Password
} deriving (Show)
data TokenBucketConf = TokenBucketConf
{ tokenBucketDepth :: Word64
, tokenBucketInvRate :: NominalDiffTime
, tokenBucketInitialValue :: Int64
} deriving (Eq, Ord, Show, Generic, Typeable)
deriveJSON defaultOptions
{ fieldLabelModifier = camelToPathPiece' 2
} ''TokenBucketConf
deriveJSON defaultOptions
{ constructorTagModifier = camelToPathPiece' 2
, fieldLabelModifier = camelToPathPiece' 2
@ -489,6 +502,7 @@ instance FromJSON AppSettings where
appSessionFilesExpire <- o .: "session-files-expire"
appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files"
appInjectFiles <- o .:? "inject-files"
appMaximumContentLength <- o .: "maximum-content-length"
@ -536,6 +550,8 @@ instance FromJSON AppSettings where
appFavouritesQuickActionsTimeout <- o .: "favourites-quick-actions-timeout"
appFavouritesQuickActionsCacheTTL <- o .: "favourites-quick-actions-cache-ttl"
appPersistentTokenBuckets <- o .: "token-buckets"
appUploadCacheConf <- assertM (not . null . Minio.connectHost) <$> o .:? "upload-cache"
appUploadCacheBucket <- o .: "upload-cache-bucket"

View File

@ -0,0 +1,56 @@
module Utils.PersistentTokenBucket
( TokenBucketSettings(..)
, persistentTokenBucketTryAlloc'
, persistentTokenBucketTryAlloc
) where
import Import.NoFoundation
data TokenBucketSettings = TokenBucketSettings
{ tbsIdent :: TokenBucketIdent
, tbsDepth :: Word64
, tbsInvRate :: NominalDiffTime
, tbsInitialValue :: Int64
}
persistentTokenBucketTryAlloc' :: (MonadHandler m, HasAppSettings (HandlerSite m), Integral a)
=> TokenBucketIdent
-> a
-> SqlPersistT m Bool
persistentTokenBucketTryAlloc' tbsIdent tokens = do
TokenBucketConf{..} <- getsYesod $ views _appPersistentTokenBuckets ($ tbsIdent)
flip persistentTokenBucketTryAlloc tokens TokenBucketSettings
{ tbsIdent
, tbsDepth = tokenBucketDepth
, tbsInvRate = tokenBucketInvRate
, tbsInitialValue = tokenBucketInitialValue
}
persistentTokenBucketTryAlloc :: (MonadIO m, Integral a) => TokenBucketSettings -> a -> SqlPersistT m Bool
persistentTokenBucketTryAlloc TokenBucketSettings{..} (fromIntegral -> tokens) = do
now <- liftIO getCurrentTime
TokenBucket{..} <- do
existingBucket <- get $ TokenBucketKey tbsIdent
case existingBucket of
Just bkt -> return bkt
Nothing -> do
let bkt = TokenBucket
{ tokenBucketIdent = tbsIdent
, tokenBucketLastValue = tbsInitialValue
, tokenBucketLastAccess = now
}
insert_ bkt
return bkt
let currentValue = fromIntegral tbsDepth `min` tokenBucketLastValue + tokenIncrease
deltaT = now `diffUTCTime` tokenBucketLastAccess
(tokenIncrease, deltaT')
| n < 0 = (pred n, (1 + f) * tbsInvRate)
| otherwise = (n, f * tbsInvRate)
where (n, f) = properFraction $ deltaT / tbsInvRate
if | currentValue < 0 -> return False
| otherwise -> do
update (TokenBucketKey tbsIdent) [ TokenBucketLastValue =. currentValue - tokens, TokenBucketLastAccess =. addUTCTime (- deltaT') now ]
return True