fradrive/src/Jobs/Handler/Files.hs

408 lines
22 KiB
Haskell

{-# LANGUAGE BangPatterns #-}
module Jobs.Handler.Files
( dispatchJobPruneSessionFiles
, dispatchJobPruneUnreferencedFiles
, dispatchJobInjectFiles, dispatchJobRechunkFiles
, dispatchJobDetectMissingFiles
) where
import Import hiding (matching, maximumBy, init)
import Database.Persist.Sql (deleteWhereCount)
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.PostgreSQL as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe, unfoldM)
import Handler.Utils.Minio
import qualified Network.Minio as Minio
import Crypto.Hash (hashDigestSize, digestFromByteString)
import qualified Data.Map.Strict as Map
import System.IO.Unsafe
import Handler.Utils.Files (sourceFileDB)
import Control.Monad.Logger (askLoggerIO, runLoggingT)
import System.Clock
import qualified Data.Set as Set
import qualified Data.Sequence as Seq
import Jobs.Queue (YesodJobDB)
import Jobs.Handler.Intervals.Utils
import Data.IntervalMap.Strict (IntervalMap)
import qualified Data.IntervalMap.Strict as IntervalMap
import Control.Concurrent.STM.TVar (stateTVar)
import qualified Data.Foldable as F
import qualified Control.Monad.State.Class as State
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
now <- liftIO getCurrentTime
expires <- getsYesod $ view _appSessionFilesExpire
deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ]
fin n = $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
fileReferences :: E.SqlExpr (E.Value FileContentReference) -> [E.SqlQuery ()]
fileReferences (E.just -> fHash)
= [ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash
, E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash
, E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash
, E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash
, E.from $ \sheetFile -> E.where_ $ sheetFile E.^. PersonalisedSheetFileContent E.==. fHash
, E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash
, E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash
, E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash
, E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash
, E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash
, E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \fileContentEntry ->
E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. fHash
E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash)
]
workflowFileReferences :: MonadResource m => Map Text (ConduitT () FileContentReference (SqlPersistT m) ())
workflowFileReferences = Map.fromList $ over (traverse . _1) nameToPathPiece
[ (''SharedWorkflowGraph, E.selectSource (E.from $ pure . (E.^. SharedWorkflowGraphGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue))
, (''WorkflowWorkflow, E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowState )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue))
]
dispatchJobDetectMissingFiles :: JobHandler UniWorX
dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin
where
act :: YesodJobDB UniWorX (Map Text (NonNull (Set FileContentReference)))
act = hoist lift $ do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
missingDb <- runConduit . execStateC Map.empty $ do
let insertRef refKind ref = State.modify' $ Map.alter (Just . Set.insert ref . fromMaybe Set.empty) refKind
iforM_ trackedReferences $ \refKind refQuery -> do
let fileReferencesQuery = do
ref <- refQuery
E.where_ . E.not_ $ E.isNothing ref
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref
E.distinctOnOrderBy [E.asc ref] $ return ref
transPipe lift (E.selectSource fileReferencesQuery) .| C.mapMaybe E.unValue .| C.mapM_ (insertRef refKind)
iforM_ workflowFileReferences $ \refKind refSource ->
transPipe lift (refSource .| C.filterM (\ref -> not <$> exists [FileContentEntryHash ==. ref])) .| C.mapM_ (insertRef refKind)
let allMissingDb :: Set Minio.Object
allMissingDb = setOf (folded . folded . re minioFileReference) missingDb
filterMissingDb :: forall m. Monad m
=> Set Minio.Object
-> ConduitT Minio.ListItem (Set Minio.Object) m ()
filterMissingDb remaining = maybeT (yield remaining) $ do
nextMinio <- MaybeT await
remaining' <- case nextMinio of
Minio.ListItemObject oi -> do
let (missingMinio, remaining') = Set.split (Minio.oiObject oi) remaining
lift $ yield missingMinio
return remaining'
_other -> return remaining
lift $ filterMissingDb remaining'
allMissingMinio <- maybeT (return $ fold missingDb) . hoistMaybeM . runAppMinio . runMaybeT . runConduit $
transPipe lift (Minio.listObjects uploadBucket Nothing True)
.| filterMissingDb allMissingDb
.| C.foldMapE (setOf minioFileReference)
return $ Map.mapMaybe (fromNullable . Set.intersection allMissingMinio) missingDb
fin :: Map Text (NonNull (Set FileContentReference)) -> Handler ()
fin missingCounts = do
imapM_ observeMissingFiles $ olength <$> missingCounts
iforM_ missingCounts $ \refIdent missingFiles
-> let missingRefs = unlines . map (views _Wrapped tshow) . Set.toList $ toNullable missingFiles
newl :: Text
newl = "\n"
in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}#{newl}#{missingRefs}|]
when (Map.null missingCounts) $
$logInfoS "MissingFiles" [st|No missing files|]
trackedReferences = Map.fromList $ over (traverse . _1) nameToPathPiece
[ (''CourseApplicationFile, E.from $ \appFile -> return $ appFile E.^. CourseApplicationFileContent )
, (''MaterialFile, E.from $ \matFile -> return $ matFile E.^. MaterialFileContent )
, (''CourseNewsFile, E.from $ \newsFile -> return $ newsFile E.^. CourseNewsFileContent )
, (''SheetFile, E.from $ \sheetFile -> return $ sheetFile E.^. SheetFileContent )
, (''PersonalisedSheetFile, E.from $ \personalisedSheetFile -> return $ personalisedSheetFile E.^. PersonalisedSheetFileContent )
, (''CourseAppInstructionFile, E.from $ \appInstr -> return $ appInstr E.^. CourseAppInstructionFileContent)
, (''SubmissionFile, E.from $ \subFile -> return $ subFile E.^. SubmissionFileContent )
, (''SessionFile, E.from $ \sessFile -> return $ sessFile E.^. SessionFileContent )
, (''AllocationMatching, E.from $ \matching -> return . E.just $ matching E.^. AllocationMatchingLog)
]
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
now <- liftIO getCurrentTime
interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
let
chunkHashBytes :: forall h.
( Unwrapped FileContentChunkReference ~ Digest h )
=> Integer
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
(minBoundDgst, maxBoundDgst) <- currentIntervalCached pruneUnreferencedFilesIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) numIterations epoch iteration
let
chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
chunkIdFilter cRef = E.and $ catMaybes
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
, maxBoundDgst <&> \b -> cRef E.<. E.val b
]
$logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)
E.insertSelectWithConflict
(UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
(E.from $ \fileContentChunk -> do
E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
return $ E.any E.exists (fileReferences $ fileContentEntry E.^. FileContentEntryHash)
E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
)
(\current excluded ->
[ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
)
E.delete . E.from $ \fileContentChunkUnreferenced -> do
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
return $ E.any E.exists (fileReferences $ fileContentEntry E.^. FileContentEntryHash)
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
let unmarkWorkflowFiles (otoList -> fRefs) = E.delete . E.from $ \fileContentChunkUnreferenced -> do
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
return $ fileContentEntry E.^. FileContentEntryHash `E.in_` E.valList fRefs
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
chunkSize = 100
in runConduit $ sequence_ workflowFileReferences .| C.map Seq.singleton .| C.chunksOfE chunkSize .| C.mapM_ unmarkWorkflowFiles
let
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
E.groupBy $ fileContentEntry E.^. FileContentEntryHash
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]
return $ fileContentEntry E.^. FileContentEntryHash
deleteEntry :: _ -> DB (Sum Natural)
deleteEntry (E.Value fRef) =
bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]
Sum deletedEntries <- runConduit $
getEntryCandidates
.| takeWhileTime (interval / 3)
.| C.mapM deleteEntry
.| C.fold
let
getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
, E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
)
deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
deleteChunk (E.Value cRef, E.Value size) = do
deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
(, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
(Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
getChunkCandidates
.| takeWhileTime (interval / 3)
.| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
.| C.mapM deleteChunk
.| C.fold
return (deletedEntries, deletedChunks, deletedChunkSize)
fin (deletedEntries, deletedChunks, deletedChunkSize) = do
observeDeletedUnreferencedFiles deletedEntries
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
observeDeletedUnreferencedChunks deletedChunks deletedChunkSize
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{textBytes deletedChunkSize})|]
dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
interval <- getsYesod $ view _appInjectFiles
now <- liftIO getCurrentTime
let
extractInhibited :: IntervalMap UTCTime (Set FileContentReference)
-> (Set FileContentReference, IntervalMap UTCTime (Set FileContentReference))
extractInhibited cState = (F.fold current, IntervalMap.union current upcoming)
where
(_, current, upcoming) = IntervalMap.splitIntersecting cState $ IntervalMap.OpenInterval (addUTCTime (-2) now) (addUTCTime 2 now)
inhibited <- atomically . flip stateTVar extractInhibited =<< getsYesod appFileInjectInhibit
let
extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
extractReference _ = Nothing
injectOrDelete :: (Minio.ObjectInfo, FileContentReference)
-> Handler (Sum Natural, Sum Word64)
injectOrDelete (objInfo, fRef) = do
let obj = Minio.oiObject objInfo
sz = fromIntegral $ Minio.oiSize objInfo
fRef' <- runDB $ do
logger <- askLoggerIO
chunkVar <- newEmptyTMVarIO
dbAsync <- allocateLinkedAsync $ do
let report = go 0 0 Nothing =<< liftIO (getTime Monotonic)
where
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString ByteString m ()
go c accsz lastReport startT = do
currT <- liftIO $ getTime Monotonic
chunk' <- await
whenIsJust chunk' $ \chunk -> do
let csz = fromIntegral $ olength chunk
!c' = succ c
!sz' = accsz + csz
!lastReport'
| toRational currT - toRational (fromMaybe startT lastReport) > 5 = Just currT
| otherwise = lastReport
when (csz > 0) $ do
let p :: Centi
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
eta :: Maybe Integer
eta = do
accsz' <- assertM' (/= 0) accsz
return . ceiling $ (toRational currT - toRational startT) / fromIntegral accsz' * (fromIntegral sz - fromIntegral accsz)
when (lastReport' /= lastReport || sz' >= fromIntegral sz) $
flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes
[ pure [st|Sinking chunk ##{tshow c}: #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
, eta <&> \eta' -> [st| ETA #{textDuration eta'}|]
, pure "..."
]
yield chunk
go c' sz' lastReport' startT
atomically $ isEmptyTMVar chunkVar >>= guard . not
sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () .| persistentTokenBucketRateLimit' TokenBucketInjectFiles olength .| report
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just)
return True
if
| not didSend -> Nothing <$ cancel dbAsync
| otherwise -> do
atomically $ putTMVar chunkVar Nothing
Just <$> waitAsync dbAsync
let matchesFRef = is _Just $ assertM (== fRef) fRef'
if | matchesFRef ->
maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj
| otherwise ->
$logErrorS "InjectFiles" [st|Minio object #{obj}'s content does not match it's name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|]
return . bool mempty (Sum 1, Sum sz) $ is _Just fRef'
(Sum injectedFiles, Sum injectedSize) <-
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
.| C.mapMaybe extractReference
.| C.filter (views _2 (`Set.notMember` inhibited))
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFilesCount $ const 1)
.| C.mapM (lift . injectOrDelete)
.| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz)
.| C.fold
$logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|]
data RechunkFileException
= RechunkFileExceptionHashMismatch
{ oldHash, newHash :: FileContentReference }
deriving (Eq, Ord, Show, Generic, Typeable)
deriving anyclass (Exception)
dispatchJobRechunkFiles :: JobHandler UniWorX
dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
interval <- getsYesod $ view _appRechunkFiles
let
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased
let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64))
return ( fileContentEntry E.^. FileContentEntryHash
, size
)
rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
rechunkFile fRef sz = do
fRef' <- sinkFileDB True $ sourceFileDB fRef
unless (fRef == fRef') $
throwM $ RechunkFileExceptionHashMismatch fRef fRef'
return (Sum 1, Sum sz)
(Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
getEntryCandidates
.| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
.| C.mapM (uncurry rechunkFile)
.| C.fold
return (rechunkedFiles, rechunkedSize)
fin (rechunkedFiles, rechunkedSize) = do
observeRechunkedFiles rechunkedFiles rechunkedSize
$logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{textBytes rechunkedSize} bytes)|]