feat(files): chunk prune-unreferenced-files finer
This commit is contained in:
parent
8f608c1955
commit
58c2420458
@ -35,7 +35,8 @@ bearer-expiration: 604800
|
||||
bearer-encoding: HS256
|
||||
maximum-content-length: "_env:MAX_UPLOAD_SIZE:134217728"
|
||||
session-files-expire: 3600
|
||||
prune-unreferenced-files: 28800
|
||||
prune-unreferenced-files-within: 57600
|
||||
prune-unreferenced-files-interval: 3600
|
||||
keep-unreferenced-files: 86400
|
||||
health-check-interval:
|
||||
matching-cluster-config: "_env:HEALTHCHECK_INTERVAL_MATCHING_CLUSTER_CONFIG:600"
|
||||
|
||||
@ -48,15 +48,6 @@ determineCrontab = execWriterT $ do
|
||||
, cronRateLimit = appJobCronInterval
|
||||
, cronNotAfter = Right CronNotScheduled
|
||||
}
|
||||
whenIsJust appPruneUnreferencedFiles $ \pInterval ->
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue JobPruneUnreferencedFiles)
|
||||
Cron
|
||||
{ cronInitial = CronAsap
|
||||
, cronRepeat = CronRepeatScheduled CronAsap
|
||||
, cronRateLimit = pInterval
|
||||
, cronNotAfter = Right CronNotScheduled
|
||||
}
|
||||
|
||||
oldestInvitationMUTC <- lift $ preview (_head . _entityVal . _invitationExpiresAt . _Just) <$> selectList [InvitationExpiresAt !=. Nothing] [Asc InvitationExpiresAt, LimitTo 1]
|
||||
whenIsJust oldestInvitationMUTC $ \oldestInvUTC -> tell $ HashMap.singleton
|
||||
@ -138,33 +129,30 @@ determineCrontab = execWriterT $ do
|
||||
, cronNotAfter = Right CronNotScheduled
|
||||
}
|
||||
|
||||
let
|
||||
getNextIntervals within interval = do
|
||||
now <- liftIO getPOSIXTime
|
||||
return $ do
|
||||
let
|
||||
epochInterval = within / 2
|
||||
(currEpoch, epochNow) = now `divMod'` epochInterval
|
||||
currInterval = epochNow `div'` interval
|
||||
numIntervals = floor $ epochInterval / interval
|
||||
n = ceiling $ 4 * appJobCronInterval / interval
|
||||
i <- [ negate (ceiling $ n % 2) .. ceiling $ n % 2 ]
|
||||
let
|
||||
((+ currEpoch) -> nextEpoch, nextInterval) = (currInterval + i) `divMod` numIntervals
|
||||
nextIntervalTime
|
||||
= posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval
|
||||
return (nextEpoch, nextInterval, nextIntervalTime, numIntervals)
|
||||
|
||||
if
|
||||
| is _Just appLdapConf
|
||||
, is _Just appLdapConf
|
||||
, Just syncWithin <- appSynchroniseLdapUsersWithin
|
||||
-> do
|
||||
now <- liftIO getPOSIXTime
|
||||
let
|
||||
epochInterval = syncWithin / 2
|
||||
interval = appSynchroniseLdapUsersInterval
|
||||
nextIntervals <- getNextIntervals syncWithin appSynchroniseLdapUsersInterval
|
||||
|
||||
(ldapEpoch, epochNow) = now `divMod'` epochInterval
|
||||
ldapInterval = epochNow `div'` interval
|
||||
numIntervals = floor $ epochInterval / interval
|
||||
|
||||
nextIntervals = do
|
||||
let
|
||||
n = ceiling $ 4 * appJobCronInterval / appSynchroniseLdapUsersInterval
|
||||
i <- [negate (ceiling $ n % 2) .. ceiling $ n % 2]
|
||||
let
|
||||
((+ ldapEpoch) -> nextEpoch, nextInterval) = (ldapInterval + i) `divMod` numIntervals
|
||||
nextIntervalTime
|
||||
= posixSecondsToUTCTime $ fromInteger nextEpoch * epochInterval + fromInteger nextInterval * interval
|
||||
return (nextEpoch, nextInterval, nextIntervalTime)
|
||||
|
||||
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime) -> do
|
||||
$logDebugS "SynchroniseLdap" [st|currentTime: #{tshow ldapEpoch}.#{tshow epochNow}; upcomingSync: #{tshow nextEpoch}.#{tshow (fromInteger nextInterval * interval)}; upcomingData: #{tshow (numIntervals, nextEpoch, nextInterval)}|]
|
||||
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue JobSynchroniseLdap
|
||||
{ jEpoch = fromInteger nextEpoch
|
||||
@ -180,6 +168,22 @@ determineCrontab = execWriterT $ do
|
||||
| otherwise
|
||||
-> return ()
|
||||
|
||||
whenIsJust appPruneUnreferencedFilesWithin $ \within -> do
|
||||
nextIntervals <- getNextIntervals within appPruneUnreferencedFilesInterval
|
||||
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
|
||||
tell $ HashMap.singleton
|
||||
(JobCtlQueue JobPruneUnreferencedFiles
|
||||
{ jEpoch = fromInteger nextEpoch
|
||||
, jNumIterations = fromInteger numIntervals
|
||||
, jIteration = fromInteger nextInterval
|
||||
}
|
||||
)
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
|
||||
, cronRepeat = CronRepeatNever
|
||||
, cronRateLimit = appPruneUnreferencedFilesInterval
|
||||
, cronNotAfter = Left within
|
||||
}
|
||||
|
||||
let
|
||||
sheetJobs (Entity nSheet Sheet{..}) = do
|
||||
|
||||
@ -4,7 +4,7 @@ module Jobs.Handler.Files
|
||||
, dispatchJobInjectFiles
|
||||
) where
|
||||
|
||||
import Import hiding (matching)
|
||||
import Import hiding (matching, maximumBy, init)
|
||||
|
||||
import Database.Persist.Sql (deleteWhereCount)
|
||||
|
||||
@ -18,6 +18,18 @@ import qualified Data.Conduit.List as C (mapMaybe, unfoldM)
|
||||
import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import Crypto.Hash (hashDigestSize, digestFromByteString)
|
||||
|
||||
import Data.List ((!!), unfoldr, maximumBy, init, genericLength)
|
||||
import qualified Data.ByteString as ByteString
|
||||
import Data.Bits (Bits(shiftR))
|
||||
|
||||
import qualified Data.Map.Strict as Map
|
||||
|
||||
import Control.Monad.Random.Lazy
|
||||
import System.Random.Shuffle (shuffleM)
|
||||
import System.IO.Unsafe
|
||||
|
||||
|
||||
dispatchJobPruneSessionFiles :: JobHandler UniWorX
|
||||
dispatchJobPruneSessionFiles = JobHandlerAtomic . hoist lift $ do
|
||||
@ -45,12 +57,74 @@ fileReferences (E.just -> fHash)
|
||||
]
|
||||
|
||||
|
||||
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
|
||||
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
|
||||
pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
|
||||
|
||||
dispatchJobPruneUnreferencedFiles :: JobHandler UniWorX
|
||||
dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
|
||||
dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX
|
||||
dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomic . hoist lift $ do
|
||||
now <- liftIO getCurrentTime
|
||||
interval <- fmap (fmap $ max 0) . getsYesod $ view _appPruneUnreferencedFiles
|
||||
interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
|
||||
keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
|
||||
|
||||
let
|
||||
chunkHashBytes :: forall h.
|
||||
( Unwrapped FileContentChunkReference ~ Digest h )
|
||||
=> Integer
|
||||
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
|
||||
chunkHashBits = chunkHashBytes * 8
|
||||
base :: Integer
|
||||
base = 2 ^ chunkHashBits
|
||||
intervals :: [Integer]
|
||||
-- | Exclusive upper bounds
|
||||
intervals
|
||||
| numIterations <= 0 = pure base
|
||||
| otherwise = go protoIntervals ^.. folded . _1
|
||||
where
|
||||
go [] = []
|
||||
go ints
|
||||
| maximumOf (folded . _1) ints == Just base = ints
|
||||
| otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
|
||||
where
|
||||
closest = maximumBy (comparing $ view _2) ints
|
||||
(lts, geqs) = partition (((>) `on` view _1) closest) ints
|
||||
gts = filter (((<) `on` view _1) closest) geqs
|
||||
-- | Exclusive upper bounds
|
||||
protoIntervals :: [(Integer, Integer)]
|
||||
protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
|
||||
| i <- [1 .. toInteger numIterations]
|
||||
]
|
||||
|
||||
intervalsDgsts' = map (over both $ toDigest <=< assertM' (> 0)) $ zip (0 : init intervals) intervals
|
||||
|
||||
toDigest :: Integer -> Maybe FileContentChunkReference
|
||||
toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
|
||||
where step i
|
||||
| i <= 0 = Nothing
|
||||
| otherwise = Just (fromIntegral i, i `shiftR` 8)
|
||||
pad bs
|
||||
| toInteger (ByteString.length bs) >= chunkHashBytes = bs
|
||||
| otherwise = pad $ ByteString.cons 0 bs
|
||||
|
||||
intervalsDgsts <- atomically $ do
|
||||
cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
|
||||
case Map.lookup numIterations cachedDgsts of
|
||||
Just c -> return c
|
||||
Nothing -> do
|
||||
modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
|
||||
return intervalsDgsts'
|
||||
|
||||
let
|
||||
permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)
|
||||
|
||||
(minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
|
||||
chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
|
||||
chunkIdFilter cRef = E.and $ catMaybes
|
||||
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
|
||||
, maxBoundDgst <&> \b -> cRef E.<. E.val b
|
||||
]
|
||||
|
||||
$logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)
|
||||
|
||||
E.insertSelectWithConflict
|
||||
(UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
|
||||
@ -58,22 +132,25 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
|
||||
E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
|
||||
return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
|
||||
E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
|
||||
return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
|
||||
)
|
||||
(\current excluded ->
|
||||
[ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
|
||||
)
|
||||
|
||||
E.delete . E.from $ \fileContentChunkUnreferenced ->
|
||||
E.delete . E.from $ \fileContentChunkUnreferenced -> do
|
||||
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
|
||||
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
|
||||
|
||||
let
|
||||
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
|
||||
let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
|
||||
E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
|
||||
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
|
||||
return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
|
||||
E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
|
||||
|
||||
@ -88,7 +165,7 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
|
||||
|
||||
Sum deletedEntries <- runConduit $
|
||||
getEntryCandidates
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 3)) interval
|
||||
.| takeWhileTime (interval / 3)
|
||||
.| C.mapM deleteEntry
|
||||
.| C.fold
|
||||
|
||||
@ -100,6 +177,8 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
|
||||
E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
|
||||
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
|
||||
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
|
||||
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
|
||||
|
||||
return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
|
||||
, E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
|
||||
@ -112,7 +191,7 @@ dispatchJobPruneUnreferencedFiles = JobHandlerAtomic . hoist lift $ do
|
||||
|
||||
(Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
|
||||
getChunkCandidates
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 3)) interval
|
||||
.| takeWhileTime (interval / 3)
|
||||
.| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
|
||||
.| C.mapM deleteChunk
|
||||
.| C.fold
|
||||
|
||||
@ -86,7 +86,10 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica
|
||||
, jDisplayEmail :: UserEmail
|
||||
}
|
||||
| JobPruneSessionFiles
|
||||
| JobPruneUnreferencedFiles
|
||||
| JobPruneUnreferencedFiles { jNumIterations
|
||||
, jEpoch
|
||||
, jIteration :: Natural
|
||||
}
|
||||
| JobInjectFiles
|
||||
| JobPruneFallbackPersonalisedSheetFilesKeys
|
||||
deriving (Eq, Ord, Show, Read, Generic, Typeable)
|
||||
|
||||
@ -140,8 +140,10 @@ data AppSettings = AppSettings
|
||||
, appLdapReTestFailover :: DiffTime
|
||||
|
||||
, appSessionFilesExpire :: NominalDiffTime
|
||||
, appPruneUnreferencedFiles :: Maybe NominalDiffTime
|
||||
, appKeepUnreferencedFiles :: NominalDiffTime
|
||||
|
||||
, appPruneUnreferencedFilesWithin :: Maybe NominalDiffTime
|
||||
, appPruneUnreferencedFilesInterval :: NominalDiffTime
|
||||
|
||||
, appInitialLogSettings :: LogSettings
|
||||
|
||||
@ -472,11 +474,13 @@ instance FromJSON AppSettings where
|
||||
appLdapReTestFailover <- o .: "ldap-re-test-failover"
|
||||
|
||||
appSessionFilesExpire <- o .: "session-files-expire"
|
||||
appPruneUnreferencedFiles <- o .:? "prune-unreferenced-files"
|
||||
appKeepUnreferencedFiles <- o .:? "keep-unreferenced-files" .!= 0
|
||||
appInjectFiles <- o .:? "inject-files"
|
||||
appFileUploadDBChunksize <- o .: "file-upload-db-chunksize"
|
||||
|
||||
appPruneUnreferencedFilesWithin <- o .: "prune-unreferenced-files-within"
|
||||
appPruneUnreferencedFilesInterval <- o .: "prune-unreferenced-files-interval"
|
||||
|
||||
appMaximumContentLength <- o .: "maximum-content-length"
|
||||
|
||||
appReloadTemplates <- o .:? "reload-templates" .!= defaultDev
|
||||
|
||||
Loading…
Reference in New Issue
Block a user