diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 6c44b230d..0427b001d 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -47,6 +47,8 @@ import Control.Concurrent.STM.TVar (stateTVar) import qualified Data.Foldable as F +import qualified Control.Monad.State.Class as State + dispatchJobPruneSessionFiles :: JobHandler UniWorX dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin @@ -76,10 +78,11 @@ fileReferences (E.just -> fHash) E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign fileContentEntry FileContentEntryChunkHash (E.^. FileContentChunkHash) ] -workflowFileReferences :: MonadResource m => ConduitT () FileContentReference (SqlPersistT m) () -workflowFileReferences = mconcat [ E.selectSource (E.from $ pure . (E.^. SharedWorkflowGraphGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) - , E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowState )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue) - ] +workflowFileReferences :: MonadResource m => Map Text (ConduitT () FileContentReference (SqlPersistT m) ()) +workflowFileReferences = Map.fromList $ over (traverse . _1) nameToPathPiece + [ (''SharedWorkflowGraph, E.selectSource (E.from $ pure . (E.^. SharedWorkflowGraphGraph)) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)) + , (''WorkflowWorkflow, E.selectSource (E.from $ pure . (E.^. WorkflowWorkflowState )) .| awaitForever (mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield . E.unValue)) + ] dispatchJobDetectMissingFiles :: JobHandler UniWorX @@ -89,15 +92,20 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin act = hoist lift $ do uploadBucket <- getsYesod $ view _appUploadCacheBucket - missingDb <- execWriterT $ do - tellM . forM trackedReferences $ \refQuery -> - fmap (Set.fromList . mapMaybe E.unValue) . E.select $ do - ref <- refQuery - E.where_ . E.not_ $ E.isNothing ref - E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> - E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref - E.distinctOnOrderBy [E.asc ref] $ return ref - tellM . fmap (Map.singleton "workflows") . runConduit $ workflowFileReferences .| C.foldMap Set.singleton + missingDb <- runConduit . execStateC Map.empty $ do + let insertRef refKind ref = State.modify' $ Map.alter (Just . Set.insert ref . fromMaybe Set.empty) refKind + + iforM_ trackedReferences $ \refKind refQuery -> do + let fileReferencesQuery = do + ref <- refQuery + E.where_ . E.not_ $ E.isNothing ref + E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry -> + E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref + E.distinctOnOrderBy [E.asc ref] $ return ref + transPipe lift (E.selectSource fileReferencesQuery) .| C.mapMaybe E.unValue .| C.mapM_ (insertRef refKind) + + iforM_ workflowFileReferences $ \refKind refSource -> + transPipe lift (refSource .| C.filterM (\ref -> not <$> exists [FileContentEntryHash ==. ref])) .| C.mapM_ (insertRef refKind) let allMissingDb :: Set Minio.Object allMissingDb = setOf (folded . folded . re minioFileReference) missingDb @@ -123,12 +131,13 @@ dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin fin :: Map Text (NonNull (Set FileContentReference)) -> Handler () fin missingCounts = do - forM_ (Map.keysSet trackedReferences) $ \refIdent -> - observeMissingFiles refIdent . maybe 0 olength $ missingCounts Map.!? refIdent + imapM_ observeMissingFiles $ olength <$> missingCounts iforM_ missingCounts $ \refIdent missingFiles - -> let missingRefs = unlines . map tshow . Set.toList $ toNullable missingFiles - in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}\n#{missingRefs}|] + -> let missingRefs = unlines . map (views _Wrapped tshow) . Set.toList $ toNullable missingFiles + newl :: Text + newl = "\n" + in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}#{newl}#{missingRefs}|] when (Map.null missingCounts) $ $logInfoS "MissingFiles" [st|No missing files|] @@ -201,7 +210,7 @@ dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtom return $ fileContentEntry E.^. FileContentEntryHash `E.in_` E.valList fRefs E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash) chunkSize = 100 - in runConduit $ workflowFileReferences .| C.map Seq.singleton .| C.chunksOfE chunkSize .| C.mapM_ unmarkWorkflowFiles + in runConduit $ sequence_ workflowFileReferences .| C.map Seq.singleton .| C.chunksOfE chunkSize .| C.mapM_ unmarkWorkflowFiles let getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do