feat: ensure cached study feature relevance is up to date

Gregor Kleen 2021-02-08 19:53:00 +01:00
parent 5698e9ca0b
commit 8798f547a6
14 changed files with 196 additions and 61 deletions

View File

@@ -63,6 +63,9 @@ health-check-matching-cluster-config-timeout: "_env:HEALTHCHECK_MATCHING_CLUSTER
synchronise-ldap-users-within: "_env:SYNCHRONISE_LDAP_WITHIN:1209600"
synchronise-ldap-users-interval: "_env:SYNCHRONISE_LDAP_INTERVAL:3600"
study-features-recache-relevance-within: 172800
study-features-recache-relevance-interval: 293
log-settings:
detailed: "_env:DETAILED_LOGGING:false"
all: "_env:LOG_ALL:false"

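Aside (not part of the diff): the two new settings pair up like the existing within/interval pairs — a full recache sweep should complete within 172800 s (48 h), in steps of 293 s. A minimal sketch of the implied arithmetic, assuming those semantics:

-- Hedged sketch: "within" is assumed to be one full sweep, "interval" one step.
iterationsPerSweep :: Integer
iterationsPerSweep = 172800 `div` 293   -- about 589 UUID slices per 48 h sweep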
View File

@@ -8,7 +8,7 @@ StudyFeatures -- multiple entries possible for students pursuing several degree
firstObserved UTCTime Maybe
lastObserved UTCTime default=now() -- last update from LDAP
valid Bool default=true
relevanceCached Bool default=false
relevanceCached UUID Maybe
UniqueStudyFeatures user degree field type semester
deriving Eq Show
-- UniqueUserSubject ubuser degree field -- There exists a counterexample

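Aside (not part of the diff): the column turns from a did-we-cache flag into a nullable random tag — Nothing still marks rows whose relevance was never cached, while the UUID lets periodic jobs revisit already-cached rows one slice of the UUID space at a time. A standalone sketch of the intended reading, with a hypothetical inSlice helper:

import Data.UUID (UUID)

-- Hypothetical helper: does a row's cache tag fall into the half-open
-- slice [lo, hi)?  A 'Nothing' bound is unbounded on that side.
inSlice :: (Maybe UUID, Maybe UUID) -> Maybe UUID -> Bool
inSlice _ Nothing         = False  -- never cached: the plain cache job handles it
inSlice (lo, hi) (Just u) = maybe True (<= u) lo && maybe True (u <) hi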
View File

@@ -352,7 +352,7 @@ upsertCampusUser upsertMode ldapData = do
, studyFeaturesFirstObserved = Just now
, studyFeaturesLastObserved = now
, studyFeaturesValid = True
, studyFeaturesRelevanceCached = False
, studyFeaturesRelevanceCached = Nothing
}
(sf :) <$> assimilateSubTerms subterms unusedFeats
Nothing

View File

@@ -30,6 +30,8 @@ import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.PostgreSQL as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.Conduit.Combinators as C
data UserTableStudyFeature = UserTableStudyFeature
{ userTableField
@@ -123,7 +125,7 @@ isRelevantStudyFeatureCached termField record studyFeatures
E.where_ $ relevantStudyFeatures E.^. RelevantStudyFeaturesTerm E.==. record E.^. termField
E.&&. relevantStudyFeatures E.^. RelevantStudyFeaturesStudyFeatures E.==. studyFeatures E.^. StudyFeaturesId
cacheStudyFeatureRelevance :: MonadIO m
cacheStudyFeatureRelevance :: MonadResource m
=> (E.SqlExpr (Entity StudyFeatures) -> E.SqlExpr (E.Value Bool))
-> SqlPersistT m ()
cacheStudyFeatureRelevance fFilter = do
@@ -135,9 +137,15 @@ cacheStudyFeatureRelevance fFilter = do
return $ RelevantStudyFeatures E.<# (term E.^. TermId) E.<&> (studyFeatures E.^. StudyFeaturesId)
)
( \_current _excluded -> [] )
E.update $ \studyFeatures -> do
E.set studyFeatures [ StudyFeaturesRelevanceCached E.=. E.true ]
E.where_ $ fFilter studyFeatures
let getStudyFeatures = E.selectSource . E.from $ \studyFeatures -> do
E.where_ $ fFilter studyFeatures
E.where_ . E.isNothing $ studyFeatures E.^. StudyFeaturesRelevanceCached
return $ studyFeatures E.^. StudyFeaturesId
migrateStudyFeatures genUUID lift' (E.Value sfId) = do
uuid <- genUUID
lift' $ update sfId [ StudyFeaturesRelevanceCached =. Just uuid ]
in runConduit $ getStudyFeatures .| randUUIDC (\genUUID lift' -> C.mapM_ $ migrateStudyFeatures genUUID lift')
isCourseStudyFeature :: E.SqlExpr (Entity Course) -> E.SqlExpr (Entity StudyFeatures) -> E.SqlExpr (E.Value Bool)
isCourseStudyFeature = isRelevantStudyFeatureCached CourseTerm

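Aside (not part of the diff): the replaced E.update becomes a stream — select the ids still lacking a tag, then tag each with a fresh UUID. Reduced to its shape, with assumed stand-ins fresh and store:

import Conduit (ConduitT, mapM_C)

-- Sketch: one effect per streamed key; 'fresh' stands in for the UUID
-- generator threaded by randUUIDC, 'store' for the per-row update.
tagAll :: Monad m => m tag -> (k -> tag -> m ()) -> ConduitT k o m ()
tagAll fresh store = mapM_C $ \k -> fresh >>= store k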
View File

@@ -43,7 +43,7 @@ pStudyFeatures studyFeaturesUser now = do
studyFeaturesSuperField = Nothing
studyFeaturesFirstObserved = Just now
studyFeaturesLastObserved = now
studyFeaturesRelevanceCached = False
studyFeaturesRelevanceCached = Nothing
return StudyFeatures{..}
pStudyFeature `sepBy1` char '#'

View File

@@ -758,7 +758,7 @@ assimilateUser newUserId oldUserId = mapReaderT execWriterT $ do
, StudyFeaturesFirstObserved =. (min `on` studyFeaturesFirstObserved) oldStudyFeatures newStudyFeatures
, StudyFeaturesLastObserved =. (max `on` studyFeaturesLastObserved) oldStudyFeatures newStudyFeatures
, StudyFeaturesValid =. ((||) `on` studyFeaturesValid) oldStudyFeatures newStudyFeatures
, StudyFeaturesRelevanceCached =. ((||) `on` studyFeaturesRelevanceCached) oldStudyFeatures newStudyFeatures
, StudyFeaturesRelevanceCached =. ((<|>) `on` studyFeaturesRelevanceCached) oldStudyFeatures newStudyFeatures
]
E.insertSelectWithConflict
UniqueRelevantStudyFeatures

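Aside (not part of the diff): the merge operator follows the type change — (||) kept a row flagged as cached if either source row was; on Maybe, (<|>) keeps the first available tag. Illustration:

import Control.Applicative ((<|>))
import Data.UUID (UUID)

-- First 'Just' wins, mirroring how '(||)' let 'True' win for the old Bool.
mergeTag :: Maybe UUID -> Maybe UUID -> Maybe UUID
mergeTag oldTag newTag = oldTag <|> newTag   -- Nothing only if both were Nothing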
View File

@@ -216,6 +216,23 @@ determineCrontab = execWriterT $ do
, cronNotAfter = Left within
}
whenIsJust ((,) <$> appStudyFeaturesRecacheRelevanceWithin <*> appJobCronInterval) $ \(within, cInterval) -> do
nextIntervals <- getNextIntervals within appStudyFeaturesRecacheRelevanceInterval cInterval
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobStudyFeaturesRecacheRelevance
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval
}
)
Cron
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ nextIntervalTime
, cronRepeat = CronRepeatNever
, cronRateLimit = appStudyFeaturesRecacheRelevanceInterval
, cronNotAfter = Left within
}
let
sheetJobs (Entity nSheet Sheet{..}) = do
for_ (max <$> sheetVisibleFrom <*> sheetActiveFrom) $ \aFrom ->
@@ -484,7 +501,7 @@ determineCrontab = execWriterT $ do
, cronNotAfter = maybe (Right CronNotScheduled) (Right . CronTimestamp . utcToLocalTimeTZ appTZ) $ nBot =<< minimumOf (folded . _entityVal . _allocationStaffAllocationTo . to NTop . filtered (> NTop (Just registerTo))) allocs
}
hasRelevanceUncached <- lift $ exists [StudyFeaturesRelevanceCached !=. True]
hasRelevanceUncached <- lift $ exists [StudyFeaturesRelevanceCached ==. Nothing]
when hasRelevanceUncached . tell $ HashMap.singleton
(JobCtlQueue JobStudyFeaturesCacheRelevance)
Cron

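Aside (not part of the diff): getNextIntervals is defined outside this diff; a hypothetical reconstruction of the arithmetic it presumably performs, slicing within into interval-sized iterations and numbering whole sweeps as epochs:

-- Hedged sketch, not the real getNextIntervals.
sweepPosition
  :: Integer  -- ^ within: seconds per full sweep (e.g. 172800)
  -> Integer  -- ^ interval: seconds per iteration (e.g. 293)
  -> Integer  -- ^ now, in seconds since the epoch
  -> (Integer, Integer, Integer)  -- ^ (jEpoch, jIteration, jNumIterations)
sweepPosition within interval now =
  ( now `div` within                   -- which sweep we are in
  , (now `mod` within) `div` interval  -- position inside the sweep
  , within `div` interval              -- iterations per sweep
  )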
View File

@@ -23,14 +23,8 @@ import qualified Network.Minio as Minio
import Crypto.Hash (hashDigestSize, digestFromByteString)
import Data.List ((!!), unfoldr, maximumBy, init, genericLength)
import qualified Data.ByteString as ByteString
import Data.Bits (Bits(shiftR))
import qualified Data.Map.Strict as Map
import Control.Monad.Random.Lazy (evalRand, mkStdGen)
import System.Random.Shuffle (shuffleM)
import System.IO.Unsafe
import Handler.Utils.Files (sourceFileDB)
@@ -44,6 +38,8 @@ import qualified Data.Sequence as Seq
import Jobs.Queue (YesodJobDB)
import Jobs.Handler.Intervals.Utils
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer act fin
@@ -159,52 +155,10 @@ dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtom
( Unwrapped FileContentChunkReference ~ Digest h )
=> Integer
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
chunkHashBits = chunkHashBytes * 8
base :: Integer
base = 2 ^ chunkHashBits
intervals :: [Integer]
-- | Exclusive upper bounds
intervals
| numIterations <= 0 = pure base
| otherwise = go protoIntervals ^.. folded . _1
where
go [] = []
go ints
| maximumOf (folded . _1) ints == Just base = ints
| otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
where
closest = maximumBy (comparing $ view _2) ints
(lts, geqs) = partition (((>) `on` view _1) closest) ints
gts = filter (((<) `on` view _1) closest) geqs
-- | Exclusive upper bounds
protoIntervals :: [(Integer, Integer)]
protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
| i <- [1 .. toInteger numIterations]
]
intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals
toDigest :: Integer -> Maybe FileContentChunkReference
toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
where step i
| i <= 0 = Nothing
| otherwise = Just (fromIntegral i, i `shiftR` 8)
pad bs
| toInteger (ByteString.length bs) >= chunkHashBytes = bs
| otherwise = pad $ ByteString.cons 0 bs
intervalsDgsts <- atomically $ do
cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
case Map.lookup numIterations cachedDgsts of
Just c -> return c
Nothing -> do
modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
return intervalsDgsts'
(minBoundDgst, maxBoundDgst) <- currentIntervalCached pruneUnreferencedFilesIntervalsCache chunkHashBytes (fmap (review _Wrapped) . digestFromByteString) numIterations epoch iteration
let
permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)
(minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
chunkIdFilter cRef = E.and $ catMaybes
[ minBoundDgst <&> \b -> cRef E.>=. E.val b

View File

@@ -0,0 +1,77 @@
module Jobs.Handler.Intervals.Utils
( mkIntervals, mkIntervalsCached
, getCurrentInterval
, currentIntervalCached
) where
import Import hiding (init, maximumBy, cached)
import Control.Monad.Random.Lazy (evalRand, mkStdGen)
import System.Random.Shuffle (shuffleM)
import Data.List ((!!), unfoldr, maximumBy, init, genericLength)
import qualified Data.ByteString as ByteString
import Data.Bits (Bits(shiftR))
import qualified Data.Map.Strict as Map
mkIntervals :: forall a. Integer -> (ByteString -> Maybe a) -> Natural -> [(Maybe a, Maybe a)]
mkIntervals bytes fromBS numIterations = zip (Nothing : init intervals') intervals'
where
bits = bytes * 8
base :: Integer
base = 2 ^ bits
-- | Exclusive upper bounds
intervals
| numIterations <= 0 = pure base
| otherwise = go protoIntervals ^.. folded . _1
where
go [] = []
go ints
| maximumOf (folded . _1) ints == Just base = ints
| otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
where
closest = maximumBy (comparing $ view _2) ints
(lts, geqs) = partition (((>) `on` view _1) closest) ints
gts = filter (((<) `on` view _1) closest) geqs
-- | Exclusive upper bounds
protoIntervals :: [(Integer, Integer)]
protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
| i <- [1 .. toInteger numIterations]
]
intervals' = map (fromBS' <=< assertM' (> 0)) intervals
fromBS' :: Integer -> Maybe a
fromBS' = fromBS . pad . ByteString.pack . reverse . unfoldr step
where step i
| i <= 0 || i >= base = Nothing
| otherwise = Just (fromIntegral i, i `shiftR` 8)
pad bs
| toInteger (ByteString.length bs) >= bytes = bs
| otherwise = pad $ ByteString.cons 0 bs
getCurrentInterval :: forall a. Natural -> Natural -> [a] -> a
getCurrentInterval epoch iteration intervals = permIntervals !! fromIntegral (toInteger iteration `mod` genericLength permIntervals)
where permIntervals = shuffleM intervals `evalRand` mkStdGen (hash epoch)
mkIntervalsCached :: forall m a. (NFData a, MonadIO m)
=> TVar (Map Natural [(Maybe a, Maybe a)])
-> Integer -> (ByteString -> Maybe a) -> Natural -> m [(Maybe a, Maybe a)]
mkIntervalsCached cacheTVar bytes fromBS numIterations = atomically $ do
cached <- readTVar cacheTVar
case Map.lookup numIterations cached of
Just c -> return c
Nothing -> do
modifyTVar' cacheTVar $ force . Map.insert numIterations intervals'
return intervals'
where intervals' = mkIntervals bytes fromBS numIterations
currentIntervalCached :: forall m a. (NFData a, MonadIO m)
=> TVar (Map Natural [(Maybe a, Maybe a)])
-> Integer -> (ByteString -> Maybe a)
-> Natural -> Natural -> Natural -> m (Maybe a, Maybe a)
currentIntervalCached cacheTVar bytes fromBS numIterations epoch iteration
= getCurrentInterval epoch iteration <$> mkIntervalsCached cacheTVar bytes fromBS numIterations

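Aside (not part of the diff): the go/protoIntervals loop spreads the base `mod` numIterations leftover units so slice sizes differ by at most one. The same splitting idea in a self-contained, simplified form:

slices :: Integer -> Integer -> [(Integer, Integer)]
slices base n = zip (0 : bounds) bounds
  where
    (q, r) = base `divMod` n
    -- the first r slices are one element larger, absorbing the remainder
    bounds = scanl1 (+) [ if i <= r then q + 1 else q | i <- [1 .. n] ]
-- slices 256 4 == [(0,64),(64,128),(128,192),(192,256)]
-- slices 10 3  == [(0,4),(4,7),(7,10)]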
View File

@@ -1,5 +1,6 @@
module Jobs.Handler.StudyFeatures
( dispatchJobStudyFeaturesCacheRelevance
, dispatchJobStudyFeaturesRecacheRelevance
) where
import Import
@@ -7,8 +8,38 @@ import Import
import Handler.Utils.StudyFeatures
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.Map.Strict as Map
import System.IO.Unsafe
import Jobs.Handler.Intervals.Utils
import qualified Data.UUID as UUID
dispatchJobStudyFeaturesCacheRelevance :: JobHandler UniWorX
dispatchJobStudyFeaturesCacheRelevance = JobHandlerAtomic $
cacheStudyFeatureRelevance $ \studyFeatures -> studyFeatures E.^. StudyFeaturesRelevanceCached E.!=. E.val True
cacheStudyFeatureRelevance $ \studyFeatures -> E.isNothing $ studyFeatures E.^. StudyFeaturesRelevanceCached
{-# NOINLINE studyFeaturesRecacheRelevanceIntervalsCache #-}
studyFeaturesRecacheRelevanceIntervalsCache :: TVar (Map Natural [(Maybe UUID, Maybe UUID)])
studyFeaturesRecacheRelevanceIntervalsCache = unsafePerformIO $ newTVarIO Map.empty
dispatchJobStudyFeaturesRecacheRelevance :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobStudyFeaturesRecacheRelevance numIterations epoch iteration = JobHandlerAtomic $ do
(minBoundUUID, maxBoundUUID) <- currentIntervalCached studyFeaturesRecacheRelevanceIntervalsCache 16 (UUID.fromByteString . fromStrict) numIterations epoch iteration
let
uuidFilter :: E.SqlExpr (E.Value (Maybe UUID)) -> E.SqlExpr (E.Value Bool)
uuidFilter cRef = E.and $ catMaybes
[ pure $ E.isJust cRef
, minBoundUUID <&> \b -> cRef E.>=. E.justVal b
, maxBoundUUID <&> \b -> cRef E.<. E.justVal b
]
$logDebugS "StudyFeaturesRecacheRelevance" . tshow $ (minBoundUUID, maxBoundUUID)
cacheStudyFeatureRelevance $ \studyFeatures -> uuidFilter $ studyFeatures E.^. StudyFeaturesRelevanceCached

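Aside (not part of the diff): how a concrete (epoch, iteration) pair lands on one UUID range — getCurrentInterval seeds the shuffle with the epoch, so the order is stable across a sweep and every slice is visited exactly once per sweep. A sketch mirroring it:

import Control.Monad.Random.Lazy (evalRand, mkStdGen)
import System.Random.Shuffle (shuffleM)
import Data.Hashable (hash)

-- Assumes a non-empty slice list; mirrors getCurrentInterval above.
pickSlice :: Int -> Int -> [a] -> a
pickSlice epoch iteration xs =
  let shuffled = shuffleM xs `evalRand` mkStdGen (hash epoch)  -- epoch-stable order
  in  shuffled !! (iteration `mod` length xs)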
View File

@@ -98,6 +98,10 @@ data Job
| JobDetectMissingFiles
| JobPruneOldSentMails
| JobStudyFeaturesCacheRelevance
| JobStudyFeaturesRecacheRelevance { jNumIterations
, jEpoch
, jIteration :: Natural
}
deriving (Eq, Ord, Show, Read, Generic, Typeable)
data Notification
= NotificationSubmissionRated { nSubmission :: SubmissionId }

View File

@@ -100,6 +100,7 @@ data ManualMigration
| Migration20201119RoomTypes
| Migration20210115ExamPartsFrom
| Migration20210201SharedWorkflowGraphs
| Migration20210208StudyFeaturesRelevanceCachedUUIDs
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
deriving anyclass (Universe, Finite)
@@ -138,7 +139,7 @@ migrateManual = do
, ("file_content_entry_chunk_hash", "CREATE INDEX file_content_entry_chunk_hash ON \"file_content_entry\" (chunk_hash)" )
, ("sent_mail_bounce_secret", "CREATE INDEX sent_mail_bounce_secret ON \"sent_mail\" (bounce_secret) WHERE bounce_secret IS NOT NULL")
, ("sent_mail_recipient", "CREATE INDEX sent_mail_recipient ON \"sent_mail\" (recipient) WHERE recipient IS NOT NULL")
, ("study_features_relevance_cached", "CREATE INDEX study_features_relevance_cached ON \"study_features\" (relevance_cached) WHERE (relevance_cached <> true)")
, ("study_features_relevance_cached", "CREATE INDEX study_features_relevance_cached ON \"study_features\" (relevance_cached)")
]
where
addIndex :: Text -> Sql -> Migration
@@ -1021,6 +1022,24 @@ customMigrations = mapF $ \case
ALTER TABLE "workflow_workflow" RENAME COLUMN "graph_id" TO "graph";
|]
Migration20210208StudyFeaturesRelevanceCachedUUIDs ->
whenM (tableExists "study_features") $ do
[executeQQ|
ALTER TABLE "study_features" ADD COLUMN "relevance_cached_uuid" uuid
|]
let getStudyFeatures = [queryQQ|SELECT "id" FROM "study_features" WHERE relevance_cached|]
migrateStudyFeatures genUUID lift' [ fromPersistValue -> Right (sfId :: StudyFeaturesId) ] = do
uuid <- genUUID
lift' [executeQQ|UPDATE "study_features" SET "relevance_cached_uuid" = #{uuid} WHERE "id" = #{sfId}|]
migrateStudyFeatures _ _ _ = return ()
in runConduit $ getStudyFeatures .| randUUIDC (\genUUID lift' -> C.mapM_ $ migrateStudyFeatures genUUID lift')
[executeQQ|
ALTER TABLE "study_features" DROP COLUMN "relevance_cached";
ALTER TABLE "study_features" RENAME COLUMN "relevance_cached_uuid" TO "relevance_cached";
|]
tableExists :: MonadIO m => Text -> ReaderT SqlBackend m Bool
tableExists table = do

View File

@@ -208,6 +208,9 @@ data AppSettings = AppSettings
, appJobMode :: JobMode
, appStudyFeaturesRecacheRelevanceWithin :: Maybe NominalDiffTime
, appStudyFeaturesRecacheRelevanceInterval :: NominalDiffTime
, appMemcacheAuth :: Bool
} deriving Show
@@ -540,7 +543,7 @@ instance FromJSON AppSettings where
appFileChunkingHashWindow <- o .: "file-chunking-hash-window"
appFileChunkingParams <- maybe (fail "Could not recommend FastCDCParameters") return $ recommendFastCDCParameters appFileChunkingTargetExponent appFileChunkingHashWindow
appPruneUnreferencedFilesWithin <- o .: "prune-unreferenced-files-within"
appPruneUnreferencedFilesWithin <- o .:? "prune-unreferenced-files-within"
appPruneUnreferencedFilesInterval <- o .: "prune-unreferenced-files-interval"
appMaximumContentLength <- o .: "maximum-content-length"
@@ -610,6 +613,9 @@ instance FromJSON AppSettings where
appJobMode <- o .:? "job-mode" .!= JobsLocal True
appStudyFeaturesRecacheRelevanceWithin <- o .:? "study-features-recache-relevance-within"
appStudyFeaturesRecacheRelevanceInterval <- o .: "study-features-recache-relevance-interval"
return AppSettings{..}
makeClassy_ ''AppSettings

View File

@@ -71,6 +71,7 @@ import Control.Monad.Catch
import Control.Monad.Morph (hoist)
import Control.Monad.Fail
import Control.Monad.Trans.Cont (ContT, evalContT, callCC)
import qualified Control.Monad.State.Class as State
import Language.Haskell.TH
import Language.Haskell.TH.Instances ()
@@ -98,6 +99,7 @@ import qualified Crypto.MAC.KMAC as KMAC
import qualified Crypto.Hash as Crypto
import Crypto.Hash (HashAlgorithm, Digest)
import Crypto.Hash.Instances ()
import qualified Crypto.Random as Crypto
import Data.ByteArray (ByteArrayAccess)
@@ -140,6 +142,9 @@ import Text.Hamlet (Translate)
import Data.Ratio ((%))
import Data.UUID (UUID)
import qualified Data.UUID as UUID
{-# ANN module ("HLint: ignore Use asum" :: String) #-}
@@ -1405,6 +1410,17 @@ uniforms :: (RandomGen g, MonadSplit g m, Foldable t) => t a -> m [a]
uniforms xs = LazyRand.evalRand go <$> getSplit
where go = (:) <$> interleave (uniform xs) <*> go
randUUIDC :: MonadIO m
=> (forall m'. Monad m' => m' UUID -> (forall a. m a -> m' a) -> ConduitT i o m' r)
-> ConduitT i o m r
randUUIDC cont = do
drg <- liftIO Crypto.drgNew
let
mkUUID = do
uuidBS <- State.state $ Crypto.randomBytesGenerate 16
return . fromMaybe (error $ "Could not convert bytestring to uuid: " <> show uuidBS) . UUID.fromByteString $ fromStrict uuidBS
evalStateC drg $ cont mkUUID lift
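Aside (not part of the diff): the pure core of mkUUID, extracted as a sketch against cryptonite's Crypto.Random and the uuid package — 16 random bytes always convert, which is why the fromMaybe above cannot fire in practice:

import qualified Crypto.Random as Crypto
import qualified Data.UUID as UUID
import Data.ByteString.Lazy (fromStrict)

-- Draw one UUID from a deterministic random generator, returning the
-- stepped generator; randUUIDC threads exactly this through evalStateC.
randomUUID :: Crypto.DRG gen => gen -> (UUID.UUID, gen)
randomUUID drg =
  let (bs, drg') = Crypto.randomBytesGenerate 16 drg   -- UUIDs are 16 bytes
      uuid       = maybe (error "impossible: 16 bytes") id
                 $ UUID.fromByteString (fromStrict bs)
  in (uuid, drg')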
----------
-- Lens --
----------