107 lines
4.9 KiB
Haskell
107 lines
4.9 KiB
Haskell
-- SPDX-FileCopyrightText: 2022-23 Steffen Jost <s.jost@fraport.de>
|
|
--
|
|
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
module Jobs.Handler.SynchroniseAvs
|
|
( dispatchJobSynchroniseAvs
|
|
, dispatchJobSynchroniseAvsId
|
|
, dispatchJobSynchroniseAvsUser
|
|
, dispatchJobSynchroniseAvsNext
|
|
, dispatchJobSynchroniseAvsQueue
|
|
) where
|
|
|
|
import Import
|
|
|
|
-- import qualified Database.Esqueleto.Legacy as E hiding (upsert)
|
|
-- import qualified Database.Esqueleto.PostgreSQL as E
|
|
-- import qualified Database.Esqueleto.Utils as E
|
|
|
|
import qualified Data.Conduit.List as C
|
|
import Jobs.Queue
|
|
|
|
import Handler.Utils.Avs
|
|
|
|
dispatchJobSynchroniseAvs :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX
|
|
dispatchJobSynchroniseAvs numIterations epoch iteration pause
|
|
= JobHandlerException . runDB $ do
|
|
now <- liftIO getCurrentTime
|
|
todos <- runConduit $ readUsers .| filterIteration now .| sinkList
|
|
putMany todos
|
|
where
|
|
readUsers :: ConduitT () UserId _ ()
|
|
readUsers = selectKeys [] []
|
|
|
|
filterIteration :: UTCTime -> ConduitT UserId AvsSync _ ()
|
|
filterIteration now = C.mapMaybeM $ \userId -> runMaybeT $ do
|
|
let
|
|
userIteration, currentIteration :: Integer
|
|
userIteration = toInteger (hash epoch `hashWithSalt` userId) `mod` toInteger numIterations
|
|
currentIteration = toInteger iteration `mod` toInteger numIterations
|
|
-- $logDebugS "SynchronisAvs" [st|User ##{tshow (fromSqlKey userId)}: sync on #{tshow userIteration}/#{tshow numIterations}, now #{tshow currentIteration}|]
|
|
guard $ userIteration == currentIteration
|
|
return $ AvsSync userId now pause
|
|
|
|
-- dispatchJobSynchroniseAvs' :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX
|
|
-- dispatchJobSynchroniseAvs' numIterations epoch iteration pause = JobHandlerAtomic $ do
|
|
|
|
|
|
dispatchJobSynchroniseAvsId :: AvsPersonId -> Maybe Day -> JobHandler UniWorX
|
|
dispatchJobSynchroniseAvsId apid pause = JobHandlerException $ do
|
|
ok <- runDB $ getBy (UniqueUserAvsId apid) >>=
|
|
\case
|
|
(Just Entity{entityVal=UserAvs{userAvsUser=uid}}) -> do -- known user
|
|
workJobSychronizeAvs uid pause
|
|
return True
|
|
_ -> -- unknown avsPersonId, attempt to create user
|
|
return False
|
|
unless ok $ void $ maybeCatchAll $ upsertAvsUserById apid
|
|
|
|
|
|
dispatchJobSynchroniseAvsUser :: UserId -> Maybe Day -> JobHandler UniWorX
|
|
dispatchJobSynchroniseAvsUser uid pause = JobHandlerException $ runDB $ workJobSychronizeAvs uid pause
|
|
|
|
workJobSychronizeAvs :: UserId -> Maybe Day -> DB ()
|
|
workJobSychronizeAvs uid pause = do
|
|
now <- liftIO getCurrentTime
|
|
-- void $ E.upsert
|
|
-- AvsSync { avsSyncUser = uid
|
|
-- , avsSyncCreationTime = now
|
|
-- , avsSyncPause = pause
|
|
-- }
|
|
-- [ \oldSync -> (AvsSyncPause E.=. E.greatest (E.val pause) (oldSync E.^. AvsSyncPause)) oldSync ] -- causes Esqueleto to call undefined at Database.Esqueleto.Internal.Internal.renderUpdates:1308
|
|
maybeM
|
|
(insert_ AvsSync{avsSyncUser=uid, avsSyncCreationTime=now, avsSyncPause=pause})
|
|
(\Entity{entityKey=asid, entityVal=AvsSync{avsSyncPause=oldPause}} ->
|
|
update asid [AvsSyncPause =. max pause oldPause, AvsSyncCreationTime =. now])
|
|
(getBy $ UniqueAvsSyncUser uid)
|
|
queueJob' JobSynchroniseAvsQueue
|
|
|
|
dispatchJobSynchroniseAvsQueue :: JobHandler UniWorX
|
|
dispatchJobSynchroniseAvsQueue = JobHandlerException $ do
|
|
syncJob <- runDB $
|
|
selectFirst [] [Asc AvsSyncCreationTime] >>= \case
|
|
Nothing -> return Nothing -- nothing more to do
|
|
Just Entity{entityKey=asid, entityVal=AvsSync{..}} -> do
|
|
delete asid
|
|
getBy (UniqueUserAvsUser avsSyncUser) >>= \case
|
|
Just uae@Entity{entityVal=UserAvs{userAvsLastSynch} }
|
|
| maybe True (utctDay userAvsLastSynch <) avsSyncPause -> return $ Just uae
|
|
_other -> return Nothing -- we just updated this one within the given limit or the entity does not exist
|
|
|
|
ifMaybeM syncJob () $ \Entity{entityKey=avsKey, entityVal=UserAvs{userAvsPersonId=apid}} -> do
|
|
void $ queueJob JobSynchroniseAvsNext
|
|
catch (void $ upsertAvsUserById apid) -- already updates UserAvsLastSynch
|
|
(\exc -> do
|
|
now <- liftIO getCurrentTime
|
|
let excMsg = tshow exc <> " at " <> tshow now
|
|
runDB (update avsKey [UserAvsLastSynchError =. Just excMsg, UserAvsLastSynch =. now])
|
|
case exc of
|
|
AvsInterfaceUnavailable -> return () -- ignore and retry later
|
|
AvsUserUnknownByAvs _ -> return () -- ignore for users no longer listed in AVS
|
|
otherExc -> throwM otherExc
|
|
)
|
|
|
|
-- needed, since JobSynchroniseAvsQueue cannot requeue itself due to JobNoQueueSame (and having no parameters)
|
|
dispatchJobSynchroniseAvsNext :: JobHandler UniWorX
|
|
dispatchJobSynchroniseAvsNext = JobHandlerException $ void $ queueJob JobSynchroniseAvsQueue
|