diff --git a/config/settings.yml b/config/settings.yml index 6c217d8ce..2e24f25be 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -156,6 +156,7 @@ upload-cache: auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true" disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false" upload-cache-bucket: "uni2work-uploads" +inject-files: 300 server-sessions: idle-timeout: 28807 @@ -215,3 +216,14 @@ favourites-quick-actions-burstsize: 40 favourites-quick-actions-avg-inverse-rate: 50e3 # µs/token favourites-quick-actions-timeout: 40e-3 # s favourites-quick-actions-cache-ttl: 120 # s + + +token-buckets: + inject-files: + depth: 2097152 # 2MiB + inv-rate: 9.5e-7 # 1MiB/s + initial-value: 0 + prune-files: + depth: 10485760 # 10MiB + inv-rate: 1.9e-6 # ~0.5MiB/s (2MiB/s would be ~4.8e-7) + initial-value: 0 diff --git a/models/jobs.model b/models/jobs.model index 49a21a6e1..7caa80506 100644 --- a/models/jobs.model +++ b/models/jobs.model @@ -22,4 +22,11 @@ SentNotification content Value user UserId time UTCTime - instance InstanceId \ No newline at end of file + instance InstanceId + + +TokenBucket + ident TokenBucketIdent + lastValue Int64 + lastAccess UTCTime + Primary ident \ No newline at end of file diff --git a/package.yaml b/package.yaml index 96aedaf43..2678670a9 100644 --- a/package.yaml +++ b/package.yaml @@ -208,6 +208,7 @@ default-extensions: - RecursiveDo - TypeFamilyDependencies - QuantifiedConstraints + - EmptyDataDeriving ghc-options: - -Wall diff --git a/src/Handler/Utils/Memcached.hs b/src/Handler/Utils/Memcached.hs index 8a9efb46d..3116d6f18 100644 --- a/src/Handler/Utils/Memcached.hs +++ b/src/Handler/Utils/Memcached.hs @@ -39,7 +39,7 @@ import Data.Type.Equality (TestEquality(..)) import qualified Data.HashMap.Strict as HashMap -import Control.Concurrent.TokenBucket (TokenBucket, newTokenBucket, tokenBucketTryAlloc) +import qualified Control.Concurrent.TokenBucket as Concurrent (TokenBucket, newTokenBucket, tokenBucketTryAlloc) import System.IO.Unsafe 
(unsafePerformIO) @@ -235,7 +235,7 @@ hashableDynamic :: forall a. => a -> HashableDynamic hashableDynamic v = HashableDynamic (typeOf v) v -memcachedLimit :: TVar (HashMap HashableDynamic TokenBucket) +memcachedLimit :: TVar (HashMap HashableDynamic Concurrent.TokenBucket) memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty {-# NOINLINE memcachedLimit #-} @@ -258,13 +258,13 @@ memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate t bucket <- case mBucket of Just bucket -> return bucket Nothing -> liftIO $ do - bucket <- newTokenBucket + bucket <- Concurrent.newTokenBucket atomically $ do hm <- readTVar memcachedLimit let hm' = HashMap.insertWith (flip const) lK bucket hm writeTVar memcachedLimit $! hm' return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm' - sufficientTokens <- liftIO $ tokenBucketTryAlloc bucket burst rate tokens + sufficientTokens <- liftIO $ Concurrent.tokenBucketTryAlloc bucket burst rate tokens $logDebugS "memcachedLimitedWith" $ "Sufficient tokens: " <> tshow sufficientTokens guard sufficientTokens diff --git a/src/Import.hs b/src/Import.hs index 382be90fe..b387f44ad 100644 --- a/src/Import.hs +++ b/src/Import.hs @@ -8,5 +8,6 @@ import Import.NoFoundation as Import import Utils.SystemMessage as Import import Utils.Metrics as Import import Utils.Files as Import +import Utils.PersistentTokenBucket as Import import Jobs.Types as Import (JobHandler(..)) diff --git a/src/Jobs.hs b/src/Jobs.hs index a794b16a2..c97438c32 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -60,7 +60,7 @@ import Jobs.Handler.TransactionLog import Jobs.Handler.SynchroniseLdap import Jobs.Handler.PruneInvitations import Jobs.Handler.ChangeUserDisplayEmail -import Jobs.Handler.PruneFiles +import Jobs.Handler.Files import Jobs.HealthReport diff --git a/src/Jobs/Crontab.hs b/src/Jobs/Crontab.hs index aebc11ad1..9dcb04933 100644 --- a/src/Jobs/Crontab.hs +++ b/src/Jobs/Crontab.hs @@ -78,6 +78,16 @@ determineCrontab = 
execWriterT $ do , cronNotAfter = Right CronNotScheduled } + whenIsJust (appInjectFiles <* appUploadCacheConf) $ \iInterval -> + tell $ HashMap.singleton + (JobCtlQueue JobInjectFiles) + Cron + { cronInitial = CronAsap + , cronRepeat = CronRepeatScheduled CronAsap + , cronRateLimit = iInterval + , cronNotAfter = Right CronNotScheduled + } + tell . flip foldMap universeF $ \kind -> case appHealthCheckInterval kind of Just int -> HashMap.singleton
diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs
new file mode 100644
index 000000000..4ed1d10d3
--- /dev/null
+++ b/src/Jobs/Handler/Files.hs
@@ -0,0 +1,149 @@
+module Jobs.Handler.Files
+  ( dispatchJobPruneSessionFiles
+  , dispatchJobPruneUnreferencedFiles
+  , dispatchJobInjectFiles
+  ) where
+
+import Import hiding (matching)
+
+import Database.Persist.Sql (deleteWhereCount)
+
+import qualified Database.Esqueleto as E
+import qualified Database.Esqueleto.Utils as E
+
+import qualified Data.Conduit.Combinators as C
+import qualified Data.Conduit.List as C (mapMaybe)
+
+import Handler.Utils.Minio
+import qualified Network.Minio as Minio
+
+import qualified Crypto.Hash as Crypto
+import qualified Data.ByteString.Base64.URL as Base64
+
+
+-- | Delete every session file whose touch time is older than the configured
+-- expiry and log how many rows were removed.
+dispatchJobPruneSessionFiles :: JobHandler UniWorX
+dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
+  now <- liftIO getCurrentTime
+  expires <- getsYesod $ view _appSessionFilesExpire
+  n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
+  $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
+
+
+
+-- | One subquery per referencing table, each selecting the rows whose content
+-- column equals the given hash.  Both callers in this module combine them
+-- with @E.any E.exists@ to decide whether the hash is still referenced.
+fileReferences :: E.SqlExpr (E.Value FileContentReference) -> [E.SqlQuery ()]
+fileReferences (E.just -> fHash)
+  = [ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash
+    , E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash
+    , E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash
+    , E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash
+    , E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash
+    , E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash
+    , E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
+    , E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
+    ]
+
+
+
+-- | Delete stored file contents for which none of the 'fileReferences'
+-- queries finds a row, and log the number of deleted rows.
+--
+-- Each candidate's content length is charged to the @TokenBucketPruneFiles@
+-- bucket before it is deleted; processing ends at the first candidate for
+-- which that allocation is refused.
+dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
+dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
+  Sum n <- runConduit $ getCandidates
+                     .| C.mapAccumWhileM tbAccum ()
+                     .| C.mapM (\fRef -> Sum <$> deleteWhereCount [FileContentHash ==. fRef])
+                     .| C.fold
+  $logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|]
+  where
+    -- Hash and content length of every file content without any reference.
+    getCandidates = E.selectSource . E.from $ \fileContent -> do
+      E.where_ . E.not_ . E.any E.exists $ fileReferences (fileContent E.^. FileContentHash)
+      return $ ( fileContent E.^. FileContentHash
+               , E.length_ $ fileContent E.^. FileContentContent
+               )
+
+    -- Rate limiting step: 'Right' forwards the hash for deletion, 'Left'
+    -- (allocation refused) ends the stream.
+    tbAccum :: (E.Value FileContentReference, E.Value Word64)
+            -> ()
+            -> DB (Either () ((), FileContentReference))
+    tbAccum (E.Value fRef, E.Value fSize) ()
+      = bool (Left ()) (Right ((), fRef)) <$> persistentTokenBucketTryAlloc' TokenBucketPruneFiles fSize
+
+
+-- | Process the objects in the upload cache bucket whose names decode to a
+-- file content hash.  Referenced hashes without a @FileContent@ row have
+-- their content inserted into the database; hashes that are unreferenced or
+-- already present cause no insert.  Afterwards the object is removed from the
+-- upload cache, and the per-category counts are logged when non-zero.
+-- NOTE(review): the outer @maybeT (return ())@ presumably covers a missing
+-- upload cache configuration via @runAppMinio@ — confirm.
+--
+-- Each object's size is charged to the @TokenBucketInjectFiles@ bucket first;
+-- processing ends at the first object for which that allocation is refused.
+dispatchJobInjectFiles :: JobHandler UniWorX
+dispatchJobInjectFiles = JobHandlerAtomic . hoist lift . maybeT (return ()) $ do
+  uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
+
+  let
+    -- Keep only listing entries that are objects whose name is unpadded
+    -- base64 (URL alphabet) of a valid digest; everything else is ignored.
+    extractReference (Minio.ListItemObject oi)
+      | Right bs <- Base64.decodeUnpadded . encodeUtf8 $ Minio.oiObject oi
+      , Just fRef <- Crypto.digestFromByteString bs
+      = Just (oi, fRef)
+    extractReference _ = Nothing
+
+    -- Rate limiting step: 'Right' forwards the object for processing, 'Left'
+    -- (allocation refused) ends the stream.
+    tbAccum :: (Minio.ObjectInfo, FileContentReference)
+            -> ()
+            -> DB (Either () ((), (Minio.Object, FileContentReference)))
+    tbAccum (oi, fRef) ()
+      = bool (Left ()) (Right ((), (Minio.oiObject oi, fRef))) <$> persistentTokenBucketTryAlloc' TokenBucketInjectFiles (Minio.oiSize oi)
+
+    -- Handle a single object.  When fetching the content yields nothing
+    -- (presumably a 404 turned into 'Nothing' by @catchIfMaybeT@ — confirm),
+    -- the step short-circuits to 'mempty' and the removal below is skipped.
+    injectOrDelete :: (Minio.Object, FileContentReference)
+                   -> DB (Sum Int64, Sum Int64, Sum Int64) -- ^ Deleted, Injected, Existed
+    injectOrDelete (obj, fRef) = maybeT (return mempty) $ do
+      isReferenced <- lift . E.selectExists . E.where_ . E.any E.exists . fileReferences $ E.val fRef
+      res <- if | isReferenced -> do
+                    alreadyInjected <- lift $ exists [ FileContentHash ==. fRef ]
+                    if | alreadyInjected -> return (mempty, mempty, Sum 1)
+                       | otherwise -> do
+                           content <- (hoistMaybe =<<) . runAppMinio . runMaybeT $ do
+                             objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
+                             lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
+                           lift $ (mempty, Sum 1, mempty) <$ insert (FileContent fRef content)
+                | otherwise -> return (Sum 1, mempty, mempty)
+      -- NOTE(review): a not-found error on removal looks deliberately ignored here.
+      runAppMinio . maybeT (return ()) . catchIfMaybeT isDoesNotExist $ Minio.removeObject uploadBucket obj
+      return res
+      where isDoesNotExist :: HttpException -> Bool
+            isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
+              = responseStatus resp == notFound404
+            isDoesNotExist _ = False
+
+  (Sum del, Sum inj, Sum exc) <-
+    runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
+              .| C.mapMaybe extractReference
+              .| transPipe lift (C.mapAccumWhileM tbAccum ())
+              .| transPipe lift (C.mapM injectOrDelete)
+              .| C.fold
+
+  when (del > 0) $
+    $logInfoS "InjectFiles" [st|Deleted #{del} unreferenced files from upload cache|]
+  when (exc > 0) $
+    $logInfoS "InjectFiles" [st|Deleted #{exc} files from upload cache because they were already referenced|]
+  when (inj > 0) $
+    $logInfoS "InjectFiles" [st|Injected #{inj} files from upload cache into database|]
diff --git a/src/Jobs/Handler/PruneFiles.hs b/src/Jobs/Handler/PruneFiles.hs deleted file mode 100644 index 9fcb84805..000000000 --- a/src/Jobs/Handler/PruneFiles.hs +++ /dev/null @@ -1,38 +0,0 @@ -module Jobs.Handler.PruneFiles - ( dispatchJobPruneSessionFiles - , dispatchJobPruneUnreferencedFiles - ) where - -import Import hiding (matching) - -import Database.Persist.Sql (deleteWhereCount) - -import qualified Database.Esqueleto as E -import qualified Database.Esqueleto.Utils as E - - -dispatchJobPruneSessionFiles :: JobHandler UniWorX -dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do - now <- liftIO getCurrentTime - expires <- getsYesod $ view _appSessionFilesExpire - n <- deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ] - $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|] - - -dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX -dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do - n <- E.deleteCount . E.from $ \fileContent -> - E.where_ . E.not_ . 
E.any E.exists $ references fileContent - $logInfoS "PruneUnreferencedFiles" [st|Deleted #{n} unreferenced files|] - where - references :: E.SqlExpr (Entity FileContent) -> [E.SqlQuery ()] - references (E.just . (E.^. FileContentHash) -> fHash) = - [ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash - , E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash - , E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash - , E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash - , E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash - , E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash - , E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash - , E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash - ] diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 7cffea352..b7fdba876 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -80,6 +80,7 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica } | JobPruneSessionFiles | JobPruneUnreferencedFiles + | JobInjectFiles deriving (Eq, Ord, Show, Read, Generic, Typeable) data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId } | NotificationSheetActive { nSheet :: SheetId } diff --git a/src/Model/Types/Misc.hs b/src/Model/Types/Misc.hs index ea7167d48..6e33c53dc 100644 --- a/src/Model/Types/Misc.hs +++ b/src/Model/Types/Misc.hs @@ -11,6 +11,7 @@ module Model.Types.Misc ) where import Import.NoModel +import Model.Types.TH.PathPiece import Data.Maybe (fromJust) @@ -28,6 +29,8 @@ import Database.Persist.Sql (PersistFieldSql(..)) import Utils.Lens.TH +import Web.HttpApiData + data StudyFieldType = FieldPrimary | FieldSecondary deriving (Eq, Ord, Enum, Show, Read, Bounded, Generic) @@ -257,3 +260,19 @@ instance Csv.ToField Sex where toField = 
Csv.toField . toPathPiece instance Csv.FromField Sex where parseField = maybe (fail "Could not parse Field of type Sex") return . fromPathPiece <=< Csv.parseField
+
+
+data TokenBucketIdent = TokenBucketInjectFiles -- ^ Bucket charged by @dispatchJobInjectFiles@
+                      | TokenBucketPruneFiles -- ^ Bucket charged by @dispatchJobPruneUnreferencedFiles@
+  deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
+  deriving anyclass (Universe, Finite, Hashable)
+
+nullaryPathPiece ''TokenBucketIdent $ camelToPathPiece' 2 -- NOTE(review): presumably yields the @inject-files@ / @prune-files@ keys used under @token-buckets@ in settings.yml — confirm
+pathPieceJSON ''TokenBucketIdent
+pathPieceJSONKey ''TokenBucketIdent
+derivePersistFieldPathPiece ''TokenBucketIdent
+
+instance ToHttpApiData TokenBucketIdent where
+  toUrlPiece = toPathPiece
+instance FromHttpApiData TokenBucketIdent where
+  parseUrlPiece = maybe (Left "Could not parse TokenBucketIdent") Right . fromPathPiece
diff --git a/src/Settings.hs b/src/Settings.hs index c414d8db0..8a3995342 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -168,12 +168,15 @@ data AppSettings = AppSettings , appUploadCacheConf :: Maybe Minio.ConnectInfo , appUploadCacheBucket :: Minio.Bucket + , appInjectFiles :: Maybe NominalDiffTime , appFavouritesQuickActionsBurstsize , appFavouritesQuickActionsAvgInverseRate :: Word64 , appFavouritesQuickActionsTimeout :: DiffTime , appFavouritesQuickActionsCacheTTL :: Maybe DiffTime + , appPersistentTokenBuckets :: TokenBucketIdent -> TokenBucketConf + , appInitialInstanceID :: Maybe (Either FilePath UUID) , appRibbon :: Maybe Text } deriving Show @@ -293,6 +296,16 @@ data SmtpAuthConf = SmtpAuthConf , smtpAuthPassword :: HaskellNet.Password } deriving (Show)
+data TokenBucketConf = TokenBucketConf
+  { tokenBucketDepth :: Word64 -- ^ Maximum fill level in tokens (@depth@ in settings.yml)
+  , tokenBucketInvRate :: NominalDiffTime -- ^ Time per replenished token (@inv-rate@ in settings.yml)
+  , tokenBucketInitialValue :: Int64 -- ^ Fill level given to a newly created bucket (@initial-value@ in settings.yml)
+  } deriving (Eq, Ord, Show, Generic, Typeable)
+
+deriveJSON defaultOptions
+  { fieldLabelModifier = camelToPathPiece' 2
+  } ''TokenBucketConf
+
deriveJSON defaultOptions { constructorTagModifier = camelToPathPiece' 2 , fieldLabelModifier = camelToPathPiece' 2 @@ -489,6 +502,7 @@ instance FromJSON AppSettings where 
appSessionFilesExpire <- o .: "session-files-expire" appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files" + appInjectFiles <- o .:? "inject-files" appMaximumContentLength <- o .: "maximum-content-length" @@ -536,6 +550,8 @@ instance FromJSON AppSettings where appFavouritesQuickActionsTimeout <- o .: "favourites-quick-actions-timeout" appFavouritesQuickActionsCacheTTL <- o .: "favourites-quick-actions-cache-ttl" + appPersistentTokenBuckets <- o .: "token-buckets" + appUploadCacheConf <- assertM (not . null . Minio.connectHost) <$> o .:? "upload-cache" appUploadCacheBucket <- o .: "upload-cache-bucket"
diff --git a/src/Utils/PersistentTokenBucket.hs b/src/Utils/PersistentTokenBucket.hs
new file mode 100644
index 000000000..3f6d3aa03
--- /dev/null
+++ b/src/Utils/PersistentTokenBucket.hs
@@ -0,0 +1,73 @@
+module Utils.PersistentTokenBucket
+  ( TokenBucketSettings(..)
+  , persistentTokenBucketTryAlloc'
+  , persistentTokenBucketTryAlloc
+  ) where
+
+import Import.NoFoundation
+
+
+-- | Static parameters of a database-backed token bucket.
+data TokenBucketSettings = TokenBucketSettings
+  { tbsIdent :: TokenBucketIdent -- ^ Key of the bucket's @TokenBucket@ row
+  , tbsDepth :: Word64 -- ^ Maximum fill level (in tokens)
+  , tbsInvRate :: NominalDiffTime -- ^ Time it takes to replenish one token
+  , tbsInitialValue :: Int64 -- ^ Fill level assigned when the row is first created
+  }
+
+
+-- | Variant of 'persistentTokenBucketTryAlloc' that takes the bucket's
+-- parameters from the application settings (@appPersistentTokenBuckets@).
+persistentTokenBucketTryAlloc' :: (MonadHandler m, HasAppSettings (HandlerSite m), Integral a)
+                               => TokenBucketIdent
+                               -> a
+                               -> SqlPersistT m Bool
+persistentTokenBucketTryAlloc' tbsIdent tokens = do
+  TokenBucketConf{..} <- getsYesod $ views _appPersistentTokenBuckets ($ tbsIdent)
+  flip persistentTokenBucketTryAlloc tokens TokenBucketSettings
+    { tbsIdent
+    , tbsDepth = tokenBucketDepth
+    , tbsInvRate = tokenBucketInvRate
+    , tbsInitialValue = tokenBucketInitialValue
+    }
+
+
+-- | Try to take @tokens@ tokens out of the bucket stored in the database.
+--
+-- The bucket's row is created (with 'tbsInitialValue') on first use.  Its fill
+-- level is replenished by one token per 'tbsInvRate' elapsed since the last
+-- access and capped at 'tbsDepth'.
+--
+-- The request is granted (@True@) whenever the replenished level is not
+-- negative; the level may drop below zero as a result, in which case further
+-- requests are refused (@False@, nothing is updated) until it has recovered.
+persistentTokenBucketTryAlloc :: (MonadIO m, Integral a) => TokenBucketSettings -> a -> SqlPersistT m Bool
+persistentTokenBucketTryAlloc TokenBucketSettings{..} (fromIntegral -> tokens) = do
+  now <- liftIO getCurrentTime
+  TokenBucket{..} <- do
+    existingBucket <- get $ TokenBucketKey tbsIdent
+    case existingBucket of
+      Just bkt -> return bkt
+      Nothing -> do
+        let bkt = TokenBucket
+              { tokenBucketIdent = tbsIdent
+              , tokenBucketLastValue = tbsInitialValue
+              , tokenBucketLastAccess = now
+              }
+        insert_ bkt
+        return bkt
+  -- The cap applies to the replenished level.  Backticked @min@ binds tighter
+  -- than @+@, hence the parentheses.
+  let currentValue = fromIntegral tbsDepth `min` (tokenBucketLastValue + tokenIncrease)
+      deltaT = now `diffUTCTime` tokenBucketLastAccess
+      -- Whole tokens gained during @deltaT@ and the left-over time not yet
+      -- converted into a token; the first guard handles a negative @deltaT@.
+      (tokenIncrease, deltaT')
+        | n < 0 = (pred n, (1 + f) * tbsInvRate)
+        | otherwise = (n, f * tbsInvRate)
+        where (n, f) = properFraction $ deltaT / tbsInvRate
+  if | currentValue < 0 -> return False
+     | otherwise -> do
+         -- Back-date the access time by the left-over so it counts next time.
+         update (TokenBucketKey tbsIdent) [ TokenBucketLastValue =. currentValue - tokens, TokenBucketLastAccess =. addUTCTime (- deltaT') now ]
+         return True