{-# LANGUAGE BangPatterns #-} module Jobs.Handler.Files ( dispatchJobPruneSessionFiles , dispatchJobPruneUnreferencedFiles , dispatchJobInjectFiles, dispatchJobRechunkFiles , dispatchJobDetectMissingFiles ) where import Import hiding (matching, maximumBy, init) import Database.Persist.Sql (deleteWhereCount) import qualified Database.Esqueleto.Legacy as E import qualified Database.Esqueleto.PostgreSQL as E import qualified Database.Esqueleto.Utils as E import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.List as C (mapMaybe, unfoldM) import Handler.Utils.Minio import qualified Network.Minio as Minio import Crypto.Hash (hashDigestSize, digestFromByteString) import qualified Data.Map.Strict as Map import System.IO.Unsafe import Handler.Utils.Files (sourceFileDB) import Control.Monad.Logger (askLoggerIO, runLoggingT) import System.Clock import qualified Data.Set as Set import Jobs.Handler.Intervals.Utils import Data.IntervalMap.Strict (IntervalMap) import qualified Data.IntervalMap.Strict as IntervalMap import Control.Concurrent.STM.TVar (stateTVar) import qualified Data.Foldable as F import qualified Control.Monad.State.Class as State dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin where act = hoist lift $ do now <- liftIO getCurrentTime expires <- getsYesod $ view _appSessionFilesExpire deleteWhereCount [ SessionFileTouched <. addUTCTime (- expires) now ] fin n = $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|] fileReferences :: E.SqlExpr (E.Value FileContentReference) -> [E.SqlQuery ()] fileReferences fHash'@(E.just -> fHash) = [ E.from $ \appFile -> E.where_ $ appFile E.^. CourseApplicationFileContent E.==. fHash , E.from $ \matFile -> E.where_ $ matFile E.^. MaterialFileContent E.==. fHash , E.from $ \newsFile -> E.where_ $ newsFile E.^. CourseNewsFileContent E.==. fHash , E.from $ \sheetFile -> E.where_ $ sheetFile E.^. SheetFileContent E.==. fHash , E.from $ \sheetFile -> E.where_ $ sheetFile E.^. PersonalisedSheetFileContent E.==. fHash , E.from $ \appInstr -> E.where_ $ appInstr E.^. CourseAppInstructionFileContent E.==. fHash , E.from $ \matching -> E.where_ $ E.just (matching E.^. AllocationMatchingLog) E.==. fHash , E.from $ \subFile -> E.where_ $ subFile E.^. SubmissionFileContent E.==. fHash , E.from $ \sessFile -> E.where_ $ sessFile E.^. SessionFileContent E.==. fHash , E.from $ \lock -> E.where_ $ E.just (lock E.^. FileLockContent) E.==. fHash , E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \(fileContentEntry `E.InnerJoin` fileContentChunk) -> do E.on $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkHash E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. fHash' E.where_ $ fileContentChunk E.^. FileContentChunkHash E.==. chunkLock E.^. FileChunkLockHash ] dispatchJobDetectMissingFiles :: JobHandler UniWorX dispatchJobDetectMissingFiles = JobHandlerAtomicDeferrableWithFinalizer act fin where act :: ReaderT SqlReadBackend (HandlerFor UniWorX) (Map Text (NonNull (Set FileContentReference))) act = do uploadBucket <- getsYesod $ view _appUploadCacheBucket missingDb <- runConduit . execStateC Map.empty $ do let insertRef refKind ref = State.modify' $ Map.alter (Just . Set.insert ref . fromMaybe Set.empty) refKind iforM_ trackedReferences $ \refKind refQuery -> do let fileReferencesQuery = do ref <- refQuery E.where_ . E.not_ $ E.isNothing ref E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref E.distinctOnOrderBy [E.asc ref] $ return ref transPipe lift (E.selectSource fileReferencesQuery) .| C.mapMaybe E.unValue .| C.mapM_ (insertRef refKind) let allMissingDb :: Set Minio.Object allMissingDb = setOf (folded . folded . re minioFileReference) missingDb filterMissingDb :: forall m. Monad m => Set Minio.Object -> ConduitT Minio.ListItem (Set Minio.Object) m () filterMissingDb remaining = maybeT (yield remaining) $ do nextMinio <- MaybeT await remaining' <- case nextMinio of Minio.ListItemObject oi -> do let (missingMinio, remaining') = Set.split (Minio.oiObject oi) remaining lift $ yield missingMinio return remaining' _other -> return remaining lift $ filterMissingDb remaining' allMissingMinio <- maybeT (return $ fold missingDb) . hoistMaybeM . runAppMinio . runMaybeT . runConduit $ transPipe lift (Minio.listObjects uploadBucket Nothing True) .| filterMissingDb allMissingDb .| C.foldMapE (setOf minioFileReference) return $ Map.mapMaybe (fromNullable . Set.intersection allMissingMinio) missingDb fin :: Map Text (NonNull (Set FileContentReference)) -> Handler () fin missingCounts = do imapM_ observeMissingFiles $ olength <$> missingCounts iforM_ missingCounts $ \refIdent missingFiles -> let missingRefs = unlines . map (views _Wrapped tshow) . Set.toList $ toNullable missingFiles newl :: Text newl = "\n" in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}#{newl}#{missingRefs}|] when (Map.null missingCounts) $ $logInfoS "MissingFiles" [st|No missing files|] trackedReferences = Map.fromList $ over (traverse . _1) nameToPathPiece [ (''CourseApplicationFile, E.from $ \appFile -> return $ appFile E.^. CourseApplicationFileContent ) , (''MaterialFile, E.from $ \matFile -> return $ matFile E.^. MaterialFileContent ) , (''CourseNewsFile, E.from $ \newsFile -> return $ newsFile E.^. CourseNewsFileContent ) , (''SheetFile, E.from $ \sheetFile -> return $ sheetFile E.^. SheetFileContent ) , (''PersonalisedSheetFile, E.from $ \personalisedSheetFile -> return $ personalisedSheetFile E.^. PersonalisedSheetFileContent ) , (''CourseAppInstructionFile, E.from $ \appInstr -> return $ appInstr E.^. CourseAppInstructionFileContent) , (''SubmissionFile, E.from $ \subFile -> return $ subFile E.^. SubmissionFileContent ) , (''SessionFile, E.from $ \sessFile -> return $ sessFile E.^. SessionFileContent ) , (''AllocationMatching, E.from $ \matching -> return . E.just $ matching E.^. AllocationMatchingLog) ] {-# NOINLINE pruneUnreferencedFilesIntervalsCache #-} pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)]) pruneUnreferencedFilesIntervalsCache = unsafePerformIO $ newTVarIO Map.empty dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomicWithFinalizer act fin where act = hoist lift $ do now <- liftIO getCurrentTime interval <- getsYesod $ view _appPruneUnreferencedFilesInterval keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles let chunkHashBytes :: forall h. ( Unwrapped FileContentChunkReference ~ Digest h ) => Integer chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h)) (minBoundDgst, maxBoundDgst) <- currentIntervalCached pruneUnreferencedFilesIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) numIterations epoch iteration let chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool) chunkIdFilter cRef = E.and $ catMaybes [ minBoundDgst <&> \b -> cRef E.>=. E.val b , maxBoundDgst <&> \b -> cRef E.<. E.val b ] $logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst) E.insertSelectWithConflict (UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint") (E.from $ \fileContentChunk -> do E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkHash return $ E.any E.exists (fileReferences $ fileContentEntry E.^. FileContentEntryHash) E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now ) (\current excluded -> [ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ] ) E.delete . E.from $ \fileContentChunkUnreferenced -> do let unreferencedChunkHash = E.unKey $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. unreferencedChunkHash return $ E.any E.exists (fileReferences $ fileContentEntry E.^. FileContentEntryHash) E.where_ $ chunkIdFilter unreferencedChunkHash let getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do let unreferencedChunkHash = E.unKey $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. unreferencedChunkHash E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash E.where_ $ chunkIdFilter unreferencedChunkHash return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince E.groupBy $ fileContentEntry E.^. FileContentEntryHash E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ] return $ fileContentEntry E.^. FileContentEntryHash deleteEntry :: _ -> DB (Sum Natural) deleteEntry (E.Value fRef) = bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef] Sum deletedEntries <- runConduit $ getEntryCandidates .| takeWhileTime (interval / 3) .| C.mapM deleteEntry .| C.fold let getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do let unreferencedChunkHash = E.unKey $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now) E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. unreferencedChunkHash E.where_ $ chunkIdFilter unreferencedChunkHash return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash , E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent) ) deleteChunk :: _ -> DB (Sum Natural, Sum Word64) deleteChunk (E.Value cRef, E.Value size) = do deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ] (, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef] (Sum deletedChunks, Sum deletedChunkSize) <- runConduit $ getChunkCandidates .| takeWhileTime (interval / 3) .| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64) .| C.mapM deleteChunk .| C.fold return (deletedEntries, deletedChunks, deletedChunkSize) fin (deletedEntries, deletedChunks, deletedChunkSize) = do observeDeletedUnreferencedFiles deletedEntries $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|] observeDeletedUnreferencedChunks deletedChunks deletedChunkSize $logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{textBytes deletedChunkSize})|] dispatchJobInjectFiles :: JobHandler UniWorX dispatchJobInjectFiles = JobHandlerException . maybeT_ $ do uploadBucket <- getsYesod $ view _appUploadCacheBucket interval <- getsYesod $ view _appInjectFiles now <- liftIO getCurrentTime let extractInhibited :: IntervalMap UTCTime (Set FileContentReference) -> (Set FileContentReference, IntervalMap UTCTime (Set FileContentReference)) extractInhibited cState = (F.fold current, IntervalMap.union current upcoming) where (_, current, upcoming) = IntervalMap.splitIntersecting cState $ IntervalMap.OpenInterval (addUTCTime (-2) now) (addUTCTime 2 now) inhibited <- atomically . flip stateTVar extractInhibited =<< getsYesod appFileInjectInhibit let extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference extractReference _ = Nothing injectOrDelete :: (Minio.ObjectInfo, FileContentReference) -> Handler (Sum Natural, Sum Word64) injectOrDelete (objInfo, fRef) = do let obj = Minio.oiObject objInfo sz = fromIntegral $ Minio.oiSize objInfo fRef' <- runDB $ do logger <- askLoggerIO chunkVar <- newEmptyTMVarIO dbAsync <- allocateLinkedAsync $ do let report = go 0 0 Nothing =<< liftIO (getTime Monotonic) where go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString ByteString m () go c accsz lastReport startT = do currT <- liftIO $ getTime Monotonic chunk' <- await whenIsJust chunk' $ \chunk -> do let csz = fromIntegral $ olength chunk !c' = succ c !sz' = accsz + csz !lastReport' | toRational currT - toRational (fromMaybe startT lastReport) > 5 = Just currT | otherwise = lastReport when (csz > 0) $ do let p :: Centi p = realToFrac $ (toInteger sz' % toInteger sz) * 100 eta :: Maybe Integer eta = do accsz' <- assertM' (/= 0) accsz return . ceiling $ (toRational currT - toRational startT) / fromIntegral accsz' * (fromIntegral sz - fromIntegral accsz) when (lastReport' /= lastReport || sz' >= fromIntegral sz) $ flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes [ pure [st|Sinking chunk ##{tshow c}: #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|] , eta <&> \eta' -> [st| ETA #{textDuration eta'}|] , pure "..." ] yield chunk go c' sz' lastReport' startT atomically $ isEmptyTMVar chunkVar >>= guard . not sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) () .| persistentTokenBucketRateLimit' TokenBucketInjectFiles olength .| report didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar . Just) return True if | not didSend -> Nothing <$ cancel dbAsync | otherwise -> do atomically $ putTMVar chunkVar Nothing Just <$> waitAsync dbAsync let matchesFRef = is _Just $ assertM (== fRef) fRef' if | matchesFRef -> maybeT_ . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj | otherwise -> $logErrorS "InjectFiles" [st|Minio object “#{obj}”'s content does not match it's name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|] return . bool mempty (Sum 1, Sum sz) $ is _Just fRef' (Sum injectedFiles, Sum injectedSize) <- runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True) .| C.mapMaybe extractReference .| C.filter (views _2 (`Set.notMember` inhibited)) .| maybe (C.map id) (takeWhileTime . (/ 2)) interval .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize) .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFilesCount $ const 1) .| C.mapM (lift . injectOrDelete) .| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz) .| C.fold $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|] data RechunkFileException = RechunkFileExceptionHashMismatch { oldHash, newHash :: FileContentReference } deriving (Eq, Ord, Show, Generic, Typeable) deriving anyclass (Exception) dispatchJobRechunkFiles :: JobHandler UniWorX dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin where act = hoist lift $ do interval <- getsYesod $ view _appRechunkFiles let getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do E.on $ fileContentChunk E.^. FileContentChunkHash E.==. fileContentEntry' E.^. FileContentEntryChunkHash E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do E.on $ fileContentChunk E.^. FileContentChunkHash E.==. fileContentEntry' E.^. FileContentEntryChunkHash E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64)) return ( fileContentEntry E.^. FileContentEntryHash , size ) rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64) rechunkFile fRef sz = do fRef' <- sinkFileDB True $ sourceFileDB fRef unless (fRef == fRef') $ throwM $ RechunkFileExceptionHashMismatch fRef fRef' return (Sum 1, Sum sz) (Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $ getEntryCandidates .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz) .| maybe (C.map id) (takeWhileTime . (/ 2)) interval .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64) .| C.mapM (uncurry rechunkFile) .| C.fold return (rechunkedFiles, rechunkedSize) fin (rechunkedFiles, rechunkedSize) = do observeRechunkedFiles rechunkedFiles rechunkedSize $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{textBytes rechunkedSize} bytes)|]