fix(avs): fix #7 by sequencing avs background jobs one after another
This commit is contained in:
parent
aaa2d679fd
commit
6dc3d8d059
@ -35,8 +35,8 @@ UserAvsCard
|
||||
deriving Generic
|
||||
|
||||
AvsSync
|
||||
personId AvsPersonId
|
||||
creationTime UTCTime
|
||||
pause Day Maybe
|
||||
UniqueAvsSyncId personId
|
||||
user UserId -- Note: we need to lookup UserAvs Entity anyway, so no benefit from storing AvsPersonId here
|
||||
creationTime UTCTime
|
||||
pause Day Maybe
|
||||
UniqueAvsSyncUser user
|
||||
deriving Generic
|
||||
@ -43,8 +43,7 @@ getAdminProblemsR = do
|
||||
now <- liftIO getCurrentTime
|
||||
let nowaday = utctDay now
|
||||
cutOffPrintDays = 7
|
||||
cutOffPrintJob = addLocalDays (-cutOffPrintDays) now
|
||||
cutOffAvsSynch = Just $ addUTCTime (-nominalHour) now -- update at most once per hour
|
||||
cutOffPrintJob = addLocalDays (-cutOffPrintDays) now
|
||||
|
||||
(usersAreReachable, driversHaveAvsIds, rDriversHaveFs, noStalePrintJobs) <- runDB $ (,,,)
|
||||
<$> areAllUsersReachable
|
||||
@ -57,7 +56,7 @@ getAdminProblemsR = do
|
||||
(Right AvsLicenceDifferences{..}) -> do
|
||||
let problemIds = avsLicenceDiffRevokeAll <> avsLicenceDiffGrantVorfeld <> avsLicenceDiffRevokeRollfeld <> avsLicenceDiffGrantRollfeld
|
||||
-- mapM_ (queueJob' . flip JobSynchroniseAvsId cutOffAvsSynch) problemIds
|
||||
runDBJobs . forM_ problemIds $ queueDBJob . flip JobSynchroniseAvsId cutOffAvsSynch
|
||||
runDBJobs . forM_ problemIds $ queueDBJob . flip JobSynchroniseAvsId (Just nowaday)
|
||||
return $ Right
|
||||
( Set.size avsLicenceDiffRevokeAll
|
||||
, Set.size avsLicenceDiffGrantVorfeld
|
||||
|
||||
@ -347,7 +347,7 @@ determineCrontab = execWriterT $ do
|
||||
{ jEpoch = fromInteger nextEpoch
|
||||
, jNumIterations = fromInteger numIntervals
|
||||
, jIteration = fromInteger nextInterval
|
||||
, jSynchAfter = Just $ toMidnight $ addDays (-7) nowaday -- at most once per week
|
||||
, jSynchAfter = Just $ addDays (-7) nowaday -- at most once per week
|
||||
})
|
||||
Cron
|
||||
{ cronInitial = CronTimestamp $ utcToLocalTimeTZ appTZ $ toTimeOfDay 22 0 0 $ utctDay nextIntervalTime
|
||||
|
||||
@ -1,16 +1,18 @@
|
||||
-- SPDX-FileCopyrightText: 2022-23 Gregor Kleen <gregor.kleen@ifi.lmu.de>, Steffen Jost <s.jost@fraport.de>
|
||||
-- SPDX-FileCopyrightText: 2022-23 Steffen Jost <s.jost@fraport.de>
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
module Jobs.Handler.SynchroniseAvs
|
||||
( dispatchJobSynchroniseAvs
|
||||
( dispatchJobSynchroniseAvs
|
||||
, dispatchJobSynchroniseAvsId
|
||||
, dispatchJobSynchroniseAvsUser
|
||||
, dispatchJobSynchroniseAvsUser
|
||||
, dispatchJobSynchroniseAvsNext
|
||||
, dispatchJobSynchroniseAvsQueue
|
||||
) where
|
||||
|
||||
import Import
|
||||
|
||||
import qualified Database.Esqueleto.Legacy as E
|
||||
import qualified Database.Esqueleto.Legacy as E hiding (upsert)
|
||||
import qualified Database.Esqueleto.PostgreSQL as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
|
||||
@ -19,8 +21,9 @@ import Jobs.Queue
|
||||
|
||||
import Handler.Utils.Avs
|
||||
|
||||
dispatchJobSynchroniseAvs :: Natural -> Natural -> Natural -> Maybe UTCTime -> JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvs :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvs numIterations epoch iteration pause
|
||||
-- TODO: refactor so that the AvsIdLookup becomes obsolete
|
||||
= JobHandlerAtomic . runConduit $
|
||||
readUsers .| filterIteration .| sinkDBJobs
|
||||
where
|
||||
@ -33,52 +36,59 @@ dispatchJobSynchroniseAvs numIterations epoch iteration pause
|
||||
userIteration, currentIteration :: Integer
|
||||
userIteration = toInteger (hash epoch `hashWithSalt` userId) `mod` toInteger numIterations
|
||||
currentIteration = toInteger iteration `mod` toInteger numIterations
|
||||
$logDebugS "SynchronisAvs" [st|User ##{tshow (fromSqlKey userId)}: sync on #{tshow userIteration}/#{tshow numIterations}, now #{tshow currentIteration}|]
|
||||
-- $logDebugS "SynchronisAvs" [st|User ##{tshow (fromSqlKey userId)}: sync on #{tshow userIteration}/#{tshow numIterations}, now #{tshow currentIteration}|]
|
||||
guard $ userIteration == currentIteration
|
||||
|
||||
return $ JobSynchroniseAvsUser userId pause
|
||||
|
||||
workJobSynchroniseAvs :: Either AvsPersonId UserId -> Maybe Day -> JobHandler UniWorX
|
||||
workJobSynchroniseAvs eauid pause = JobHandlerAtomic $ do
|
||||
either (return . Just) getAId eauid >>= \case ->
|
||||
Nothing -> return ()
|
||||
Just avsid -> do
|
||||
now <- liftIO getCurrentTime
|
||||
E.upsert
|
||||
AvsSync { avsSyncPersonId = avsid
|
||||
, avsSyncCreationTime = now
|
||||
, avsSyncPause = pause
|
||||
}
|
||||
[ \oldSync -> AvsSyncPause E.=. E.least (E.val pause) (oldSync E.^. AvsSyncPause) ]
|
||||
where
|
||||
getAId uid = userAvsPersonId <<$>> getBy (UniqueUserAvsUser uid)
|
||||
-- dispatchJobSynchroniseAvs' :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX
|
||||
-- dispatchJobSynchroniseAvs' numIterations epoch iteration pause = JobHandlerAtomic $ do
|
||||
|
||||
|
||||
dispatchJobSynchroniseAvsId :: AvsPersonId -> Maybe Day -> JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsId = workJobSynchroniseAvs . Left
|
||||
dispatchJobSynchroniseAvsId apid pause = JobHandlerException $ do
|
||||
ok <- runDBJobs $
|
||||
getBy (UniqueUserAvsId apid) >>= \case
|
||||
(Just Entity{entityVal=UserAvs{userAvsUser=uid}}) -> do -- known user
|
||||
workJobSychronizeAvs uid pause
|
||||
return True
|
||||
Nothing -> -- unknown avsPersonId, attempt to create user
|
||||
return False
|
||||
-- flip (maybeM $ return False) (getBy $ UniqueUserAvsId apid) $ \Entity{entityVal=UserAvs{userAvsUser=uid}} -> do -- known user
|
||||
-- workJobSychronizeAvs uid pause
|
||||
-- return True
|
||||
unless ok $ void $ maybeCatchAll $ upsertAvsUserById apid
|
||||
|
||||
|
||||
dispatchJobSynchroniseAvsUser :: UserId -> Maybe Day -> JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsUser = workJobSynchroniseAvs . Right
|
||||
dispatchJobSynchroniseAvsUser uid pause = JobHandlerException $ runDBJobs $ workJobSychronizeAvs uid pause
|
||||
|
||||
dispatchJobSynchroniseAvsNext :: JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsNext = JobHandlerException $ do
|
||||
liftIO $ yield >> threadDelay 100000
|
||||
queueJob' JobSynchroniseAvs
|
||||
workJobSychronizeAvs :: UserId -> Maybe Day -> JobDB ()
|
||||
workJobSychronizeAvs uid pause = do
|
||||
now <- liftIO getCurrentTime
|
||||
void $ E.upsert
|
||||
AvsSync { avsSyncUser = uid
|
||||
, avsSyncCreationTime = now
|
||||
, avsSyncPause = pause
|
||||
}
|
||||
[ \oldSync -> (AvsSyncPause E.=. E.greatest (E.val pause) (oldSync E.^. AvsSyncPause)) oldSync ]
|
||||
queueDBJob JobSynchroniseAvsQueue
|
||||
|
||||
dispatchJobSynchroniseAvs :: JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvs = JobHandlerException $ do
|
||||
dispatchJobSynchroniseAvsQueue :: JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsQueue = JobHandlerException $ do
|
||||
syncJob <- runDB $
|
||||
selectFirst [] [Asc AvsSyncCreationTime] >>= \case
|
||||
Nothing -> return Nothing -- nothing more to do
|
||||
Just Entity{entityKey=asid, entityVal=AvsSync{..}} -> do
|
||||
res <- getBy (UniqueUserAvsId avsSyncPersonId) >>= \case
|
||||
Just Entity{entityKey=avsKey, entityVal=UserAvs{..} }
|
||||
| maybe True (utctDay userAvsLastSynch <) avsSyncPause -> return $ Just avsSyncPersonId
|
||||
delete asid
|
||||
getBy (UniqueUserAvsUser avsSyncUser) >>= \case
|
||||
Just uae@Entity{entityVal=UserAvs{userAvsLastSynch} }
|
||||
| maybe True (utctDay userAvsLastSynch <) avsSyncPause -> return $ Just uae
|
||||
_other -> return Nothing -- we just updated this one within the given limit or the entity does not exist
|
||||
queueDBJob JobSynchroniseAvsNext
|
||||
delete asid
|
||||
return res
|
||||
ifMaybeM syncJob () $ \avsSyncPersonId -> do
|
||||
catch (void $ upsertAvsUserById avsSyncPersonId) -- already updates UserAvsLastSynch
|
||||
|
||||
ifMaybeM syncJob () $ \Entity{entityKey=avsKey, entityVal=UserAvs{userAvsPersonId=apid}} -> do
|
||||
void $ queueJob JobSynchroniseAvsNext
|
||||
catch (void $ upsertAvsUserById apid) -- already updates UserAvsLastSynch
|
||||
(\exc -> do
|
||||
now <- liftIO getCurrentTime
|
||||
let excMsg = tshow exc <> " at " <> tshow now
|
||||
@ -87,4 +97,8 @@ dispatchJobSynchroniseAvs = JobHandlerException $ do
|
||||
AvsInterfaceUnavailable -> return () -- ignore and retry later
|
||||
AvsUserUnknownByAvs _ -> return () -- ignore for users no longer listed in AVS
|
||||
otherExc -> throwM otherExc
|
||||
)
|
||||
)
|
||||
|
||||
-- needed, since JobSynchroniseAvsQueue cannot requeue itself due to JobNoQueueSame (and having no parameters)
|
||||
dispatchJobSynchroniseAvsNext :: JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsNext = JobHandlerException $ void $ queueJob JobSynchroniseAvsQueue
|
||||
|
||||
@ -93,14 +93,16 @@ data Job
|
||||
| JobSynchroniseAvs { jNumIterations
|
||||
, jEpoch
|
||||
, jIteration :: Natural
|
||||
, jSynchAfter :: Maybe UTCTime
|
||||
, jSynchAfter :: Maybe Day
|
||||
}
|
||||
| JobSynchroniseAvsUser { jUser :: UserId
|
||||
, jSynchAfter :: Maybe UTCTime
|
||||
}
|
||||
| JobSynchroniseAvsId { jAvsId:: AvsPersonId
|
||||
, jSynchAfter :: Maybe UTCTime
|
||||
, jSynchAfter :: Maybe Day
|
||||
}
|
||||
| JobSynchroniseAvsId { jAvsId :: AvsPersonId
|
||||
, jSynchAfter :: Maybe Day
|
||||
}
|
||||
| JobSynchroniseAvsQueue
|
||||
| JobSynchroniseAvsNext
|
||||
| JobChangeUserDisplayEmail { jUser :: UserId
|
||||
, jDisplayEmail :: UserEmail
|
||||
}
|
||||
@ -337,9 +339,13 @@ jobNoQueueSame = \case
|
||||
JobTruncateTransactionLog{} -> Just JobNoQueueSame
|
||||
JobPruneInvitations{} -> Just JobNoQueueSame
|
||||
JobDeleteTransactionLogIPs{} -> Just JobNoQueueSame
|
||||
JobSynchroniseLdap{} -> Just JobNoQueueSame
|
||||
JobSynchroniseLdapUser{} -> Just JobNoQueueSame
|
||||
JobSynchroniseAvsUser{} -> Just JobNoQueueSameTag
|
||||
JobSynchroniseAvs{} -> Just JobNoQueueSame
|
||||
JobSynchroniseAvsUser{} -> Just JobNoQueueSame
|
||||
JobSynchroniseAvsId{} -> Just JobNoQueueSame
|
||||
JobSynchroniseAvsQueue{} -> Just JobNoQueueSame
|
||||
JobSynchroniseAvsNext{} -> Just JobNoQueueSame
|
||||
JobChangeUserDisplayEmail{} -> Just JobNoQueueSame
|
||||
JobPruneSessionFiles{} -> Just JobNoQueueSameTag
|
||||
JobPruneUnreferencedFiles{} -> Just JobNoQueueSameTag
|
||||
|
||||
Loading…
Reference in New Issue
Block a user