refactor(jobs): SynchroniseUserdb -> SynchroniseUsers

This commit is contained in:
Sarah Vaupel 2024-02-20 00:05:56 +01:00
parent b8e7ee2b3d
commit 115452035d
4 changed files with 75 additions and 78 deletions

View File

@ -317,7 +317,7 @@ determineCrontab = execWriterT $ do
forM_ nextIntervals $ \(nextEpoch, nextInterval, nextIntervalTime, numIntervals) -> do
tell $ HashMap.singleton
(JobCtlQueue JobSynchroniseUserdb
(JobCtlQueue JobSynchroniseUsers
{ jEpoch = fromInteger nextEpoch
, jNumIterations = fromInteger numIntervals
, jIteration = fromInteger nextInterval

View File

@ -0,0 +1,69 @@
-- SPDX-FileCopyrightText: 2022-2024 Sarah Vaupel <sarah.vaupel@uniworx.de>, Gregor Kleen <gregor.kleen@ifi.lmu.de>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
module Jobs.Handler.SynchroniseUser
( dispatchJobSynchroniseUsers, dispatchJobSynchroniseUser
, SynchroniseUserException(..)
) where
import Import
import qualified Data.Conduit.List as C
import Auth.LDAP
import Auth.OAuth2
import Foundation.Yesod.Auth (UserConversionException, upsertUser)
import Jobs.Queue
data SynchroniseUserException
= SynchroniseUserNoSource
deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic)
instance Exception SynchroniseUserException
dispatchJobSynchroniseUsers :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobSynchroniseUsers numIterations epoch iteration
= JobHandlerAtomic . runConduit $
readUsers .| filterIteration .| sinkDBJobs
where
readUsers :: ConduitT () UserId (YesodJobDB UniWorX) ()
readUsers = selectKeys [] []
filterIteration :: ConduitT UserId Job (YesodJobDB UniWorX) ()
filterIteration = C.mapMaybeM $ \userId -> runMaybeT $ do
let
userIteration, currentIteration :: Integer
userIteration = toInteger (hash epoch `hashWithSalt` userId) `mod` toInteger numIterations
currentIteration = toInteger iteration `mod` toInteger numIterations
$logDebugS "SynchroniseUsers" [st|User ##{tshow (fromSqlKey userId)}: sync on #{tshow userIteration}/#{tshow numIterations}, now #{tshow currentIteration}|]
guard $ userIteration == currentIteration
return $ JobSynchroniseUser userId
dispatchJobSynchroniseUser :: UserId -> JobHandler UniWorX
dispatchJobSynchroniseUser jUser = JobHandlerException $ do
userSourceConf <- getsYesod $ view _appUserAuthConf
case userSourceConf of
UserAuthConfSingleSource (AuthSourceConfLdap _ldapConf) ->
runDB . void . runMaybeT . handleExc $ do
ldapPool@(upsertUserLdapConf,_) <- MaybeT . getsYesod $ view _appLdapPool
user@User{userIdent = upsertUserIdent} <- MaybeT $ get jUser
$logInfoS "SynchroniseUser" [st|Synchronising #{upsertUserIdent} with LDAP|]
-- reTestAfter <- getsYesod $ view _appUserdbRetestFailover
-- ldapAttrs <- MaybeT $ campusUserReTest' ldapConf ((>= reTestAfter) . realToFrac) FailoverUnlimited user
upsertUserLdapData <- MaybeT $ ldapUser' ldapPool user
void . lift $ upsertUser UpsertUserSync{..} UpsertUserDataLdap{..}
UserAuthConfSingleSource (AuthSourceConfAzureAdV2 upsertUserAzureConf) ->
runDB . void . runMaybeT . handleExc $ do
user@User{userIdent = upsertUserIdent} <- MaybeT $ get jUser
$logInfoS "SynchroniseUser" [st|Synchronising #{upsertUserIdent} with Azure|]
upsertUserAzureData <- MaybeT $ azureUser' upsertUserAzureConf user
void . lift $ upsertUser UpsertUserSync{..} UpsertUserDataAzure{..}
where
handleExc :: MaybeT DB a -> MaybeT DB a
handleExc
= catchMPlus (Proxy @AzureUserException)
. catchMPlus (Proxy @LdapUserException)
. catchMPlus (Proxy @UserConversionException)

View File

@ -1,72 +0,0 @@
-- SPDX-FileCopyrightText: 2022-2024 Sarah Vaupel <sarah.vaupel@uniworx.de>, Gregor Kleen <gregor.kleen@ifi.lmu.de>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
module Jobs.Handler.SynchroniseUserdb
( dispatchJobSynchroniseUserdb, dispatchJobSynchroniseUserdbUser
, SynchroniseUserdbException(..)
) where
import Import
import qualified Data.CaseInsensitive as CI
import qualified Data.Conduit.List as C
import qualified Data.UUID as UUID
import Auth.LDAP
import Auth.OAuth2
import Foundation.Yesod.Auth (CampusUserConversionException, upsertLdapUser, upsertAzureUser)
import Jobs.Queue
data SynchroniseUserdbException
= SynchroniseUserdbNoUserdb
deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic)
instance Exception SynchroniseUserdbException
dispatchJobSynchroniseUserdb :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobSynchroniseUserdb numIterations epoch iteration
= JobHandlerAtomic . runConduit $
readUsers .| filterIteration .| sinkDBJobs
where
readUsers :: ConduitT () UserId (YesodJobDB UniWorX) ()
readUsers = selectKeys [] []
filterIteration :: ConduitT UserId Job (YesodJobDB UniWorX) ()
filterIteration = C.mapMaybeM $ \userId -> runMaybeT $ do
let
userIteration, currentIteration :: Integer
userIteration = toInteger (hash epoch `hashWithSalt` userId) `mod` toInteger numIterations
currentIteration = toInteger iteration `mod` toInteger numIterations
$logDebugS "SynchroniseUserdb" [st|User ##{tshow (fromSqlKey userId)}: sync on #{tshow userIteration}/#{tshow numIterations}, now #{tshow currentIteration}|]
guard $ userIteration == currentIteration
return $ JobSynchroniseUserdbUser userId
dispatchJobSynchroniseUserdbUser :: UserId -> JobHandler UniWorX
dispatchJobSynchroniseUserdbUser jUser = JobHandlerException $ do
userSourceConf <- getsYesod $ view _appUserSourceConf
case userSourceConf of
UserSourceConfSingleSource (UserSourceLdap _ldapConf) ->
runDB . void . runMaybeT . handleExc $ do
ldapPool <- MaybeT . getsYesod $ view _appLdapPool
user@User{userIdent,userLdapPrimaryKey} <- MaybeT $ get jUser
let upsertIdent = maybe userIdent CI.mk userLdapPrimaryKey
$logInfoS "SynchroniseUserdb" [st|Synchronising #{upsertIdent} with LDAP|]
-- reTestAfter <- getsYesod $ view _appUserdbRetestFailover
-- ldapAttrs <- MaybeT $ campusUserReTest' ldapConf ((>= reTestAfter) . realToFrac) FailoverUnlimited user
ldapAttrs <- MaybeT $ ldapUser' ldapPool user
void . lift $ upsertLdapUser (UpsertUserSync upsertIdent) ldapAttrs
UserSourceConfSingleSource (UserSourceAzureAdV2 azureConf) ->
runDB . void . runMaybeT . handleExc $ do
user@User{userIdent,userAzurePrimaryKey} <- MaybeT $ get jUser
let upsertIdent = maybe userIdent (CI.mk . UUID.toText) userAzurePrimaryKey -- TODO: use userPrincipalName
$logInfoS "SynchroniseUserdb" [st|Synchronising #{upsertIdent} with Azure|]
azureAttrs <- MaybeT $ azureUser' azureConf user
void . lift $ upsertAzureUser (UpsertUserSync upsertIdent) azureAttrs
where
handleExc :: MaybeT DB a -> MaybeT DB a
handleExc
= catchMPlus (Proxy @CampusUserException)
. catchMPlus (Proxy @CampusUserConversionException)

View File

@ -1,4 +1,4 @@
-- SPDX-FileCopyrightText: 2022-23 Gregor Kleen <gregor.kleen@ifi.lmu.de>,Sarah Vaupel <sarah.vaupel@ifi.lmu.de>,Steffen Jost <s.jost@fraport.de>
-- SPDX-FileCopyrightText: 2022-2024 Sarah Vaupel <sarah.vaupel@uniworx.de>, Gregor Kleen <gregor.kleen@ifi.lmu.de>,Sarah Vaupel <sarah.vaupel@ifi.lmu.de>,Steffen Jost <s.jost@fraport.de>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
@ -92,11 +92,11 @@ data Job
| JobTruncateTransactionLog
| JobPruneInvitations
| JobDeleteTransactionLogIPs
| JobSynchroniseUserdb { jNumIterations
| JobSynchroniseUsers { jNumIterations
, jEpoch
, jIteration :: Natural
}
| JobSynchroniseUserdbUser { jUser :: UserId }
| JobSynchroniseUser { jUser :: UserId }
| JobSynchroniseAvs { jNumIterations
, jEpoch
, jIteration :: Natural
@ -348,8 +348,8 @@ jobNoQueueSame = \case
JobTruncateTransactionLog{} -> Just JobNoQueueSame
JobPruneInvitations{} -> Just JobNoQueueSame
JobDeleteTransactionLogIPs{} -> Just JobNoQueueSame
JobSynchroniseUserdb{} -> Just JobNoQueueSame
JobSynchroniseUserdbUser{} -> Just JobNoQueueSame
JobSynchroniseUsers{} -> Just JobNoQueueSame
JobSynchroniseUser{} -> Just JobNoQueueSame
JobSynchroniseAvs{} -> Just JobNoQueueSame
JobSynchroniseAvsUser{} -> Just JobNoQueueSame
JobSynchroniseAvsId{} -> Just JobNoQueueSame