fradrive/src/Jobs/Handler/Files.hs
2020-09-22 13:43:58 +02:00

334 lines
17 KiB
Haskell

{-# LANGUAGE BangPatterns #-}
module Jobs.Handler.Files
( dispatchJobPruneSessionFiles
, dispatchJobPruneUnreferencedFiles
, dispatchJobInjectFiles, dispatchJobRechunkFiles
) where
import Import hiding (matching, maximumBy, init)
import Database.Persist.Sql (deleteWhereCount)
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.PostgreSQL as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe, unfoldM)
import Handler.Utils.Minio
import qualified Network.Minio as Minio
import Crypto.Hash (hashDigestSize, digestFromByteString)
import Data.List ((!!), unfoldr, maximumBy, init, genericLength)
import qualified Data.ByteString as ByteString
import Data.Bits (Bits(shiftR))
import qualified Data.Map.Strict as Map
import Control.Monad.Random.Lazy
import System.Random.Shuffle (shuffleM)
import System.IO.Unsafe
import Handler.Utils.Files (sourceFileDB)
import Control.Monad.Logger (askLoggerIO, runLoggingT)
import System.Clock
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
now <- liftIO getCurrentTime
expires <- getsYesod $ view _appSessionFilesExpire
deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
fin n = $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
fileReferences :: E.SqlExpr (E.Value FileContentReference) -> [E.SqlQuery ()]
fileReferences (E.just -> fHash)
= [ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash
, E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash
, E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash
, E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash
, E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash
, E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash
, E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
, E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
, E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash
, E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \fileContentEntry ->
E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. fHash
E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash)
]
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
now <- liftIO getCurrentTime
interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
let
chunkHashBytes :: forall h.
( Unwrapped FileContentChunkReference ~ Digest h )
=> Integer
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
chunkHashBits = chunkHashBytes * 8
base :: Integer
base = 2 ^ chunkHashBits
intervals :: [Integer]
-- | Exclusive upper bounds
intervals
| numIterations <= 0 = pure base
| otherwise = go protoIntervals ^.. folded . _1
where
go [] = []
go ints
| maximumOf (folded . _1) ints == Just base = ints
| otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
where
closest = maximumBy (comparing $ view _2) ints
(lts, geqs) = partition (((>) `on` view _1) closest) ints
gts = filter (((<) `on` view _1) closest) geqs
-- | Exclusive upper bounds
protoIntervals :: [(Integer, Integer)]
protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
| i <- [1 .. toInteger numIterations]
]
intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals
toDigest :: Integer -> Maybe FileContentChunkReference
toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
where step i
| i <= 0 = Nothing
| otherwise = Just (fromIntegral i, i `shiftR` 8)
pad bs
| toInteger (ByteString.length bs) >= chunkHashBytes = bs
| otherwise = pad $ ByteString.cons 0 bs
intervalsDgsts <- atomically $ do
cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
case Map.lookup numIterations cachedDgsts of
Just c -> return c
Nothing -> do
modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
return intervalsDgsts'
let
permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)
(minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
chunkIdFilter cRef = E.and $ catMaybes
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
, maxBoundDgst <&> \b -> cRef E.<. E.val b
]
$logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)
E.insertSelectWithConflict
(UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
(E.from $ \fileContentChunk -> do
E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
)
(\current excluded ->
[ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
)
E.delete . E.from $ \fileContentChunkUnreferenced -> do
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
return . E.any E.exists . fileReferences $ fileContentEntry E.^. FileContentEntryHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
let
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
E.groupBy $ fileContentEntry E.^. FileContentEntryHash
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]
return $ fileContentEntry E.^. FileContentEntryHash
deleteEntry :: _ -> DB (Sum Natural)
deleteEntry (E.Value fRef) =
bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]
Sum deletedEntries <- runConduit $
getEntryCandidates
.| takeWhileTime (interval / 3)
.| C.mapM deleteEntry
.| C.fold
let
getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
, E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
)
deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
deleteChunk (E.Value cRef, E.Value size) = do
deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
(, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
(Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
getChunkCandidates
.| takeWhileTime (interval / 3)
.| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
.| C.mapM deleteChunk
.| C.fold
return (deletedEntries, deletedChunks, deletedChunkSize)
fin (deletedEntries, deletedChunks, deletedChunkSize) = do
observeDeletedUnreferencedFiles deletedEntries
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
observeDeletedUnreferencedChunks deletedChunks deletedChunkSize
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{textBytes deletedChunkSize})|]
dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
interval <- getsYesod $ view _appInjectFiles
let
extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
extractReference _ = Nothing
injectOrDelete :: (Minio.ObjectInfo, FileContentReference)
-> Handler (Sum Natural, Sum Word64)
injectOrDelete (objInfo, fRef) = do
let obj = Minio.oiObject objInfo
sz = fromIntegral $ Minio.oiSize objInfo
fRef' <- runDB $ do
chunkVar <- newEmptyTMVarIO
dbAsync <- allocateLinkedAsync $ do
atomically $ isEmptyTMVar chunkVar >>= guard . not
sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
logger <- askLoggerIO
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
let sendChunks = go 0 0 Nothing . toNanoSecs =<< liftIO (getTime Monotonic)
where
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe Integer -> Integer -> ConduitT ByteString Void m ()
go c accsz lastReport startT = do
currT <- liftIO $ toNanoSecs <$> getTime Monotonic
chunk' <- await
whenIsJust chunk' $ \chunk -> do
let csz = fromIntegral $ olength chunk
!sz' = accsz + csz
p :: Centi
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
!c' = succ c
eta :: Integer
eta = ceiling $ ((toRational currT - toRational startT) / toRational accsz) * toRational (sz - fromIntegral accsz)
!lastReport'
| currT - fromMaybe startT lastReport > 5e9 = Just currT
| otherwise = lastReport
when (lastReport' /= lastReport) $
runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|]
atomically . putTMVar chunkVar $ Just chunk
go c' sz' lastReport' startT
lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks
return True
if
| not didSend -> Nothing <$ cancel dbAsync
| otherwise -> do
atomically $ putTMVar chunkVar Nothing
Just <$> waitAsync dbAsync
let matchesFRef = is _Just $ assertM (== fRef) fRef'
if | matchesFRef ->
maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj
| otherwise ->
$logErrorS "InjectFiles" [st|Minio object #{obj}'s content does not match it's name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|]
return . bool mempty (Sum 1, Sum sz) $ is _Just fRef'
(Sum injectedFiles, Sum injectedSize) <-
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
.| C.mapMaybe extractReference
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
.| C.mapM (lift . injectOrDelete)
.| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeRechunkedFiles inj sz)
.| C.fold
$logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|]
data RechunkFileException
= RechunkFileExceptionHashMismatch
{ oldHash, newHash :: FileContentReference }
deriving (Eq, Ord, Show, Generic, Typeable)
deriving anyclass (Exception)
dispatchJobRechunkFiles :: JobHandler UniWorX
dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
interval <- getsYesod $ view _appRechunkFiles
let
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64))
return ( fileContentEntry E.^. FileContentEntryHash
, size
)
rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
rechunkFile fRef sz = do
fRef' <- sinkFileDB True $ sourceFileDB fRef
unless (fRef == fRef') $
throwM $ RechunkFileExceptionHashMismatch fRef fRef'
return (Sum 1, Sum sz)
(Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
getEntryCandidates
.| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
.| C.mapM (uncurry rechunkFile)
.| C.fold
return (rechunkedFiles, rechunkedSize)
fin (rechunkedFiles, rechunkedSize) = do
observeRechunkedFiles rechunkedFiles rechunkedSize
$logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{textBytes rechunkedSize} bytes)|]